{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} -- | -- Module : Simplex.Messaging.Agent -- Copyright : (c) simplex.chat -- License : AGPL-3 -- -- Maintainer : chat@simplex.chat -- Stability : experimental -- Portability : non-portable -- -- This module defines SMP protocol agent with SQLite persistence. -- -- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md module Simplex.Messaging.Agent ( -- * queue-based SMP agent getAgentClient, runAgentClient, -- * SMP agent functional API AgentClient (..), AgentMonad, AgentErrorMonad, getSMPAgentClient, disconnectAgentClient, -- used in tests withAgentLock, createConnection, joinConnection, allowConnection, acceptContact, rejectContact, subscribeConnection, sendMessage, ackMessage, suspendConnection, deleteConnection, setSMPServers, logConnection, ) where import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple (logInfo, showText) import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader import Crypto.Random (MonadRandom) import Data.Bifunctor (first, second) import Data.ByteString.Char8 (ByteString) import Data.Composition ((.:), (.:.)) import Data.Functor (($>)) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (isJust) import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import Database.SQLite.Simple (SQLError) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore) import Simplex.Messaging.Client (SMPClient (..), SMPServerTransmission) import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (MsgBody) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (bshow, liftError, tryError, unlessM) import Simplex.Messaging.Version import System.Random (randomR) import UnliftIO.Async (async, race_) import qualified UnliftIO.Exception as E import UnliftIO.STM -- | Creates an SMP agent client instance getSMPAgentClient :: (MonadRandom m, MonadUnliftIO m) => AgentConfig -> m AgentClient getSMPAgentClient cfg = newSMPAgentEnv cfg >>= runReaderT runAgent where runAgent = do c <- getAgentClient action <- async $ subscriber c `E.finally` disconnectAgentClient c pure c {smpSubscriber = action} disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m () disconnectAgentClient c = closeAgentClient c >> logConnection c False -- | type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m) -- | Create SMP agent connection (NEW command) createConnection :: AgentErrorMonad m => AgentClient -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c) createConnection c cMode = withAgentEnv c $ newConn c "" cMode -- | Join SMP agent connection (JOIN command) joinConnection :: AgentErrorMonad m => AgentClient -> ConnectionRequestUri c -> ConnInfo -> m ConnId joinConnection c = withAgentEnv c .: joinConn c "" -- | Allow connection to continue after CONF notification (LET command) allowConnection :: AgentErrorMonad m => AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> m () allowConnection c = withAgentEnv c .:. allowConnection' c -- | Accept contact after REQ notification (ACPT command) acceptContact :: AgentErrorMonad m => AgentClient -> ConfirmationId -> ConnInfo -> m ConnId acceptContact c = withAgentEnv c .: acceptContact' c "" -- | Reject contact (RJCT command) rejectContact :: AgentErrorMonad m => AgentClient -> ConnId -> ConfirmationId -> m () rejectContact c = withAgentEnv c .: rejectContact' c -- | Subscribe to receive connection messages (SUB command) subscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () subscribeConnection c = withAgentEnv c . subscribeConnection' c -- | Send message to the connection (SEND command) sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgBody -> m AgentMsgId sendMessage c = withAgentEnv c .: sendMessage' c ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage c = withAgentEnv c .: ackMessage' c -- | Suspend SMP agent connection (OFF command) suspendConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () suspendConnection c = withAgentEnv c . suspendConnection' c -- | Delete SMP agent connection (DEL command) deleteConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () deleteConnection c = withAgentEnv c . deleteConnection' c -- | Change servers to be used for creating new queues setSMPServers :: AgentErrorMonad m => AgentClient -> NonEmpty SMPServer -> m () setSMPServers c = withAgentEnv c . setSMPServers' c withAgentEnv :: AgentClient -> ReaderT Env m a -> m a withAgentEnv c = (`runReaderT` agentEnv c) -- withAgentClient :: AgentErrorMonad m => AgentClient -> ReaderT Env m a -> m a -- withAgentClient c = withAgentLock c . withAgentEnv c -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. getAgentClient :: (MonadUnliftIO m, MonadReader Env m) => m AgentClient getAgentClient = ask >>= atomically . newAgentClient logConnection :: MonadUnliftIO m => AgentClient -> Bool -> m () logConnection c connected = let event = if connected then "connected to" else "disconnected from" in logInfo $ T.unwords ["client", showText (clientId c), event, "Agent"] -- | Runs an SMP agent instance that receives commands and sends responses via 'TBQueue's. runAgentClient :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () runAgentClient c = race_ (subscriber c) (client c) client :: forall m. (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () client c@AgentClient {rcvQ, subQ} = forever $ do (corrId, connId, cmd) <- atomically $ readTBQueue rcvQ withAgentLock c (runExceptT $ processCommand c (connId, cmd)) >>= atomically . writeTBQueue subQ . \case Left e -> (corrId, connId, ERR e) Right (connId', resp) -> (corrId, connId', resp) withStore :: AgentMonad m => (forall m'. (MonadUnliftIO m', MonadError StoreError m') => SQLiteStore -> m' a) -> m a withStore action = do st <- asks store runExceptT (action st `E.catch` handleInternal) >>= \case Right c -> return c Left e -> throwError $ storeError e where -- TODO when parsing exception happens in store, the agent hangs; -- changing SQLError to SomeException does not help handleInternal :: (MonadError StoreError m') => SQLError -> m' a handleInternal e = throwError . SEInternal $ bshow e storeError :: StoreError -> AgentErrorType storeError = \case SEConnNotFound -> CONN NOT_FOUND SEConnDuplicate -> CONN DUPLICATE SEBadConnType CRcv -> CONN SIMPLEX SEBadConnType CSnd -> CONN SIMPLEX SEInvitationNotFound -> CMD PROHIBITED e -> INTERNAL $ show e -- | execute any SMP agent command processCommand :: forall m. AgentMonad m => AgentClient -> (ConnId, ACommand 'Client) -> m (ConnId, ACommand 'Agent) processCommand c (connId, cmd) = case cmd of NEW (ACM cMode) -> second (INV . ACR cMode) <$> newConn c connId cMode JOIN (ACR _ cReq) connInfo -> (,OK) <$> joinConn c connId cReq connInfo LET confId ownCInfo -> allowConnection' c connId confId ownCInfo $> (connId, OK) ACPT invId ownCInfo -> (,OK) <$> acceptContact' c connId invId ownCInfo RJCT invId -> rejectContact' c connId invId $> (connId, OK) SUB -> subscribeConnection' c connId $> (connId, OK) SEND msgBody -> (connId,) . MID <$> sendMessage' c connId msgBody ACK msgId -> ackMessage' c connId msgId $> (connId, OK) OFF -> suspendConnection' c connId $> (connId, OK) DEL -> deleteConnection' c connId $> (connId, OK) newConn :: AgentMonad m => AgentClient -> ConnId -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c) newConn c connId cMode = do srv <- getSMPServer c (rq, qUri) <- newRcvQueue c srv g <- asks idsDrg let cData = ConnData {connId} connId' <- withStore $ \st -> createRcvConn st g cData rq cMode addSubscription c rq connId' let crData = ConnReqUriData simplexChat smpAgentVRange [qUri] case cMode of SCMContact -> pure (connId', CRContactUri crData) SCMInvitation -> do (pk1, pk2, e2eRcvParams) <- liftIO $ CR.generateE2EParams CR.e2eEncryptVersion withStore $ \st -> createRatchetX3dhKeys st connId' pk1 pk2 pure (connId', CRInvitationUri crData $ toVersionRangeT e2eRcvParams CR.e2eEncryptVRange) joinConn :: AgentMonad m => AgentClient -> ConnId -> ConnectionRequestUri c -> ConnInfo -> m ConnId joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2eRcvParamsUri) cInfo = case ( qUri `compatibleVersion` SMP.smpClientVRange, e2eRcvParamsUri `compatibleVersion` CR.e2eEncryptVRange, agentVRange `compatibleVersion` smpAgentVRange ) of (Just qInfo, Just (Compatible e2eRcvParams@(CR.E2ERatchetParams _ _ rcDHRr)), Just _) -> do -- TODO in agent v2 - use found compatible version rather than current (pk1, pk2, e2eSndParams) <- liftIO . CR.generateE2EParams $ version e2eRcvParams (_, rcDHRs) <- liftIO C.generateKeyPair' let rc = CR.initSndRatchet rcDHRr rcDHRs $ CR.x3dhSnd pk1 pk2 e2eRcvParams sq <- newSndQueue qInfo g <- asks idsDrg let cData = ConnData {connId} connId' <- withStore $ \st -> do connId' <- createSndConn st g cData sq createRatchet st connId' rc pure connId' tryError (confirmQueue c connId' sq cInfo $ Just e2eSndParams) >>= \case Right _ -> do void $ enqueueMessage c connId' sq HELLO pure connId' Left e -> do -- TODO recovery for failure on network timeout, see rfcs/2022-04-20-smp-conf-timeout-recovery.md withStore (`deleteConn` connId') throwError e _ -> throwError $ AGENT A_VERSION joinConn c connId (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInfo = case ( qUri `compatibleVersion` SMP.smpClientVRange, agentVRange `compatibleVersion` smpAgentVRange ) of (Just qInfo, Just _) -> do -- TODO in agent v2 - use found compatible version rather than current (connId', cReq) <- newConn c connId SCMInvitation sendInvitation c qInfo cReq cInfo pure connId' _ -> throwError $ AGENT A_VERSION createReplyQueue :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () createReplyQueue c connId sq = do srv <- getSMPServer c (rq, qUri) <- newRcvQueue c srv -- TODO reply queue version should be the same as send queue, ignoring it in v1 let qInfo = toVersionT qUri SMP.smpClientVersion addSubscription c rq connId withStore $ \st -> upgradeSndConnToDuplex st connId rq void . enqueueMessage c connId sq $ REPLY [qInfo] -- | Approve confirmation (LET command) in Reader monad allowConnection' :: AgentMonad m => AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> m () allowConnection' c connId confId ownConnInfo = do withStore (`getConn` connId) >>= \case SomeConn _ (RcvConnection _ rq) -> do AcceptedConfirmation {senderConf, ratchetState} <- withStore $ \st -> acceptConfirmation st confId ownConnInfo withStore $ \st -> createRatchet st connId ratchetState processConfirmation c rq senderConf _ -> throwError $ CMD PROHIBITED -- | Accept contact (ACPT command) in Reader monad acceptContact' :: AgentMonad m => AgentClient -> ConnId -> InvitationId -> ConnInfo -> m ConnId acceptContact' c connId invId ownConnInfo = do Invitation {contactConnId, connReq} <- withStore (`getInvitation` invId) withStore (`getConn` contactConnId) >>= \case SomeConn _ ContactConnection {} -> do withStore $ \st -> acceptInvitation st invId ownConnInfo joinConn c connId connReq ownConnInfo _ -> throwError $ CMD PROHIBITED -- | Reject contact (RJCT command) in Reader monad rejectContact' :: AgentMonad m => AgentClient -> ConnId -> InvitationId -> m () rejectContact' _ contactConnId invId = withStore $ \st -> deleteInvitation st contactConnId invId processConfirmation :: AgentMonad m => AgentClient -> RcvQueue -> SMPConfirmation -> m () processConfirmation c rq@RcvQueue {e2ePrivKey} SMPConfirmation {senderKey, e2ePubKey} = do let dhSecret = C.dh' e2ePubKey e2ePrivKey withStore $ \st -> setRcvQueueConfirmedE2E st rq dhSecret secureQueue c rq senderKey withStore $ \st -> setRcvQueueStatus st rq Secured -- | Subscribe to receive connection messages (SUB command) in Reader monad subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () subscribeConnection' c connId = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ rq sq) -> do resumeMsgDelivery c connId sq subscribeQueue c rq connId SomeConn _ (SndConnection _ sq) -> do resumeMsgDelivery c connId sq case status (sq :: SndQueue) of Confirmed -> pure () Active -> throwError $ CONN SIMPLEX _ -> throwError $ INTERNAL "unexpected queue status" SomeConn _ (RcvConnection _ rq) -> subscribeQueue c rq connId SomeConn _ (ContactConnection _ rq) -> subscribeQueue c rq connId -- | Send message to the connection (SEND command) in Reader monad sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgBody -> m AgentMsgId sendMessage' c connId msg = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ _ sq) -> enqueueMsg sq SomeConn _ (SndConnection _ sq) -> enqueueMsg sq _ -> throwError $ CONN SIMPLEX where enqueueMsg :: SndQueue -> m AgentMsgId enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId enqueueMessage c connId sq aMessage = do resumeMsgDelivery c connId sq msgId <- storeSentMsg queuePendingMsgs c connId sq [msgId] pure $ unId msgId where storeSentMsg :: m InternalId storeSentMsg = do internalTs <- liftIO getCurrentTime (internalId, internalSndId, prevMsgHash) <- withStore (`updateSndIds` connId) let privHeader = APrivHeader (unSndId internalSndId) prevMsgHash agentMsg = AgentMessage privHeader aMessage agentMsgStr = smpEncode agentMsg internalHash = C.sha256Hash agentMsgStr encAgentMessage <- agentRatchetEncrypt connId agentMsgStr e2eEncUserMsgLength let msgBody = smpEncode $ AgentMsgEnvelope {agentVersion = smpAgentVersion, encAgentMessage} msgType = agentMessageType agentMsg msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, internalHash, prevMsgHash} withStore $ \st -> createSndMsg st connId msgData pure internalId resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do let qKey = (connId, server, sndId) unlessM (queueDelivering qKey) $ async (runSmpQueueMsgDelivery c connId sq) >>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c) unlessM connQueued $ withStore (`getPendingMsgs` connId) >>= queuePendingMsgs c connId sq where queueDelivering qKey = atomically $ isJust <$> TM.lookup qKey (smpQueueMsgDeliveries c) connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connMsgsQueued c) queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m () queuePendingMsgs c connId sq msgIds = atomically $ do q <- getPendingMsgQ c connId sq mapM_ (writeTQueue q) msgIds getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId) getPendingMsgQ c connId SndQueue {server, sndId} = do let qKey = (connId, server, sndId) maybe (newMsgQueue qKey) pure =<< TM.lookup qKey (smpQueueMsgQueues c) where newMsgQueue qKey = do mq <- newTQueue TM.insert qKey mq $ smpQueueMsgQueues c pure mq runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do mq <- atomically $ getPendingMsgQ c connId sq ri <- asks $ reconnectInterval . config forever $ do msgId <- atomically $ readTQueue mq let mId = unId msgId withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> notify $ MERR mId (INTERNAL $ show e) Right (rq_, (msgType, msgBody, internalTs)) -> withRetryInterval ri $ \loop -> tryError (send msgType c sq msgBody) >>= \case Left e -> do let err = if msgType == AM_CONN_INFO then ERR e else MERR mId e case e of SMP SMP.QUOTA -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE _ -> loop SMP SMP.AUTH -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE AM_HELLO_ -> do helloTimeout <- asks $ helloTimeout . config currentTime <- liftIO getCurrentTime if diffUTCTime currentTime internalTs > helloTimeout then case rq_ of -- party initiating connection Just _ -> connError msgId NOT_AVAILABLE -- party joining connection _ -> connError msgId NOT_ACCEPTED else loop AM_REPLY_ -> notifyDel msgId $ ERR e AM_A_MSG_ -> notifyDel msgId $ MERR mId e SMP (SMP.CMD _) -> notifyDel msgId err SMP SMP.LARGE_MSG -> notifyDel msgId err SMP {} -> notify err >> loop _ -> loop Right () -> do case msgType of AM_CONN_INFO -> do withStore $ \st -> setSndQueueStatus st sq Confirmed when (isJust rq_) $ withStore (`removeConfirmations` connId) void $ enqueueMessage c connId sq HELLO AM_HELLO_ -> do withStore $ \st -> setSndQueueStatus st sq Active case rq_ of -- party initiating connection Just rq -> do subscribeQueue c rq connId notify CON -- party joining connection _ -> createReplyQueue c connId sq AM_A_MSG_ -> notify $ SENT mId _ -> pure () delMsg msgId where send = \case AM_CONN_INFO -> sendConfirmation _ -> sendAgentMessage delMsg :: InternalId -> m () delMsg msgId = withStore $ \st -> deleteMsg st connId msgId notify :: ACommand 'Agent -> m () notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd) notifyDel :: InternalId -> ACommand 'Agent -> m () notifyDel msgId cmd = notify cmd >> delMsg msgId connError msgId = notifyDel msgId . ERR . CONN ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage' c connId msgId = do withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ rq _) -> ack rq SomeConn _ (RcvConnection _ rq) -> ack rq _ -> throwError $ CONN SIMPLEX where ack :: RcvQueue -> m () ack rq = do let mId = InternalId msgId withStore $ \st -> checkRcvMsg st connId mId sendAck c rq withStore $ \st -> deleteMsg st connId mId -- | Suspend SMP agent connection (OFF command) in Reader monad suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m () suspendConnection' c connId = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ rq _) -> suspendQueue c rq SomeConn _ (RcvConnection _ rq) -> suspendQueue c rq _ -> throwError $ CONN SIMPLEX -- | Delete SMP agent connection (DEL command) in Reader monad deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () deleteConnection' c connId = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ rq _) -> delete rq SomeConn _ (RcvConnection _ rq) -> delete rq SomeConn _ (ContactConnection _ rq) -> delete rq SomeConn _ (SndConnection _ _) -> withStore (`deleteConn` connId) where delete :: RcvQueue -> m () delete rq = do deleteQueue c rq atomically $ removeSubscription c connId withStore (`deleteConn` connId) -- | Change servers to be used for creating new queues, in Reader monad setSMPServers' :: forall m. AgentMonad m => AgentClient -> NonEmpty SMPServer -> m () setSMPServers' c servers = do atomically $ writeTVar (smpServers c) servers getSMPServer :: AgentMonad m => AgentClient -> m SMPServer getSMPServer c = do smpServers <- readTVarIO $ smpServers c case smpServers of srv :| [] -> pure srv servers -> do gen <- asks randomServer i <- atomically . stateTVar gen $ randomR (0, L.length servers - 1) pure $ servers L.!! i subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () subscriber c@AgentClient {msgQ} = forever $ do t <- atomically $ readTBQueue msgQ withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case Left e -> liftIO $ print e Right _ -> return () processSMPTransmission :: forall m. AgentMonad m => AgentClient -> SMPServerTransmission -> m () processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) = do withStore (\st -> getRcvConn st srv rId) >>= \case SomeConn SCDuplex (DuplexConnection cData rq _) -> processSMP SCDuplex cData rq SomeConn SCRcv (RcvConnection cData rq) -> processSMP SCRcv cData rq SomeConn SCContact (ContactConnection cData rq) -> processSMP SCContact cData rq _ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND) where processSMP :: SConnType c -> ConnData -> RcvQueue -> m () processSMP cType ConnData {connId} rq@RcvQueue {rcvDhSecret, e2ePrivKey, e2eDhSecret, status} = case cmd of SMP.MSG srvMsgId srvTs msgBody' -> handleNotifyAck $ do -- TODO deduplicate with previously received msgBody <- agentCbDecrypt rcvDhSecret (C.cbNonce srvMsgId) msgBody' clientMsg@SMP.ClientMsgEnvelope {cmHeader = SMP.PubHeader phVer e2ePubKey_} <- parseMessage msgBody unless (phVer `isCompatible` SMP.smpClientVRange) . throwError $ AGENT A_VERSION case (e2eDhSecret, e2ePubKey_) of (Nothing, Just e2ePubKey) -> do let e2eDh = C.dh' e2ePubKey e2ePrivKey decryptClientMessage e2eDh clientMsg >>= \case (SMP.PHConfirmation senderKey, AgentConfirmation {e2eEncryption, encConnInfo}) -> smpConfirmation senderKey e2ePubKey e2eEncryption encConnInfo >> ack (SMP.PHEmpty, AgentInvitation {connReq, connInfo}) -> smpInvitation connReq connInfo >> ack _ -> prohibited >> ack (Just e2eDh, Nothing) -> do decryptClientMessage e2eDh clientMsg >>= \case (SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> do agentMsgBody <- agentRatchetDecrypt connId encAgentMsg parseMessage agentMsgBody >>= \case agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do let msgType = agentMessageType agentMsg (msgId, msgMeta) <- agentClientMsg prevMsgHash sndMsgId (srvMsgId, systemToUTCTime srvTs) agentMsgBody msgType case aMessage of HELLO -> helloMsg >> ack >> withStore (\st -> deleteMsg st connId msgId) REPLY cReq -> replyMsg cReq >> ack >> withStore (\st -> deleteMsg st connId msgId) -- note that there is no ACK sent here, it is sent with agent's user ACK command A_MSG body -> notify $ MSG msgMeta body _ -> prohibited >> ack _ -> prohibited >> ack _ -> prohibited >> ack SMP.END -> atomically (TM.lookup srv smpClients >>= fmap join . mapM tryReadTMVar >>= processEND) >>= logServer "<--" c srv rId where processEND = \case Just (Right clnt) | sessId == sessionId clnt -> do removeSubscription c connId writeTBQueue subQ ("", connId, END) pure "END" | otherwise -> ignored _ -> ignored ignored = pure "END from disconnected client - ignored" _ -> do logServer "<--" c srv rId $ "unexpected: " <> bshow cmd notify . ERR $ BROKER UNEXPECTED where notify :: ACommand 'Agent -> m () notify msg = atomically $ writeTBQueue subQ ("", connId, msg) handleNotifyAck :: m () -> m () handleNotifyAck m = m `catchError` \e -> notify (ERR e) >> ack prohibited :: m () prohibited = notify . ERR $ AGENT A_PROHIBITED ack :: m () ack = sendAck c rq decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> m (SMP.PrivHeader, AgentMsgEnvelope) decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody SMP.ClientMessage privHeader clientBody <- parseMessage clientMsg agentEnvelope <- parseMessage clientBody if agentVersion agentEnvelope `isCompatible` smpAgentVRange then pure (privHeader, agentEnvelope) else throwError $ AGENT A_VERSION parseMessage :: Encoding a => ByteString -> m a parseMessage = liftEither . parse smpP (AGENT A_MESSAGE) smpConfirmation :: C.APublicVerifyKey -> C.PublicKeyX25519 -> Maybe (CR.E2ERatchetParams 'C.X448) -> ByteString -> m () smpConfirmation senderKey e2ePubKey e2eEncryption encConnInfo = do logServer "<--" c srv rId "MSG " case status of New -> case (cType, e2eEncryption) of (SCRcv, Just e2eSndParams) -> do (pk1, rcDHRs) <- withStore $ \st -> getRatchetX3dhKeys st connId let rc = CR.initRcvRatchet rcDHRs $ CR.x3dhRcv pk1 rcDHRs e2eSndParams (agentMsgBody_, rc', skipped) <- liftError cryptoError $ CR.rcDecrypt rc M.empty encConnInfo case (agentMsgBody_, skipped) of (Right agentMsgBody, CR.SMDNoChange) -> parseMessage agentMsgBody >>= \case AgentConnInfo connInfo -> do g <- asks idsDrg let senderConf = SMPConfirmation {senderKey, e2ePubKey, connInfo} newConfirmation = NewConfirmation {connId, senderConf, ratchetState = rc'} confId <- withStore $ \st -> createConfirmation st g newConfirmation notify $ CONF confId connInfo _ -> prohibited _ -> prohibited (SCDuplex, Nothing) -> do agentRatchetDecrypt connId encConnInfo >>= parseMessage >>= \case AgentConnInfo connInfo -> do notify $ INFO connInfo processConfirmation c rq $ SMPConfirmation {senderKey, e2ePubKey, connInfo} _ -> prohibited _ -> prohibited _ -> prohibited helloMsg :: m () helloMsg = do logServer "<--" c srv rId "MSG " case status of Active -> prohibited _ -> do withStore $ \st -> setRcvQueueStatus st rq Active case cType of SCDuplex -> notifyConnected c connId _ -> pure () replyMsg :: L.NonEmpty SMPQueueInfo -> m () replyMsg (qInfo :| _) = do logServer "<--" c srv rId "MSG " case cType of SCRcv -> do AcceptedConfirmation {ownConnInfo} <- withStore (`getAcceptedConfirmation` connId) case qInfo `proveCompatible` SMP.smpClientVRange of Nothing -> notify . ERR $ AGENT A_VERSION Just qInfo' -> do sq <- newSndQueue qInfo' withStore $ \st -> upgradeRcvConnToDuplex st connId sq enqueueConfirmation c connId sq ownConnInfo Nothing _ -> prohibited agentClientMsg :: PrevRcvMsgHash -> ExternalSndId -> (BrokerId, BrokerTs) -> MsgBody -> AgentMessageType -> m (InternalId, MsgMeta) agentClientMsg externalPrevSndHash sndMsgId broker msgBody msgType = do logServer "<--" c srv rId "MSG " let internalHash = C.sha256Hash msgBody internalTs <- liftIO getCurrentTime (internalId, internalRcvId, prevExtSndId, prevRcvMsgHash) <- withStore (`updateRcvIds` connId) let integrity = checkMsgIntegrity prevExtSndId sndMsgId prevRcvMsgHash externalPrevSndHash recipient = (unId internalId, internalTs) msgMeta = MsgMeta {integrity, recipient, broker, sndMsgId} rcvMsg = RcvMsgData {msgMeta, msgType, msgBody, internalRcvId, internalHash, externalPrevSndHash} withStore $ \st -> createRcvMsg st connId rcvMsg pure (internalId, msgMeta) smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () smpInvitation connReq cInfo = do logServer "<--" c srv rId "MSG " case cType of SCContact -> do g <- asks idsDrg let newInv = NewInvitation {contactConnId = connId, connReq, recipientConnInfo = cInfo} invId <- withStore $ \st -> createInvitation st g newInv notify $ REQ invId cInfo _ -> prohibited checkMsgIntegrity :: PrevExternalSndId -> ExternalSndId -> PrevRcvMsgHash -> ByteString -> MsgIntegrity checkMsgIntegrity prevExtSndId extSndId internalPrevMsgHash receivedPrevMsgHash | extSndId == prevExtSndId + 1 && internalPrevMsgHash == receivedPrevMsgHash = MsgOk | extSndId < prevExtSndId = MsgError $ MsgBadId extSndId | extSndId == prevExtSndId = MsgError MsgDuplicate -- ? deduplicate | extSndId > prevExtSndId + 1 = MsgError $ MsgSkipped (prevExtSndId + 1) (extSndId - 1) | internalPrevMsgHash /= receivedPrevMsgHash = MsgError MsgBadHash | otherwise = MsgError MsgDuplicate -- this case is not possible confirmQueue :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m () confirmQueue c connId sq connInfo e2eEncryption = do _ <- withStore (`updateSndIds` connId) msg <- mkConfirmation sendConfirmation c sq msg withStore $ \st -> setSndQueueStatus st sq Confirmed where mkConfirmation :: m MsgBody mkConfirmation = do encConnInfo <- agentRatchetEncrypt connId (smpEncode $ AgentConnInfo connInfo) e2eEncConnInfoLength pure . smpEncode $ AgentConfirmation {agentVersion = smpAgentVersion, e2eEncryption, encConnInfo} enqueueConfirmation :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m () enqueueConfirmation c connId sq connInfo e2eEncryption = do resumeMsgDelivery c connId sq msgId <- storeConfirmation queuePendingMsgs c connId sq [msgId] where storeConfirmation :: m InternalId storeConfirmation = do internalTs <- liftIO getCurrentTime (internalId, internalSndId, prevMsgHash) <- withStore (`updateSndIds` connId) let agentMsg = AgentConnInfo connInfo agentMsgStr = smpEncode agentMsg internalHash = C.sha256Hash agentMsgStr encConnInfo <- agentRatchetEncrypt connId agentMsgStr e2eEncConnInfoLength let msgBody = smpEncode $ AgentConfirmation {agentVersion = smpAgentVersion, e2eEncryption, encConnInfo} msgType = agentMessageType agentMsg msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, internalHash, prevMsgHash} withStore $ \st -> createSndMsg st connId msgData pure internalId -- encoded AgentMessage -> encoded EncAgentMessage agentRatchetEncrypt :: AgentMonad m => ConnId -> ByteString -> Int -> m ByteString agentRatchetEncrypt connId msg paddedLen = do rc <- withStore $ \st -> getRatchet st connId (encMsg, rc') <- liftError cryptoError $ CR.rcEncrypt rc paddedLen msg withStore $ \st -> updateRatchet st connId rc' CR.SMDNoChange pure encMsg -- encoded EncAgentMessage -> encoded AgentMessage agentRatchetDecrypt :: AgentMonad m => ConnId -> ByteString -> m ByteString agentRatchetDecrypt connId encAgentMsg = do (rc, skipped) <- withStore $ \st -> (,) <$> getRatchet st connId <*> getSkippedMsgKeys st connId (agentMsgBody_, rc', skippedDiff) <- liftError cryptoError $ CR.rcDecrypt rc skipped encAgentMsg withStore $ \st -> updateRatchet st connId rc' skippedDiff liftEither $ first cryptoError agentMsgBody_ notifyConnected :: AgentMonad m => AgentClient -> ConnId -> m () notifyConnected c connId = atomically $ writeTBQueue (subQ c) ("", connId, CON) newSndQueue :: (MonadUnliftIO m, MonadReader Env m) => Compatible SMPQueueInfo -> m SndQueue newSndQueue qInfo = asks (cmdSignAlg . config) >>= \case C.SignAlg a -> newSndQueue_ a qInfo newSndQueue_ :: (C.SignatureAlgorithm a, C.AlgorithmI a, MonadUnliftIO m) => C.SAlgorithm a -> Compatible SMPQueueInfo -> m SndQueue newSndQueue_ a (Compatible (SMPQueueInfo _clientVersion smpServer senderId rcvE2ePubDhKey)) = do -- this function assumes clientVersion is compatible - it was tested before (sndPublicKey, sndPrivateKey) <- liftIO $ C.generateSignatureKeyPair a (e2ePubKey, e2ePrivKey) <- liftIO C.generateKeyPair' pure SndQueue { server = smpServer, sndId = senderId, sndPublicKey = Just sndPublicKey, sndPrivateKey, e2eDhSecret = C.dh' rcvE2ePubDhKey e2ePrivKey, e2ePubKey = Just e2ePubKey, status = New }