{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Server.Env.STM where
import Control.Concurrent (ThreadId)
import Control.Monad.IO.Unlift
import Crypto.Random
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Network.Socket (ServiceName)
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.QueueStore (QueueRec (..))
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Transport (ATransport)
import System.IO (IOMode (..))
import UnliftIO.STM
-- | Static server configuration from which 'newEnv' builds the runtime 'Env'.
data ServerConfig = ServerConfig
  { -- | Service names paired with the transport to use for each.
    -- NOTE(review): presumably the ports the server listens on; not used in
    -- this module, confirm against the caller.
    transports :: [(ServiceName, ATransport)],
    -- | Capacity that 'newEnv' passes to 'newServer' for the server's bounded queue.
    tbqSize :: Natural,
    -- | NOTE(review): presumably the byte length of generated queue IDs;
    -- not used in this module, confirm against the caller.
    queueIdBytes :: Int,
    -- | NOTE(review): presumably the byte length of generated message IDs;
    -- not used in this module, confirm against the caller.
    msgIdBytes :: Int,
    -- | Optional store log opened for reading. When present, 'newEnv' reads it
    -- to populate the queue store and keeps the resulting write-mode log in 'Env'.
    storeLog :: Maybe (StoreLog 'ReadMode),
    -- | Server private key; 'newEnv' derives the public key from it with 'C.publicKey'.
    serverPrivateKey :: C.FullPrivateKey
  }
-- | Runtime environment of the server, assembled by 'newEnv'.
data Env = Env
  { -- | The configuration the environment was created from.
    config :: ServerConfig,
    -- | Server-wide subscription state created with 'newServer'.
    server :: Server,
    -- | Queue store; pre-populated from the store log when the configuration has one.
    queueStore :: QueueStore,
    -- | Message store created with 'newMsgStore'.
    msgStore :: STMMsgStore,
    -- | Shared random generator, seeded with 'drgNew' in 'newEnv'.
    -- NOTE(review): presumably used to generate queue and message IDs; confirm against users.
    idsDrg :: TVar ChaChaDRG,
    -- | Pair of the public key derived from the configured private key and
    -- that private key itself.
    serverKeyPair :: C.FullKeyPair,
    -- | Write-mode store log; 'Nothing' when the configuration had no store log.
    storeLog :: Maybe (StoreLog 'WriteMode)
  }
-- | Server-wide state shared between clients; created by 'newServer'.
data Server = Server
  { -- | Bounded queue of (recipient ID, client) pairs.
    -- NOTE(review): presumably announces new subscriptions to the server
    -- thread; confirm against the code that reads it.
    subscribedQ :: TBQueue (RecipientId, Client),
    -- | Map from recipient ID to a client; starts empty.
    -- NOTE(review): presumably the client currently subscribed to that queue; confirm.
    subscribers :: TVar (Map RecipientId Client)
  }
-- | Per-client state; created by 'newClient'.
data Client = Client
  { -- | This client's subscriptions keyed by recipient ID; starts empty.
    subscriptions :: TVar (Map RecipientId Sub),
    -- | Bounded queue of transmissions.
    -- NOTE(review): presumably transmissions received from the client; confirm.
    rcvQ :: TBQueue Transmission,
    -- | Bounded queue of transmissions.
    -- NOTE(review): presumably transmissions to be sent to the client; confirm.
    sndQ :: TBQueue Transmission
  }
-- | State of the thread associated with a subscription ('Sub').
data SubscriptionThread
  = -- | No thread; the state a subscription made by 'newSubscription' starts in.
    NoSub
  | -- | NOTE(review): presumably a thread is about to be started; confirm against users.
    SubPending
  | -- | A thread exists and is identified by the given 'ThreadId'.
    SubThread ThreadId
-- | A single subscription; created by 'newSubscription'.
data Sub = Sub
  { -- | State of the thread attached to this subscription; starts as 'NoSub'.
    subThread :: SubscriptionThread,
    -- | Starts empty.
    -- NOTE(review): presumably filled while a delivered message awaits
    -- acknowledgement; confirm against the code that takes/puts it.
    delivered :: TMVar ()
  }
-- | Create the server-wide state: a bounded 'subscribedQ' with capacity
-- @qSize@ and an empty 'subscribers' map.
newServer :: Natural -> STM Server
newServer qSize = do
  subscribedQ <- newTBQueue qSize
  subscribers <- newTVar M.empty
  -- fields are filled from the same-named locals (NamedFieldPuns)
  pure Server {subscribedQ, subscribers}
-- | Create per-client state: an empty 'subscriptions' map and two bounded
-- transmission queues ('rcvQ' and 'sndQ'), each with capacity @qSize@.
newClient :: Natural -> STM Client
newClient qSize = do
  subscriptions <- newTVar M.empty
  rcvQ <- newTBQueue qSize
  sndQ <- newTBQueue qSize
  -- fields are filled from the same-named locals (NamedFieldPuns)
  pure Client {subscriptions, rcvQ, sndQ}
-- | Create a subscription with no thread attached ('NoSub') and an empty
-- 'delivered' variable.
newSubscription :: STM Sub
newSubscription = do
  delivered <- newEmptyTMVar
  pure Sub {subThread = NoSub, delivered}
-- | Build the server environment from its configuration.
--
-- Creates the server state, an empty queue store and message store, and a
-- freshly seeded random generator. If the configuration carries a read-mode
-- store log, the queues read from it are loaded into the queue store and the
-- resulting write-mode log is kept in the environment; otherwise the
-- environment's 'storeLog' is 'Nothing'. The server key pair is the public
-- key derived from the configured private key together with that private key.
newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
newEnv config = do
  server <- atomically $ newServer (tbqSize config)
  queueStore <- atomically newQueueStore
  msgStore <- atomically newMsgStore
  idsDrg <- drgNew >>= newTVarIO
  -- Both ServerConfig and Env have a 'storeLog' field (DuplicateRecordFields);
  -- the annotation selects the ServerConfig one.
  writeLog <- mapM (restoreQueues queueStore) (storeLog (config :: ServerConfig))
  let pk = serverPrivateKey config
      serverKeyPair = (C.publicKey pk, pk)
  pure Env {config, server, queueStore, msgStore, idsDrg, serverKeyPair, storeLog = writeLog}
  where
    -- Read the queues from the read-mode log, install them in the store
    -- (replacing its 'queues' and rebuilding 'senders' from them), and return
    -- the write-mode log produced by 'readWriteStoreLog'.
    restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode)
    restoreQueues store readLog = do
      (queues, restoredLog) <- liftIO $ readWriteStoreLog readLog
      -- strict variant: do not leave an unevaluated update thunk in the TVar
      atomically . modifyTVar' store $ \d ->
        d {queues, senders = M.foldr' addSender M.empty queues}
      pure restoredLog
    -- Record the sender ID -> recipient ID mapping of one queue.
    addSender :: QueueRec -> Map SenderId RecipientId -> Map SenderId RecipientId
    addSender q = M.insert (senderId q) (recipientId q)