{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking) where
import Control.Concurrent.STM (stateTVar)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import qualified Data.Map.Strict as M
import Data.Time.Clock
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Transport
import Simplex.Messaging.Util
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM
-- | Runs the SMP server with the given configuration.
--
-- Creates a fresh, empty @TMVar Bool@ and hands it, together with @cfg@, to
-- 'runSMPServerBlocking'. The variable is not returned to the caller.
-- NOTE(review): presumably the TMVar is the "server started" signal used by
-- 'runTCPServer' - confirm against "Simplex.Messaging.Transport".
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServerConfig -> m ()
runSMPServer cfg = newEmptyTMVarIO >>= (`runSMPServerBlocking` cfg)
-- | Runs the SMP server using a caller-supplied @started@ variable.
--
-- Builds the server environment from @cfg@ with 'newEnv' and runs the server
-- in a 'ReaderT' over that environment. The TCP port is taken from the
-- @tcpPort@ field of the configuration.
runSMPServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> ServerConfig -> m ()
runSMPServerBlocking started cfg@ServerConfig {tcpPort} = do
  env <- newEnv cfg
  runReaderT smpServer env
  where
    -- Races the TCP listener (one 'runClient' per accepted handle) against
    -- the subscription bookkeeping thread; whichever way the race ends, the
    -- store log is closed via 'withLog'.
    smpServer :: (MonadUnliftIO m, MonadReader Env m) => m ()
    smpServer = do
      s <- asks server
      race_ (runTCPServer started tcpPort runClient) (serverThread s)
        `finally` withLog closeStoreLog

    -- Forever, in one STM transaction per iteration: take the next
    -- (queue ID, client) pair from @subscribedQ@; if another client is
    -- already registered for that queue, write an END command (with an empty
    -- correlation ID) to that client's @rcvQ@; then register the new client
    -- for the queue in @subscribers@.
    serverThread :: MonadUnliftIO m => Server -> m ()
    serverThread Server {subscribedQ, subscribers} = forever . atomically $ do
      (rId, clnt) <- readTBQueue subscribedQ
      cs <- readTVar subscribers
      case M.lookup rId cs of
        Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END)
        Nothing -> return ()
      writeTVar subscribers $ M.insert rId clnt cs
-- | Serves a single accepted connection.
--
-- Performs 'serverHandshake' on the handle using the server key pair from
-- the environment. On success the resulting 'THandle' is passed to
-- 'runClientTransport'; on a handshake error the function returns without
-- doing anything else (the error value is discarded).
runClient :: (MonadUnliftIO m, MonadReader Env m) => Handle -> m ()
runClient h = do
  keyPair <- asks serverKeyPair
  liftIO (runExceptT $ serverHandshake h keyPair) >>= \case
    Right th -> runClientTransport th
    Left _ -> pure ()
-- | Runs the per-connection threads over an established transport handle.
--
-- Creates a new 'Client' whose queues are sized by the configured @tbqSize@,
-- then runs 'send', 'client' and 'receive' together with 'raceAny_'. When
-- that returns or throws, 'cancelSubscribers' is run for the client.
runClientTransport :: (MonadUnliftIO m, MonadReader Env m) => THandle -> m ()
runClientTransport th = do
  q <- asks $ tbqSize . config
  c <- atomically $ newClient q
  s <- asks server
  raceAny_ [send th c, client c s, receive th c]
    `finally` cancelSubscribers c
-- | Reads the client's current @subscriptions@ map and applies 'cancelSub'
-- to every subscription in it. The map itself is not modified here.
cancelSubscribers :: MonadUnliftIO m => Client -> m ()
cancelSubscribers Client {subscriptions} =
  readTVarIO subscriptions >>= mapM_ cancelSub
-- | Kills the thread attached to a subscription, if there is one.
--
-- Only a subscription whose @subThread@ is @SubThread t@ has its thread
-- killed; any other subscription is left untouched.
cancelSub :: MonadUnliftIO m => Sub -> m ()
cancelSub = \case
  Sub {subThread = SubThread t} -> killThread t
  _ -> return ()
-- | Receive loop for one client connection.
--
-- Forever: reads a signed transmission from the handle with
-- @tGet fromClient@. If parsing produced an error, it is turned into a
-- broker @ERR@ response for the same correlation and queue IDs; otherwise
-- the transmission goes through 'verifyTransmission'. In both cases the
-- resulting transmission is written to the client's @rcvQ@.
receive :: (MonadUnliftIO m, MonadReader Env m) => THandle -> Client -> m ()
receive h Client {rcvQ} = forever $ do
  (signature, (corrId, queueId, cmdOrError)) <- tGet fromClient h
  t <- case cmdOrError of
    Left e -> return . mkResp corrId queueId $ ERR e
    Right cmd -> verifyTransmission (signature, (corrId, queueId, cmd))
  atomically $ writeTBQueue rcvQ t
-- | Send loop for one client connection.
--
-- Forever: takes the next transmission from the client's @sndQ@, serializes
-- it and writes it to the handle with an empty signature.
send :: MonadUnliftIO m => THandle -> Client -> m ()
send h Client {sndQ} = forever $ do
  t <- atomically $ readTBQueue sndQ
  -- The @Either TransportError ()@ result of 'tPut' is discarded by
  -- 'forever', so a failed write does not stop this loop by itself.
  liftIO $ tPut h ("", serializeTransmission t)
-- | Builds a broker transmission: the given correlation ID and queue ID
-- paired with the broker command wrapped as @Cmd SBroker@.
mkResp :: CorrId -> QueueId -> Command 'Broker -> Transmission
mkResp corrId queueId command = (corrId, queueId, Cmd SBroker command)
-- | Checks the signature of a parsed transmission and returns the
-- transmission to be processed, keeping its correlation ID and queue ID.
--
-- The command is passed through unchanged when the check succeeds and is
-- replaced by a broker @ERR@ command otherwise:
--
-- * a broker command yields @ERR INTERNAL@;
-- * @NEW k@ is verified against the key @k@ carried by the command itself;
-- * any other recipient command is verified against the queue's
--   @recipientKey@;
-- * @SEND@ is checked by @verifySend@ against the queue's @senderKey@;
-- * @PING@ is accepted without any check.
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => SignedTransmission -> m Transmission
verifyTransmission (sig, t@(corrId, queueId, cmd)) = do
  (corrId,queueId,) <$> case cmd of
    -- NOTE(review): presumably unreachable, since 'receive' parses with
    -- 'fromClient' before calling this function - confirm.
    Cmd SBroker _ -> return $ smpErr INTERNAL
    Cmd SRecipient (NEW k) -> pure $ verifySignature k
    Cmd SRecipient _ -> verifyCmd SRecipient $ verifySignature . recipientKey
    Cmd SSender (SEND _) -> verifyCmd SSender $ verifySend sig . senderKey
    Cmd SSender PING -> return cmd
  where
    -- Looks the queue up for the given party and applies @f@ to its record.
    -- When the lookup fails the result is @authErr@, produced only after a
    -- dummy verification has been forced (see @dummyVerify@).
    verifyCmd :: SParty p -> (QueueRec -> Cmd) -> m Cmd
    verifyCmd party f = do
      st <- asks queueStore
      q <- atomically $ getQueue st party queueId
      pure $ either (const $ dummyVerify authErr) f q

    -- Empty signature: accepted only when the queue has no sender key.
    -- Non-empty signature: requires a sender key and verifies against it.
    verifySend :: C.Signature -> Maybe SenderPublicKey -> Cmd
    verifySend "" = maybe cmd (const authErr)
    verifySend _ = maybe authErr verifySignature

    -- The original command if the signature verifies with @key@, else AUTH.
    verifySignature :: C.PublicKey -> Cmd
    verifySignature key = if verify key then cmd else authErr

    -- Real verification is attempted only when the key size equals the
    -- signature length; otherwise a dummy verification is forced and the
    -- result is 'False'.
    verify :: C.PublicKey -> Bool
    verify key
      | C.publicKeySize key == sigLen = cryptoVerify key
      | otherwise = dummyVerify False

    -- Verifies @sig@ over the serialized transmission with the given key.
    cryptoVerify :: C.PublicKey -> Bool
    cryptoVerify key = C.verify key sig (serializeTransmission t)

    smpErr :: ErrorType -> Cmd
    smpErr = Cmd SBroker . ERR

    authErr :: Cmd
    authErr = smpErr AUTH

    -- Forces a 'cryptoVerify' against a dummy key (picked by signature
    -- length, defaulting to the 256 key) before returning its argument
    -- unchanged.
    -- NOTE(review): presumably this keeps response timing similar whether or
    -- not a matching queue/key exists - confirm intent.
    dummyVerify :: a -> a
    dummyVerify =
      seq $
        cryptoVerify $ case sigLen of
          128 -> dummyKey128
          256 -> dummyKey256
          512 -> dummyKey512
          _ -> dummyKey256

    -- Length in bytes of the raw signature.
    sigLen :: Int
    sigLen = B.length $ C.unSignature sig
-- | Fixed public key that 'verifyTransmission' uses for its dummy
-- verification when the signature length is 128.
dummyKey128 :: C.PublicKey
dummyKey128 = "MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKBgQC2oeA7s4roXN5K2N6022I1/2CTeMKjWH0m00bSZWa4N8LDKeFcShh8YUxZea5giAveViTRNOOVLgcuXbKvR3u24szN04xP0+KnYUuUUIIoT3YSjX0IlomhDhhSyup4BmA0gAZ+D1OaIKZFX6J8yQ1Lr/JGLEfSRsBjw8l+4hs9OwKBgQDKA+YlZvGb3BcpDwKmatiCXN7ZRDWkjXbj8VAW5zV95tSRCCVN48hrFM1H4Ju2QMMUc6kPUVX+eW4ZjdCl5blIqIHMcTmsdcmsDDCg3PjUNrwc6bv/1TcirbAKcmnKt9iurIt6eerxSO7TZUXXMUVsi7eRwb/RUNhpCrpJ/hpIOw=="
-- | Fixed public key that 'verifyTransmission' uses for its dummy
-- verification when the signature length is 256, and for any length other
-- than 128, 256 or 512.
dummyKey256 :: C.PublicKey
dummyKey256 = "MIIBoDANBgkqhkiG9w0BAQEFAAOCAY0AMIIBiAKCAQEAxwmTvaqmdTbkfUGNi8Yu0L/T4cxuOlQlx3zGZ9X9Qx0+oZjknWK+QHrdWTcpS+zH4Hi7fP6kanOQoQ90Hj6Ghl57VU1GEdUPywSw4i1/7t0Wv9uT9Q2ktHp2rqVo3xkC9IVIpL7EZAxdRviIN2OsOB3g4a/F1ZpjxcAaZeOMUugiAX1+GtkLuE0Xn4neYjCaOghLxQTdhybN70VtnkiQLx/X9NjkDIl/spYGm3tQFMyYKkP6IWoEpj0926hJ0fmlmhy8tAOhlZsb/baW5cgkEZ3E9jVVrySCgQzoLQgma610FIISRpRJbSyv26jU7MkMxiyuBiDaFOORkXFttoKbtQKBgEbDS9II2brsz+vfI7uP8atFcawkE52cx4M1UWQhqb1H3tBiRl+qO+dMq1pPQF2bW7dlZAWYzS4W/367bTAuALHBDGB8xi1P4Njhh9vaOgTvuqrHG9NJQ85BLy0qGw8rjIWSIXVmVpfrXFJ8po5l04UE258Ll2yocv3QRQmddQW9"
-- | Fixed public key that 'verifyTransmission' uses for its dummy
-- verification when the signature length is 512.
dummyKey512 :: C.PublicKey
dummyKey512 = "MIICoDANBgkqhkiG9w0BAQEFAAOCAo0AMIICiAKCAgEArkCY9DuverJ4mmzDektv9aZMFyeRV46WZK9NsOBKEc+1ncqMs+LhLti9asKNgUBRbNzmbOe0NYYftrUpwnATaenggkTFxxbJ4JGJuGYbsEdFWkXSvrbWGtM8YUmn5RkAGme12xQ89bSM4VoJAGnrYPHwmcQd+KYCPZvTUsxaxgrJTX65ejHN9BsAn8XtGViOtHTDJO9yUMD2WrJvd7wnNa+0ugEteDLzMU++xS98VC+uA1vfauUqi3yXVchdfrLdVUuM+JE0gUEXCgzjuHkaoHiaGNiGhdPYoAJJdOKQOIHAKdk7Th6OPhirPhc9XYNB4O8JDthKhNtfokvFIFlC4QBRzJhpLIENaEBDt08WmgpOnecZB/CuxkqqOrNa8j5K5jNrtXAI67W46VEC2jeQy/gZwb64Zit2A4D00xXzGbQTPGj4ehcEMhLx5LSCygViEf0w0tN3c3TEyUcgPzvECd2ZVpQLr9Z4a07Ebr+YSuxcHhjg4Rg1VyJyOTTvaCBGm5X2B3+tI4NUttmikIHOYpBnsLmHY2BgfH2KcrIsDyAhInXmTFr/L2+erFarUnlfATd2L8Ti43TNHDedO6k6jI5Gyi62yPwjqPLEIIK8l+pIeNfHJ3pPmjhHBfzFcQLMMMXffHWNK8kWklrQXK+4j4HiPcTBvlO1FEtG9nEIZhUCgYA4a6WtI2k5YNli1C89GY5rGUY7RP71T6RWri/D3Lz9T7GvU+FemAyYmsvCQwqijUOur0uLvwSP8VdxpSUcrjJJSWur2hrPWzWlu0XbNaeizxpFeKbQP+zSrWJ1z8RwfAeUjShxt8q1TuqGqY10wQyp3nyiTGvS+KwZVj5h5qx8NQ=="
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
client :: Client -> Server -> m ()
client clnt :: Client
clnt@Client {TVar (Map RecipientId Sub)
subscriptions :: TVar (Map RecipientId Sub)
$sel:subscriptions:Client :: Client -> TVar (Map RecipientId Sub)
subscriptions, TBQueue Transmission
rcvQ :: TBQueue Transmission
$sel:rcvQ:Client :: Client -> TBQueue Transmission
rcvQ, TBQueue Transmission
sndQ :: TBQueue Transmission
$sel:sndQ:Client :: Client -> TBQueue Transmission
sndQ} Server {TBQueue (RecipientId, Client)
subscribedQ :: TBQueue (RecipientId, Client)
$sel:subscribedQ:Server :: Server -> TBQueue (RecipientId, Client)
subscribedQ} =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue Transmission -> STM Transmission
forall a. TBQueue a -> STM a
readTBQueue TBQueue Transmission
rcvQ)
m Transmission
-> (Transmission -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Transmission -> m Transmission
processCommand
m Transmission -> (Transmission -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> (Transmission -> STM ()) -> Transmission -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
sndQ
where
processCommand :: Transmission -> m Transmission
processCommand :: Transmission -> m Transmission
processCommand (corrId :: CorrId
corrId, queueId :: RecipientId
queueId, cmd :: Cmd
cmd) = do
QueueStore
st <- (Env -> QueueStore) -> m QueueStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> QueueStore
queueStore
case Cmd
cmd of
Cmd SBroker END -> m ()
unsubscribeQueue m () -> Transmission -> m Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
Cmd SBroker _ -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
Cmd SSender command :: Command a
command -> case Command a
command of
SEND msgBody :: RecipientId
msgBody -> QueueStore -> RecipientId -> m Transmission
sendMessage QueueStore
st RecipientId
msgBody
PING -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker Command 'Broker
PONG)
Cmd SRecipient command :: Command a
command -> case Command a
command of
NEW rKey :: RecipientPublicKey
rKey -> QueueStore -> RecipientPublicKey -> m Transmission
createQueue QueueStore
st RecipientPublicKey
rKey
SUB -> RecipientId -> m Transmission
subscribeQueue RecipientId
queueId
ACK -> m Transmission
acknowledgeMsg
KEY sKey :: RecipientPublicKey
sKey -> QueueStore -> RecipientPublicKey -> m Transmission
secureQueue_ QueueStore
st RecipientPublicKey
sKey
OFF -> QueueStore -> m Transmission
suspendQueue_ QueueStore
st
DEL -> QueueStore -> m Transmission
delQueueAndMsgs QueueStore
st
where
createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
createQueue st :: QueueStore
st rKey :: RecipientPublicKey
rKey =
RecipientPublicKey -> m (Command 'Broker) -> m Transmission
forall (m' :: * -> *).
Monad m' =>
RecipientPublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize RecipientPublicKey
rKey m (Command 'Broker)
addSubscribe
where
addSubscribe :: m (Command 'Broker)
addSubscribe =
Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry 3 m (Either ErrorType (RecipientId, RecipientId))
-> (Either ErrorType (RecipientId, RecipientId)
-> m (Command 'Broker))
-> m (Command 'Broker)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left e :: ErrorType
e -> Command 'Broker -> m (Command 'Broker)
forall (m :: * -> *) a. Monad m => a -> m a
return (Command 'Broker -> m (Command 'Broker))
-> Command 'Broker -> m (Command 'Broker)
forall a b. (a -> b) -> a -> b
$ ErrorType -> Command 'Broker
ERR ErrorType
e
Right (rId :: RecipientId
rId, sId :: RecipientId
sId) -> do
(StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog (StoreLog 'WriteMode -> RecipientId -> IO ()
`logCreateById` RecipientId
rId)
RecipientId -> m Transmission
subscribeQueue RecipientId
rId m Transmission -> Command 'Broker -> m (Command 'Broker)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> RecipientId -> RecipientId -> Command 'Broker
IDS RecipientId
rId RecipientId
sId
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry 0 = Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
INTERNAL
addQueueRetry n :: Int
n = do
(RecipientId, RecipientId)
ids <- m (RecipientId, RecipientId)
getIds
STM (Either ErrorType ()) -> m (Either ErrorType ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> m (Either ErrorType ())
addQueue QueueStore
st RecipientPublicKey
rKey (RecipientId, RecipientId)
ids) m (Either ErrorType ())
-> (Either ErrorType ()
-> m (Either ErrorType (RecipientId, RecipientId)))
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left DUPLICATE_ -> Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry (Int -> m (Either ErrorType (RecipientId, RecipientId)))
-> Int -> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1
Left e :: ErrorType
e -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
e
Right _ -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ (RecipientId, RecipientId)
-> Either ErrorType (RecipientId, RecipientId)
forall a b. b -> Either a b
Right (RecipientId, RecipientId)
ids
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById s :: StoreLog 'WriteMode
s rId :: RecipientId
rId =
STM (Either ErrorType QueueRec) -> IO (Either ErrorType QueueRec)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> SParty 'Recipient
-> RecipientId
-> STM (Either ErrorType QueueRec)
forall s (m :: * -> *) (a :: Party).
MonadQueueStore s m =>
s -> SParty a -> RecipientId -> m (Either ErrorType QueueRec)
getQueue QueueStore
st SParty 'Recipient
SRecipient RecipientId
rId) IO (Either ErrorType QueueRec)
-> (Either ErrorType QueueRec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right q :: QueueRec
q -> StoreLog 'WriteMode -> QueueRec -> IO ()
logCreateQueue StoreLog 'WriteMode
s QueueRec
q
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
getIds :: m (RecipientId, SenderId)
getIds :: m (RecipientId, RecipientId)
getIds = do
Int
n <- (Env -> Int) -> m Int
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Int) -> m Int) -> (Env -> Int) -> m Int
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Int
queueIdBytes (ServerConfig -> Int) -> (Env -> ServerConfig) -> Env -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
(RecipientId -> RecipientId -> (RecipientId, RecipientId))
-> m RecipientId -> m RecipientId -> m (RecipientId, RecipientId)
forall (m :: * -> *) a1 a2 r.
Monad m =>
(a1 -> a2 -> r) -> m a1 -> m a2 -> m r
liftM2 (,) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n)
-- Handles the KEY command: stores the sender's public key for the current
-- queue (`queueId`) and replies with OK or ERR.
--
-- The key size is validated first (via `checkKeySize`); the store-log record is
-- written only after `secureQueue` has succeeded, so the log never contains a
-- "secure" record for a request that was rejected (invalid key size or an
-- error returned by the queue store).
secureQueue_ :: QueueStore -> SenderPublicKey -> m Transmission
secureQueue_ st sKey = checkKeySize sKey secure
  where
    -- Applies the change to the in-memory store, then persists it on success.
    secure =
      atomically (secureQueue st queueId sKey) >>= \case
        Left e -> pure $ ERR e
        Right _ -> withLog (\s -> logSecureQueue s queueId sKey) $> OK
-- Runs `action` only when the public key has a valid size; otherwise yields
-- `ERR (CMD KEY_SIZE)` without running it. Either way the resulting broker
-- command is wrapped into a response for the current `corrId` / `queueId`.
checkKeySize :: Monad m' => C.PublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize key action = mkResp corrId queueId <$> result
  where
    result
      | C.validKeySize (C.publicKeySize key) = action
      | otherwise = pure (ERR (CMD KEY_SIZE))
-- Handles the OFF command: suspends the current queue (`queueId`) in the queue
-- store and replies with OK or ERR.
--
-- The store-log record is written only after `suspendQueue` has succeeded, so
-- a rejected request leaves no trace in the log.
--
-- NOTE(review): the record written here is `logDeleteQueue`, i.e. a suspension
-- is persisted as a deletion - confirm this is intended rather than a missing
-- dedicated "suspend" log record.
suspendQueue_ :: QueueStore -> m Transmission
suspendQueue_ st =
  atomically (suspendQueue st queueId) >>= \case
    Left e -> pure $ err e
    Right _ -> withLog (`logDeleteQueue` queueId) $> ok
-- Handles SUB: obtains (or creates) this client's subscription for queue `rId`
-- and then tries to deliver the message at the head of that queue, using the
-- non-destructive `tryPeekMsg`.
subscribeQueue :: RecipientId -> m Transmission
subscribeQueue rId = do
  sub <- atomically $ getSubscription rId
  deliverMessage tryPeekMsg rId sub
-- Returns this client's subscription for queue `rId`.
--
-- * If one already exists, its `delivered` TMVar is emptied (if it was full)
--   and the existing subscription is returned.
-- * Otherwise the pair (`rId`, this client) is written to `subscribedQ`, a new
--   subscription is created, stored in `subscriptions`, and returned.
getSubscription :: RecipientId -> STM Sub
getSubscription rId =
  readTVar subscriptions >>= \subs ->
    maybe (register subs) reuse $ M.lookup rId subs
  where
    -- existing subscription: reset the "delivered" marker
    reuse s = s <$ tryTakeTMVar (delivered s)
    -- no subscription yet: announce it and record a fresh one
    register subs = do
      writeTBQueue subscribedQ (rId, clnt)
      s <- newSubscription
      writeTVar subscriptions $ M.insert rId s subs
      pure s
-- Removes this client's subscription for the current queue (`queueId`) from
-- `subscriptions` and, if there was one, passes it to `cancelSub`.
unsubscribeQueue :: m ()
unsubscribeQueue = atomically removeSub >>= mapM_ cancelSub
  where
    -- looks up and deletes the entry in a single transaction
    removeSub = do
      subs <- readTVar subscriptions
      writeTVar subscriptions $ M.delete queueId subs
      pure $ M.lookup queueId subs
-- Handles ACK for the current queue (`queueId`).
--
-- If the client has a subscription whose `delivered` TMVar is full, the TMVar
-- is emptied and delivery continues via `deliverMessage` with `tryDelPeekMsg`
-- (presumably: drop the acknowledged message and peek the next one - confirm
-- against the message store). In every other case (no subscription, or nothing
-- marked as delivered) the reply is `ERR NO_MSG`.
acknowledgeMsg :: m Transmission
acknowledgeMsg = do
  taken <- atomically . withSub queueId $ \s -> (s <$) <$> tryTakeTMVar (delivered s)
  case taken of
    Just (Just s) -> deliverMessage tryDelPeekMsg queueId s
    _ -> pure $ err NO_MSG
-- Runs `f` on this client's subscription for queue `rId`, if there is one.
-- Returns `Nothing` (without running `f`) when the client is not subscribed.
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub rId f = do
  subs <- readTVar subscriptions
  traverse f (M.lookup rId subs)
-- Handles SEND: looks the queue up by the sender-side id (`queueId`) and, if
-- it exists and is active, appends a new message with body `msgBody`.
-- Replies with OK on success, `ERR AUTH` when the queue is switched off, or
-- the error returned by the queue lookup.
sendMessage :: QueueStore -> MsgBody -> m Transmission
sendMessage st msgBody =
  atomically (getQueue st SSender queueId) >>= \case
    Left e -> pure $ err e
    Right qr -> storeMessage qr
  where
    -- Builds the message: a random id of the configured size (`msgIdBytes`)
    -- and the current time as the timestamp.
    mkMessage :: m Message
    mkMessage = do
      idSize <- asks $ msgIdBytes . config
      msgId <- randomId idSize
      ts <- liftIO getCurrentTime
      pure Message {msgId, ts, msgBody}

    -- Writes the message to the recipient's message queue; the message is
    -- only created when the queue is active.
    storeMessage :: QueueRec -> m Transmission
    storeMessage qr = case status qr of
      QueueActive -> do
        ms <- asks msgStore
        msg <- mkMessage
        atomically $ do
          q <- getMsgQueue ms $ recipientId qr
          ok <$ writeMsg q msg
      QueueOff -> pure $ err AUTH
-- Tries to deliver the next message of queue `rId` to this client.
--
-- Only acts when the subscription has no subscriber thread (`NoSub`); for any
-- other subscription state it just replies OK. With `NoSub`:
--
-- * if `tryPeek` yields a message, the subscription is marked as delivered
--   and the message is returned as the response;
-- * otherwise a subscriber thread is forked to push the message to `sndQ`
--   later, and the response is OK.
deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m Transmission
deliverMessage tryPeek rId sub = case sub of
  Sub {subThread = NoSub} -> do
    q <- asks msgStore >>= \ms -> atomically (getMsgQueue ms rId)
    peeked <- atomically $ tryPeek q
    case peeked of
      Just msg -> mkResp corrId rId (msgCmd msg) <$ atomically setDelivered
      Nothing -> ok <$ forkSub q
  _ -> pure ok
  where
    -- Marks the subscription as pending, forks the subscriber and records its
    -- thread id. The id is stored only if the state is still `SubPending`:
    -- the subscriber may already have finished and reset it to `NoSub`.
    forkSub :: MsgQueue -> m ()
    forkSub q = do
      atomically $ setSub (\s -> s {subThread = SubPending})
      tId <- forkIO (subscriber q)
      atomically . setSub $ \case
        pending@Sub {subThread = SubPending} -> pending {subThread = SubThread tId}
        other -> other

    -- One transaction: take the message returned by `peekMsg` (presumably
    -- blocking until one is available - confirm), queue it for sending with
    -- an empty correlation id, reset the subscription to `NoSub` and mark the
    -- message as delivered.
    subscriber :: MsgQueue -> m ()
    subscriber q = atomically $ do
      msg <- peekMsg q
      let notification = mkResp (CorrId B.empty) rId (msgCmd msg)
      writeTBQueue sndQ notification
      setSub $ \s -> s {subThread = NoSub}
      void setDelivered

    -- Applies `f` to the subscription of `rId`, if present.
    setSub :: (Sub -> Sub) -> STM ()
    setSub f = modifyTVar subscriptions (M.adjust f rId)

    -- Fills the `delivered` TMVar of the subscription of `rId`, if present.
    setDelivered :: STM (Maybe Bool)
    setDelivered = withSub rId (\s -> tryPutTMVar (delivered s) ())
-- Handles DEL: deletes the current queue (`queueId`) from the queue store and,
-- if that succeeds, its message queue from the message store - both in one
-- STM transaction. Replies with OK, or with the error from `deleteQueue`.
--
-- The store-log record is written only after the deletion has succeeded, so a
-- rejected request leaves no trace in the log.
delQueueAndMsgs :: QueueStore -> m Transmission
delQueueAndMsgs st = do
  ms <- asks msgStore
  deleted <-
    atomically $
      deleteQueue st queueId >>= \case
        Left e -> pure $ Left e
        Right _ -> Right <$> delMsgQueue ms queueId
  case deleted of
    Left e -> pure $ err e
    Right _ -> withLog (`logDeleteQueue` queueId) $> ok
-- The OK response for the command being processed (same `corrId` and
-- `queueId` as the request).
ok :: Transmission
ok = mkResp corrId queueId OK
-- The ERR response carrying error `e` for the command being processed (same
-- `corrId` and `queueId` as the request).
err :: ErrorType -> Transmission
err e = mkResp corrId queueId (ERR e)
-- Converts the result of a store operation into a response: `err` for a
-- failure, `ok` for success.
okResp :: Either ErrorType () -> Transmission
okResp = \case
  Left e -> err e
  Right _ -> ok
-- Builds the MSG broker command from a stored message (id, timestamp, body).
msgCmd :: Message -> Command 'Broker
msgCmd = \case
  Message {msgId, ts, msgBody} -> MSG msgId ts msgBody
-- | Runs @action@ on the store log from the environment, if one is present
-- (@storeLog@ is a 'Maybe'); does nothing otherwise. The action's result is
-- discarded.
withLog :: (MonadUnliftIO m, MonadReader Env m) => (StoreLog 'WriteMode -> IO a) -> m ()
withLog action = do
  env <- ask
  -- the annotation selects the @storeLog@ field of 'Env'
  liftIO $ forM_ (storeLog (env :: Env)) action
-- | Generates @n@ random bytes using the generator stored in the
-- environment's @idsDrg@ 'TVar'.
randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded
randomId n = asks idsDrg >>= atomically . randomBytes n
-- | Draws @n@ random bytes from the generator held in @gVar@ and stores the
-- advanced generator back, all within the enclosing STM transaction.
randomBytes :: Int -> TVar ChaChaDRG -> STM ByteString
randomBytes n gVar = stateTVar gVar (randomBytesGenerate n)