{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking) where
import Control.Concurrent.STM (stateTVar)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import qualified Data.Map.Strict as M
import Data.Time.Clock
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Transport
import Simplex.Messaging.Util
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
-- | Run the SMP server with the given configuration.
--
-- Creates a fresh, empty 'TMVar' to act as the "started" flag and hands it,
-- together with the configuration, to 'runSMPServerBlocking'.
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServerConfig -> m ()
runSMPServer cfg = do
  started <- newEmptyTMVarIO
  runSMPServerBlocking started cfg
-- | Run the SMP server using a caller-supplied "started" flag.
--
-- Builds the server environment from the configuration with 'newEnv' and runs
-- the server in @ReaderT Env@. The @started@ 'TMVar' is passed unchanged to
-- 'runTransportServer' for every configured transport
-- (NOTE(review): presumably it is filled once the listener is up — confirm in
-- "Simplex.Messaging.Transport").
runSMPServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> ServerConfig -> m ()
runSMPServerBlocking started cfg@ServerConfig {transports} = do
  env <- newEnv cfg
  runReaderT smpServer env
  where
    -- Runs the subscription bookkeeping thread together with one transport
    -- server per configured (port, transport) pair via 'raceAny_'; the store
    -- log is closed with 'closeStoreLog' however that computation ends.
    smpServer :: (MonadUnliftIO m', MonadReader Env m') => m' ()
    smpServer = do
      s <- asks server
      raceAny_ (serverThread s : map runServer transports)
        `finally` withLog closeStoreLog

    -- Starts a transport server on the given port; each accepted connection
    -- is handled by 'runClient'.
    runServer :: (MonadUnliftIO m', MonadReader Env m') => (ServiceName, ATransport) -> m' ()
    runServer (tcpPort, ATransport t) = runTransportServer started tcpPort (runClient t)

    -- Processes subscription notifications forever. Each iteration is a
    -- single STM transaction that:
    --   1. takes the next (recipient id, client) pair from 'subscribedQ';
    --   2. if some client is already registered for that recipient id,
    --      writes an END transmission (empty correlation id) to its 'rcvQ';
    --   3. records the new client as the subscriber for that recipient id.
    serverThread :: MonadUnliftIO m' => Server -> m' ()
    serverThread Server {subscribedQ, subscribers} = forever . atomically $ do
      (rId, clnt) <- readTBQueue subscribedQ
      cs <- readTVar subscribers
      case M.lookup rId cs of
        Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END)
        Nothing -> return ()
      writeTVar subscribers $ M.insert rId clnt cs
-- | Handle one accepted connection.
--
-- Performs 'serverHandshake' on the connection using the server key pair from
-- the environment. On success the resulting transport handle is passed to
-- 'runClientTransport'; on a handshake error the connection handler simply
-- returns without doing anything else.
--
-- The 'TProxy' argument is not inspected; it only fixes the transport type.
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
runClient _ h = do
  keyPair <- asks serverKeyPair
  liftIO (runExceptT $ serverHandshake h keyPair) >>= \case
    Right th -> runClientTransport th
    Left _ -> pure ()
-- | Serve a client over an established transport handle.
--
-- Allocates a new 'Client' whose queues are sized by the configured
-- 'tbqSize', then runs the 'send', 'client' and 'receive' loops for it via
-- 'raceAny_'. However that computation ends, the client's subscription
-- threads are cancelled with 'cancelSubscribers'.
runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m ()
runClientTransport th = do
  q <- asks $ tbqSize . config
  c <- atomically $ newClient q
  s <- asks server
  raceAny_ [send th c, client c s, receive th c]
    `finally` cancelSubscribers c
-- | Cancel every subscription currently recorded for the client.
--
-- Reads the client's subscription map once and applies 'cancelSub' to each
-- entry; the map itself is not modified.
cancelSubscribers :: MonadUnliftIO m => Client -> m ()
cancelSubscribers Client {subscriptions} =
  readTVarIO subscriptions >>= mapM_ cancelSub
-- | Cancel a single subscription.
--
-- If the subscription has a running thread ('SubThread'), that thread is
-- killed; any other subscription state is left alone.
cancelSub :: MonadUnliftIO m => Sub -> m ()
cancelSub = \case
  Sub {subThread = SubThread t} -> killThread t
  _ -> return ()
-- | Receive loop for one client connection.
--
-- Repeatedly reads a signed transmission from the transport handle with
-- 'tGet' and turns it into a 'Transmission':
--
-- * if the command could not be obtained ('Left'), an @ERR@ response carrying
--   that error is built with 'mkResp';
-- * otherwise the transmission is checked by 'verifyTransmission'.
--
-- The result is written to the client's 'rcvQ' in either case.
receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m ()
receive h Client {rcvQ} = forever $ do
  (signature, (corrId, queueId, cmdOrError)) <- tGet fromClient h
  t <- case cmdOrError of
    Left e -> return . mkResp corrId queueId $ ERR e
    Right cmd -> verifyTransmission (signature, (corrId, queueId, cmd))
  atomically $ writeTBQueue rcvQ t
-- | Send loop for one client connection.
--
-- Repeatedly takes the next transmission from the client's 'sndQ',
-- serializes it and writes it to the transport handle with an empty
-- signature.
--
-- NOTE(review): the @Either TransportError ()@ returned by 'tPut' is
-- discarded by 'forever', so a failed write does not stop this loop —
-- confirm that this is intended.
send :: (Transport c, MonadUnliftIO m) => THandle c -> Client -> m ()
send h Client {sndQ} = forever $ do
  t <- atomically $ readTBQueue sndQ
  liftIO $ tPut h ("", serializeTransmission t)
-- | Build a broker response transmission: the given correlation id and queue
-- id paired with the broker command wrapped as @Cmd SBroker@.
mkResp :: CorrId -> QueueId -> Command 'Broker -> Transmission
mkResp corrId queueId command = (corrId, queueId, Cmd SBroker command)
-- | Check the signature of a client transmission.
--
-- Returns a transmission with the same correlation id and queue id whose
-- command is either the original command (accepted) or a broker @ERR@:
--
-- * a broker command arriving here yields @ERR INTERNAL@;
-- * @NEW k@ is verified against the key @k@ carried by the command itself;
-- * any other recipient command is verified against the queue's
--   'recipientKey';
-- * @SEND@ is checked by @verifySend@ against the queue's 'senderKey';
-- * @PING@ is accepted without verification.
--
-- Failed verification, or a failed queue lookup, yields @ERR AUTH@.
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => SignedTransmission -> m Transmission
verifyTransmission (sig, t@(corrId, queueId, cmd)) =
  (corrId,queueId,) <$> case cmd of
    Cmd SBroker _ -> return $ smpErr INTERNAL
    Cmd SRecipient (NEW k) -> pure $ verifySignature k
    Cmd SRecipient _ -> verifyCmd SRecipient $ verifySignature . recipientKey
    Cmd SSender (SEND _) -> verifyCmd SSender $ verifySend sig . senderKey
    Cmd SSender PING -> return cmd
  where
    -- Look the queue up for the given party and apply the check to its
    -- record. When the lookup fails the result is AUTH, but only after a
    -- dummy verification has been forced
    -- (NOTE(review): presumably so that a missing queue costs about as much
    -- as a real signature check — confirm).
    verifyCmd :: SParty p -> (QueueRec -> Cmd) -> m Cmd
    verifyCmd party f = do
      st <- asks queueStore
      q <- atomically $ getQueue st party queueId
      pure $ either (const $ dummyVerify authErr) f q

    -- An empty signature is accepted only when the queue has no sender key;
    -- a non-empty signature requires a sender key and must verify against it.
    verifySend :: C.Signature -> Maybe SenderPublicKey -> Cmd
    verifySend "" = maybe cmd (const authErr)
    verifySend _ = maybe authErr verifySignature

    -- The original command when the signature verifies, AUTH otherwise.
    verifySignature :: C.PublicKey -> Cmd
    verifySignature key = if verify key then cmd else authErr

    -- The real check only runs when the key size equals the signature
    -- length; otherwise a dummy verification is forced and the result is
    -- False.
    verify :: C.PublicKey -> Bool
    verify key
      | C.publicKeySize key == sigLen = cryptoVerify key
      | otherwise = dummyVerify False

    -- Verify the signature over the serialized transmission.
    cryptoVerify :: C.PublicKey -> Bool
    cryptoVerify key = C.verify key sig (serializeTransmission t)

    smpErr :: ErrorType -> Cmd
    smpErr = Cmd SBroker . ERR

    authErr :: Cmd
    authErr = smpErr AUTH

    -- Identity function that first forces a 'cryptoVerify' against a dummy
    -- key chosen by signature length (the 256 key is the fallback for any
    -- other length); the verification result itself is discarded.
    dummyVerify :: a -> a
    dummyVerify = seq $
      cryptoVerify $ case sigLen of
        128 -> dummyKey128
        256 -> dummyKey256
        512 -> dummyKey512
        _ -> dummyKey256

    -- Length in bytes of the raw signature.
    sigLen :: Int
    sigLen = B.length $ C.unSignature sig
-- | Placeholder public key that 'verifyTransmission' uses for its dummy
-- verification when the signature is 128 bytes long.
dummyKey128 :: C.PublicKey
dummyKey128 = "MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKBgQC2oeA7s4roXN5K2N6022I1/2CTeMKjWH0m00bSZWa4N8LDKeFcShh8YUxZea5giAveViTRNOOVLgcuXbKvR3u24szN04xP0+KnYUuUUIIoT3YSjX0IlomhDhhSyup4BmA0gAZ+D1OaIKZFX6J8yQ1Lr/JGLEfSRsBjw8l+4hs9OwKBgQDKA+YlZvGb3BcpDwKmatiCXN7ZRDWkjXbj8VAW5zV95tSRCCVN48hrFM1H4Ju2QMMUc6kPUVX+eW4ZjdCl5blIqIHMcTmsdcmsDDCg3PjUNrwc6bv/1TcirbAKcmnKt9iurIt6eerxSO7TZUXXMUVsi7eRwb/RUNhpCrpJ/hpIOw=="
-- | Placeholder public key that 'verifyTransmission' uses for its dummy
-- verification when the signature is 256 bytes long, and as the fallback for
-- any signature length other than 128, 256 or 512.
dummyKey256 :: C.PublicKey
dummyKey256 = "MIIBoDANBgkqhkiG9w0BAQEFAAOCAY0AMIIBiAKCAQEAxwmTvaqmdTbkfUGNi8Yu0L/T4cxuOlQlx3zGZ9X9Qx0+oZjknWK+QHrdWTcpS+zH4Hi7fP6kanOQoQ90Hj6Ghl57VU1GEdUPywSw4i1/7t0Wv9uT9Q2ktHp2rqVo3xkC9IVIpL7EZAxdRviIN2OsOB3g4a/F1ZpjxcAaZeOMUugiAX1+GtkLuE0Xn4neYjCaOghLxQTdhybN70VtnkiQLx/X9NjkDIl/spYGm3tQFMyYKkP6IWoEpj0926hJ0fmlmhy8tAOhlZsb/baW5cgkEZ3E9jVVrySCgQzoLQgma610FIISRpRJbSyv26jU7MkMxiyuBiDaFOORkXFttoKbtQKBgEbDS9II2brsz+vfI7uP8atFcawkE52cx4M1UWQhqb1H3tBiRl+qO+dMq1pPQF2bW7dlZAWYzS4W/367bTAuALHBDGB8xi1P4Njhh9vaOgTvuqrHG9NJQ85BLy0qGw8rjIWSIXVmVpfrXFJ8po5l04UE258Ll2yocv3QRQmddQW9"
-- | Placeholder public key that 'verifyTransmission' uses for its dummy
-- verification when the signature is 512 bytes long.
dummyKey512 :: C.PublicKey
dummyKey512 = "MIICoDANBgkqhkiG9w0BAQEFAAOCAo0AMIICiAKCAgEArkCY9DuverJ4mmzDektv9aZMFyeRV46WZK9NsOBKEc+1ncqMs+LhLti9asKNgUBRbNzmbOe0NYYftrUpwnATaenggkTFxxbJ4JGJuGYbsEdFWkXSvrbWGtM8YUmn5RkAGme12xQ89bSM4VoJAGnrYPHwmcQd+KYCPZvTUsxaxgrJTX65ejHN9BsAn8XtGViOtHTDJO9yUMD2WrJvd7wnNa+0ugEteDLzMU++xS98VC+uA1vfauUqi3yXVchdfrLdVUuM+JE0gUEXCgzjuHkaoHiaGNiGhdPYoAJJdOKQOIHAKdk7Th6OPhirPhc9XYNB4O8JDthKhNtfokvFIFlC4QBRzJhpLIENaEBDt08WmgpOnecZB/CuxkqqOrNa8j5K5jNrtXAI67W46VEC2jeQy/gZwb64Zit2A4D00xXzGbQTPGj4ehcEMhLx5LSCygViEf0w0tN3c3TEyUcgPzvECd2ZVpQLr9Z4a07Ebr+YSuxcHhjg4Rg1VyJyOTTvaCBGm5X2B3+tI4NUttmikIHOYpBnsLmHY2BgfH2KcrIsDyAhInXmTFr/L2+erFarUnlfATd2L8Ti43TNHDedO6k6jI5Gyi62yPwjqPLEIIK8l+pIeNfHJ3pPmjhHBfzFcQLMMMXffHWNK8kWklrQXK+4j4HiPcTBvlO1FEtG9nEIZhUCgYA4a6WtI2k5YNli1C89GY5rGUY7RP71T6RWri/D3Lz9T7GvU+FemAyYmsvCQwqijUOur0uLvwSP8VdxpSUcrjJJSWur2hrPWzWlu0XbNaeizxpFeKbQP+zSrWJ1z8RwfAeUjShxt8q1TuqGqY10wQyp3nyiTGvS+KwZVj5h5qx8NQ=="
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
client :: Client -> Server -> m ()
client clnt :: Client
clnt@Client {TVar (Map RecipientId Sub)
subscriptions :: TVar (Map RecipientId Sub)
$sel:subscriptions:Client :: Client -> TVar (Map RecipientId Sub)
subscriptions, TBQueue Transmission
rcvQ :: TBQueue Transmission
$sel:rcvQ:Client :: Client -> TBQueue Transmission
rcvQ, TBQueue Transmission
sndQ :: TBQueue Transmission
$sel:sndQ:Client :: Client -> TBQueue Transmission
sndQ} Server {TBQueue (RecipientId, Client)
subscribedQ :: TBQueue (RecipientId, Client)
$sel:subscribedQ:Server :: Server -> TBQueue (RecipientId, Client)
subscribedQ} =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue Transmission -> STM Transmission
forall a. TBQueue a -> STM a
readTBQueue TBQueue Transmission
rcvQ)
m Transmission
-> (Transmission -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Transmission -> m Transmission
processCommand
m Transmission -> (Transmission -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> (Transmission -> STM ()) -> Transmission -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
sndQ
where
processCommand :: Transmission -> m Transmission
processCommand :: Transmission -> m Transmission
processCommand (CorrId
corrId, RecipientId
queueId, Cmd
cmd) = do
QueueStore
st <- (Env -> QueueStore) -> m QueueStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> QueueStore
queueStore
case Cmd
cmd of
Cmd SParty a
SBroker Command a
END -> m ()
unsubscribeQueue m () -> Transmission -> m Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
Cmd SParty a
SBroker Command a
_ -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
Cmd SParty a
SSender Command a
command -> case Command a
command of
SEND RecipientId
msgBody -> QueueStore -> RecipientId -> m Transmission
sendMessage QueueStore
st RecipientId
msgBody
Command a
PING -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker Command 'Broker
PONG)
Cmd SParty a
SRecipient Command a
command -> case Command a
command of
NEW RecipientPublicKey
rKey -> QueueStore -> RecipientPublicKey -> m Transmission
createQueue QueueStore
st RecipientPublicKey
rKey
Command a
SUB -> RecipientId -> m Transmission
subscribeQueue RecipientId
queueId
Command a
ACK -> m Transmission
acknowledgeMsg
KEY RecipientPublicKey
sKey -> QueueStore -> RecipientPublicKey -> m Transmission
secureQueue_ QueueStore
st RecipientPublicKey
sKey
Command a
OFF -> QueueStore -> m Transmission
suspendQueue_ QueueStore
st
Command a
DEL -> QueueStore -> m Transmission
delQueueAndMsgs QueueStore
st
where
createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
createQueue QueueStore
st RecipientPublicKey
rKey =
RecipientPublicKey -> m (Command 'Broker) -> m Transmission
forall (m' :: * -> *).
Monad m' =>
RecipientPublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize RecipientPublicKey
rKey m (Command 'Broker)
addSubscribe
where
addSubscribe :: m (Command 'Broker)
addSubscribe =
Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry Int
3 m (Either ErrorType (RecipientId, RecipientId))
-> (Either ErrorType (RecipientId, RecipientId)
-> m (Command 'Broker))
-> m (Command 'Broker)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left ErrorType
e -> Command 'Broker -> m (Command 'Broker)
forall (m :: * -> *) a. Monad m => a -> m a
return (Command 'Broker -> m (Command 'Broker))
-> Command 'Broker -> m (Command 'Broker)
forall a b. (a -> b) -> a -> b
$ ErrorType -> Command 'Broker
ERR ErrorType
e
Right (RecipientId
rId, RecipientId
sId) -> do
(StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog (StoreLog 'WriteMode -> RecipientId -> IO ()
`logCreateById` RecipientId
rId)
RecipientId -> m Transmission
subscribeQueue RecipientId
rId m Transmission -> Command 'Broker -> m (Command 'Broker)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> RecipientId -> RecipientId -> Command 'Broker
IDS RecipientId
rId RecipientId
sId
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry Int
0 = Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
INTERNAL
addQueueRetry Int
n = do
(RecipientId, RecipientId)
ids <- m (RecipientId, RecipientId)
getIds
STM (Either ErrorType ()) -> m (Either ErrorType ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> m (Either ErrorType ())
addQueue QueueStore
st RecipientPublicKey
rKey (RecipientId, RecipientId)
ids) m (Either ErrorType ())
-> (Either ErrorType ()
-> m (Either ErrorType (RecipientId, RecipientId)))
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left ErrorType
DUPLICATE_ -> Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry (Int -> m (Either ErrorType (RecipientId, RecipientId)))
-> Int -> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
Left ErrorType
e -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
e
Right ()
_ -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ (RecipientId, RecipientId)
-> Either ErrorType (RecipientId, RecipientId)
forall a b. b -> Either a b
Right (RecipientId, RecipientId)
ids
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById StoreLog 'WriteMode
s RecipientId
rId =
STM (Either ErrorType QueueRec) -> IO (Either ErrorType QueueRec)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> SParty 'Recipient
-> RecipientId
-> STM (Either ErrorType QueueRec)
forall s (m :: * -> *) (a :: Party).
MonadQueueStore s m =>
s -> SParty a -> RecipientId -> m (Either ErrorType QueueRec)
getQueue QueueStore
st SParty 'Recipient
SRecipient RecipientId
rId) IO (Either ErrorType QueueRec)
-> (Either ErrorType QueueRec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right QueueRec
q -> StoreLog 'WriteMode -> QueueRec -> IO ()
logCreateQueue StoreLog 'WriteMode
s QueueRec
q
Either ErrorType QueueRec
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
getIds :: m (RecipientId, SenderId)
getIds :: m (RecipientId, RecipientId)
getIds = do
Int
n <- (Env -> Int) -> m Int
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Int) -> m Int) -> (Env -> Int) -> m Int
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Int
queueIdBytes (ServerConfig -> Int) -> (Env -> ServerConfig) -> Env -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
(RecipientId -> RecipientId -> (RecipientId, RecipientId))
-> m RecipientId -> m RecipientId -> m (RecipientId, RecipientId)
forall (m :: * -> *) a1 a2 r.
Monad m =>
(a1 -> a2 -> r) -> m a1 -> m a2 -> m r
liftM2 (,) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n)
-- Handle the KEY command: attach the sender's public key to the queue
-- identified by @queueId@ (taken from the enclosing scope).
--
-- The store log entry is written only after the key has passed the size
-- check and the queue store has accepted it. Previously the entry was
-- written first, so a rejected command (bad key size, or an error returned
-- by 'secureQueue') still ended up in the log.
--
-- Responds with OK on success and with ERR otherwise.
secureQueue_ :: QueueStore -> SenderPublicKey -> m Transmission
secureQueue_ st sKey =
  -- 'checkKeySize' runs the action only for a valid key size, so neither the
  -- store update nor the log write happens for a malformed key.
  checkKeySize sKey $ do
    result <- atomically $ secureQueue st queueId sKey
    case result of
      Left e -> pure $ ERR e
      Right _ -> withLog (\s -> logSecureQueue s queueId sKey) $> OK
-- Run @action@ only when the public key has an acceptable size, and wrap the
-- resulting broker command into a response for the current @corrId@ and
-- @queueId@ (both taken from the enclosing scope).
--
-- For a key of invalid size the action is not run and the response carries
-- @ERR (CMD KEY_SIZE)@.
checkKeySize :: Monad m' => C.PublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize key action = mkResp corrId queueId <$> checked
  where
    checked
      | C.validKeySize (C.publicKeySize key) = action
      | otherwise = pure (ERR (CMD KEY_SIZE))
-- Handle the OFF command: suspend the queue identified by @queueId@ (taken
-- from the enclosing scope) and answer with OK or with the store's error.
--
-- NOTE(review): the store log entry is written with 'logDeleteQueue', i.e.
-- the suspension is recorded as a deletion, and it is written before the
-- store operation is attempted - confirm this is intended.
suspendQueue_ :: QueueStore -> m Transmission
suspendQueue_ st = do
  withLog (\s -> logDeleteQueue s queueId)
  result <- atomically $ suspendQueue st queueId
  pure $ okResp result
-- Handle the SUB command: obtain (or create) this client's subscription for
-- the queue and try to deliver the message at the head of the queue, without
-- removing it ('tryPeekMsg').
subscribeQueue :: RecipientId -> m Transmission
subscribeQueue rId = do
  sub <- atomically $ getSubscription rId
  deliverMessage tryPeekMsg rId sub
-- Look up this client's subscription for the queue, creating it if needed.
--
-- * Existing subscription: its @delivered@ TMVar is emptied (if it was full)
--   and the subscription is returned unchanged.
-- * No subscription: @(rId, clnt)@ is written to @subscribedQ@, a new
--   subscription is created and stored in the client's @subscriptions@ map.
--
-- Everything happens in a single STM transaction.
getSubscription :: RecipientId -> STM Sub
getSubscription rId = do
  subs <- readTVar subscriptions
  maybe (register subs) reuse (M.lookup rId subs)
  where
    reuse s = s <$ tryTakeTMVar (delivered s)
    register subs = do
      writeTBQueue subscribedQ (rId, clnt)
      s <- newSubscription
      writeTVar subscriptions (M.insert rId s subs)
      pure s
-- Remove this client's subscription for @queueId@ (taken from the enclosing
-- scope) from the @subscriptions@ map and, if there was one, cancel it with
-- 'cancelSub'. Lookup and removal happen in one STM transaction.
unsubscribeQueue :: m ()
unsubscribeQueue =
  atomically (stateTVar subscriptions removeSub) >>= mapM_ cancelSub
  where
    removeSub subs = (M.lookup queueId subs, M.delete queueId subs)
-- Handle the ACK command.
--
-- The acknowledgement is accepted only if the client has a subscription for
-- @queueId@ and that subscription's @delivered@ TMVar was full (it is emptied
-- here). In that case the next message is delivered via 'tryDelPeekMsg';
-- in every other case the response is @ERR NO_MSG@.
acknowledgeMsg :: m Transmission
acknowledgeMsg = do
  acked <- atomically . withSub queueId $ \s -> (s <$) <$> tryTakeTMVar (delivered s)
  -- Outer Maybe: is there a subscription; inner Maybe: was a delivery pending.
  case join acked of
    Just s -> deliverMessage tryDelPeekMsg queueId s
    Nothing -> pure $ err NO_MSG
-- Apply an STM action to this client's subscription for the given queue.
-- Returns 'Nothing' without running the action when the @subscriptions@ map
-- has no entry for @rId@.
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub rId f = do
  subs <- readTVar subscriptions
  traverse f (M.lookup rId subs)
-- Handle the SEND command: look the queue up as a sender (by @queueId@ from
-- the enclosing scope) and, if found, store the message in it.
--
-- Responds with the store's error if the lookup fails, with @ERR AUTH@ if the
-- queue status is 'QueueOff', and with OK once the message has been written.
sendMessage :: QueueStore -> MsgBody -> m Transmission
sendMessage st msgBody =
  atomically (getQueue st SSender queueId) >>= \case
    Left e -> pure $ err e
    Right qr -> storeMessage qr
  where
    -- Build the message: a random id of the configured length ('msgIdBytes')
    -- and the current time as timestamp.
    mkMessage :: m Message
    mkMessage = do
      idSize <- asks $ msgIdBytes . config
      msgId <- randomId idSize
      ts <- liftIO getCurrentTime
      pure Message {msgId, ts, msgBody}

    storeMessage :: QueueRec -> m Transmission
    storeMessage qr = case status qr of
      QueueOff -> pure $ err AUTH
      QueueActive -> do
        ms <- asks msgStore
        msg <- mkMessage
        -- The message queue is keyed by the recipient id of the queue record.
        atomically $ do
          q <- getMsgQueue ms (recipientId qr)
          ok <$ writeMsg q msg
-- Deliver the next message of queue @rId@ to the subscribed client.
--
-- @tryPeek@ is the STM action used to fetch the message from the queue (the
-- callers in this file pass 'tryPeekMsg' or 'tryDelPeekMsg').
--
-- * Subscription has no thread ('NoSub'):
--
--     * a message is available: mark the subscription as delivered and
--       respond with the message;
--     * no message: fork a thread that waits for one ('forkSub') and
--       respond with OK.
--
-- * Any other subscription state: respond with OK and do nothing.
deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m Transmission
deliverMessage tryPeek rId = \case
  Sub {subThread = NoSub} -> do
    ms <- asks msgStore
    q <- atomically $ getMsgQueue ms rId
    atomically (tryPeek q) >>= \case
      Nothing -> forkSub q $> ok
      Just msg -> atomically setDelivered $> mkResp corrId rId (msgCmd msg)
  _ -> return ok
  where
    -- Start the waiting thread. The subscription is marked 'SubPending'
    -- before the fork and switched to 'SubThread' afterwards only if it is
    -- still pending: the forked thread may already have finished and reset
    -- the state to 'NoSub'.
    forkSub :: MsgQueue -> m ()
    forkSub q = do
      atomically . setSub $ \s -> s {subThread = SubPending}
      t <- forkIO $ subscriber q
      atomically . setSub $ \case
        s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
        s -> s

    -- In one transaction: take the next message with 'peekMsg', queue it to
    -- the client's @sndQ@ with an empty correlation id, reset the
    -- subscription to 'NoSub' and mark it as delivered.
    subscriber :: MsgQueue -> m ()
    subscriber q = atomically $ do
      msg <- peekMsg q
      writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg)
      setSub (\s -> s {subThread = NoSub})
      void setDelivered

    -- Update the subscription for @rId@, if present. The strict
    -- 'modifyTVar'' is used so that the TVar does not accumulate an
    -- unevaluated map update.
    setSub :: (Sub -> Sub) -> STM ()
    setSub f = modifyTVar' subscriptions $ M.adjust f rId

    -- Fill the subscription's @delivered@ TMVar; 'Nothing' when there is no
    -- subscription for @rId@.
    setDelivered :: STM (Maybe Bool)
    setDelivered = withSub rId $ \s -> tryPutTMVar (delivered s) ()
-- Handle the DEL command: delete the queue identified by @queueId@ (taken
-- from the enclosing scope) and, if that succeeds, its message queue. Both
-- deletions run in one STM transaction.
--
-- Responds with OK, or with the error returned by 'deleteQueue'.
--
-- NOTE(review): the store log entry is written before the deletion is
-- attempted, so it is also written when 'deleteQueue' fails - confirm this
-- is intended.
delQueueAndMsgs :: QueueStore -> m Transmission
delQueueAndMsgs st = do
  withLog (\s -> logDeleteQueue s queueId)
  ms <- asks msgStore
  atomically $ do
    deleted <- deleteQueue st queueId
    case deleted of
      Left e -> pure $ err e
      Right _ -> ok <$ delMsgQueue ms queueId
-- The OK response for the command being processed; @corrId@ and @queueId@
-- come from the enclosing scope.
ok :: Transmission
ok = mkResp corrId queueId OK
-- An ERR response carrying the given error for the command being processed;
-- @corrId@ and @queueId@ come from the enclosing scope.
err :: ErrorType -> Transmission
err e = mkResp corrId queueId (ERR e)
-- Turn the result of a store operation into a response: the error response
-- for 'Left', the OK response for 'Right'.
okResp :: Either ErrorType () -> Transmission
okResp = \case
  Left e -> err e
  Right _ -> ok
-- Convert a stored message into the MSG broker command, passing its id,
-- timestamp and body in that order.
msgCmd :: Message -> Command 'Broker
msgCmd Message {msgId = mId, ts = sentAt, msgBody = body} = MSG mId sentAt body
-- | Run an action on the store log taken from the environment, discarding
-- the action's result. Does nothing when the environment has no store log.
withLog :: (MonadUnliftIO m, MonadReader Env m) => (StoreLog 'WriteMode -> IO a) -> m ()
withLog action = do
  env <- ask
  -- The annotation on @env@ selects the 'Env' field named @storeLog@.
  liftIO $ forM_ (storeLog (env :: Env)) action
-- | Generate @n@ random bytes using the generator stored in the
-- environment's @idsDrg@ TVar.
randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded
randomId n = asks idsDrg >>= atomically . randomBytes n
-- | Draw @n@ random bytes from the generator held in the TVar and store the
-- advanced generator back, all within the calling STM transaction.
randomBytes :: Int -> TVar ChaChaDRG -> STM ByteString
randomBytes n gVar = stateTVar gVar (randomBytesGenerate n)