module Erebos.Network.Protocol ( TransportPacket(..), transportToObject, TransportHeader(..), TransportHeaderItem(..), WaitingRef(..), WaitingRefCallback, wrDigest, ChannelState(..), ControlRequest(..), ControlMessage(..), erebosNetworkProtocol, Connection, connAddress, connData, connGetChannel, connSetChannel, module Erebos.Flow, ) where import Control.Applicative import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Except import Data.Bits import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.List import Data.Maybe import Data.Text (Text) import Data.Text qualified as T import System.Clock import Erebos.Channel import Erebos.Flow import Erebos.Identity import Erebos.Service import Erebos.Storage protocolVersion :: Text protocolVersion = T.pack "0.1" protocolVersions :: [Text] protocolVersions = [protocolVersion] data TransportPacket a = TransportPacket TransportHeader [a] data TransportHeader = TransportHeader [TransportHeaderItem] deriving (Show) data TransportHeaderItem = Acknowledged RefDigest | AcknowledgedSingle Integer | Rejected RefDigest | ProtocolVersion Text | Initiation RefDigest | CookieSet Cookie | CookieEcho Cookie | DataRequest RefDigest | DataResponse RefDigest | AnnounceSelf RefDigest | AnnounceUpdate RefDigest | TrChannelRequest RefDigest | TrChannelAccept RefDigest | ServiceType ServiceID | ServiceRef RefDigest deriving (Eq, Show) newtype Cookie = Cookie ByteString deriving (Eq, Show) isHeaderItemAcknowledged :: TransportHeaderItem -> Bool isHeaderItemAcknowledged = \case Acknowledged {} -> False AcknowledgedSingle {} -> False Rejected {} -> False ProtocolVersion {} -> False Initiation {} -> False CookieSet {} -> False CookieEcho {} -> False _ -> True transportToObject :: PartialStorage -> TransportHeader -> PartialObject transportToObject st (TransportHeader items) = Rec $ map single items where single = \case Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) AcknowledgedSingle num -> (BC.pack "ACK", RecInt num) Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst) ProtocolVersion ver -> (BC.pack "VER", RecText ver) Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst) CookieSet (Cookie bytes) -> (BC.pack "CKS", RecBinary bytes) CookieEcho (Cookie bytes) -> (BC.pack "CKE", RecBinary bytes) DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst) DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst) AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst) AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst) TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst) TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst) ServiceType stype -> (BC.pack "SVT", RecUUID $ toUUID stype) ServiceRef dgst -> (BC.pack "SVR", RecRef $ partialRefFromDigest st dgst) transportFromObject :: PartialObject -> Maybe TransportHeader transportFromObject (Rec items) = case catMaybes $ map single items of [] -> Nothing titems -> Just $ TransportHeader titems where single (name, content) = if | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref | name == BC.pack "CKS", RecBinary bytes <- content -> Just $ CookieSet (Cookie bytes) | name == BC.pack "CKE", RecBinary bytes <- content -> Just $ CookieEcho (Cookie bytes) | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref | name == BC.pack "SVT", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid | name == BC.pack "SVR", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref | otherwise -> Nothing transportFromObject _ = Nothing data GlobalState addr = (Eq addr, Show addr) => GlobalState { gIdentity :: TVar (UnifiedIdentity, [UnifiedIdentity]) , gConnections :: TVar [Connection addr] , gDataFlow :: SymFlow (addr, ByteString) , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () , gStorage :: PartialStorage , gNowVar :: TVar TimeSpec , gNextTimeout :: TVar TimeSpec , gInitConfig :: Ref } data Connection addr = Connection { cAddress :: addr , cDataUp :: Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject) , cChannel :: TVar ChannelState , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem]) , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] } connAddress :: Connection addr -> addr connAddress = cAddress connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem]) connData = cDataUp connGetChannel :: Connection addr -> STM ChannelState connGetChannel Connection {..} = readTVar cChannel connSetChannel :: Connection addr -> ChannelState -> STM () connSetChannel Connection {..} ch = do writeTVar cChannel ch data WaitingRef = WaitingRef { wrefStorage :: Storage , wrefPartial :: PartialRef , wrefAction :: Ref -> WaitingRefCallback , wrefStatus :: TVar (Either [RefDigest] Ref) } type WaitingRefCallback = ExceptT String IO () wrDigest :: WaitingRef -> RefDigest wrDigest = refDigest . wrefPartial data ChannelState = ChannelNone | ChannelCookieWait -- sent initiation, waiting for response | ChannelCookieReceived Cookie -- received cookie, but no cookie echo yet | ChannelCookieConfirmed Cookie -- received cookie echo, no need to send from our side | ChannelOurRequest (Maybe Cookie) (Stored ChannelRequest) | ChannelPeerRequest (Maybe Cookie) WaitingRef | ChannelOurAccept (Maybe Cookie) (Stored ChannelAccept) Channel | ChannelEstablished Channel data SentPacket = SentPacket { spTime :: TimeSpec , spRetryCount :: Int , spAckedBy :: Maybe (TransportHeaderItem -> Bool) , spData :: BC.ByteString } data ControlRequest addr = RequestConnection addr | SendAnnounce addr | UpdateSelfIdentity UnifiedIdentity data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) | ReceivedAnnounce addr RefDigest erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) => UnifiedIdentity -> (String -> STM ()) -> SymFlow (addr, ByteString) -> Flow (ControlRequest addr) (ControlMessage addr) -> IO () erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do gIdentity <- newTVarIO (initialIdentity, []) gConnections <- newTVarIO [] gNextUp <- newEmptyTMVarIO mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage startTime <- getTime MonotonicRaw gNowVar <- newTVarIO startTime gNextTimeout <- newTVarIO startTime gInitConfig <- store mStorage $ (Rec [] :: Object) let gs = GlobalState {..} let signalTimeouts = forever $ do now <- getTime MonotonicRaw next <- atomically $ do writeTVar gNowVar now readTVar gNextTimeout let waitTill time | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 | otherwise = threadDelay maxBound waitForUpdate = atomically $ do next' <- readTVar gNextTimeout when (next' == next) retry race_ (waitTill next) waitForUpdate race_ signalTimeouts $ forever $ join $ atomically $ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs getConnection :: GlobalState addr -> addr -> STM (Connection addr) getConnection gs addr = do maybe (newConnection gs addr) return =<< findConnection gs addr findConnection :: GlobalState addr -> addr -> STM (Maybe (Connection addr)) findConnection GlobalState {..} addr = do find ((addr==) . cAddress) <$> readTVar gConnections newConnection :: GlobalState addr -> addr -> STM (Connection addr) newConnection GlobalState {..} addr = do conns <- readTVar gConnections let cAddress = addr (cDataUp, cDataInternal) <- newFlow cChannel <- newTVar ChannelNone cSecureOutQueue <- newTQueue cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] let conn = Connection {..} writeTVar gConnections (conn : conns) return conn passUpIncoming :: GlobalState addr -> STM (IO ()) passUpIncoming GlobalState {..} = do (Connection {..}, up) <- takeTMVar gNextUp writeFlow cDataInternal up return $ return () processIncoming :: GlobalState addr -> STM (IO ()) processIncoming gs@GlobalState {..} = do guard =<< isEmptyTMVar gNextUp guard =<< canWriteFlow gControlFlow (addr, msg) <- readFlow gDataFlow mbconn <- findConnection gs addr mbch <- case mbconn of Nothing -> return Nothing Just conn -> readTVar (cChannel conn) >>= return . \case ChannelEstablished ch -> Just ch ChannelOurAccept _ _ ch -> Just ch _ -> Nothing return $ do let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict let parse = case B.uncons msg of Just (b, enc) | b .&. 0xE0 == 0x80 -> do ch <- maybe (throwError "unexpected encrypted packet") return mbch (dec, counter) <- channelDecrypt ch enc case B.uncons dec of Just (0x00, content) -> do objs <- deserialize content return (True, objs, Just counter) Just (_, _) -> do throwError "streams not implemented" Nothing -> do throwError "empty decrypted content" | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg return (False, objs, Nothing) | otherwise -> throwError "invalid packet" Nothing -> throwError "empty packet" runExceptT parse >>= \case Right (secure, objs, mbcounter) | hobj:content <- objs , Just header@(TransportHeader items) <- transportFromObject hobj -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case Just (conn@Connection {..}, mbup) -> atomically $ do case mbcounter of Just counter | any isHeaderItemAcknowledged items -> modifyTVar' cToAcknowledge (fromIntegral counter :) _ -> return () processAcknowledgements gs conn items case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () Nothing -> return () | otherwise -> atomically $ do gLog $ show addr ++ ": invalid objects" gLog $ show objs Left err -> do atomically $ gLog $ show addr <> ": failed to parse packet: " <> err processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if -- Established secure communication | Right conn <- econn, secure -> return $ Just (conn, Just packet) -- Plaintext communication with cookies to prove origin | cookie:_ <- mapMaybe (\case CookieEcho x -> Just x; _ -> Nothing) header -> verifyCookie gs addr cookie >>= \case True -> do atomically $ do conn@Connection {..} <- getConnection gs addr channel <- readTVar cChannel let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header case received `mplus` channelCurrentCookie channel of Just current -> do cookieEchoReceived gs conn mbpid current return $ Just (conn, Just packet) Nothing -> do gLog $ show addr <> ": missing cookie set, dropping " <> show header return $ Nothing False -> do atomically $ gLog $ show addr <> ": cookie verification failed, dropping " <> show header return Nothing -- Response to initiation packet | cookie:_ <- mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header , Just _ <- version , Right conn@Connection {..} <- econn -> do atomically $ readTVar cChannel >>= \case ChannelCookieWait -> do writeTVar cChannel $ ChannelCookieReceived cookie writeFlow gControlFlow (NewConnection conn mbpid) return $ Just (conn, Nothing) _ -> return Nothing -- Initiation packet | _:_ <- mapMaybe (\case Initiation x -> Just x; _ -> Nothing) header , Just ver <- version -> do cookie <- createCookie gs addr atomically $ do identity <- fst <$> readTVar gIdentity let reply = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader [ CookieSet cookie , AnnounceSelf $ refDigest $ storedRef $ idData identity , ProtocolVersion ver ] writeFlow gDataFlow (addr, reply) return Nothing -- Announce packet outside any connection | dgst:_ <- mapMaybe (\case AnnounceSelf x -> Just x; _ -> Nothing) header , Just _ <- version -> do atomically $ do (cur, past) <- readTVar gIdentity when (not $ dgst `elem` map (refDigest . storedRef . idData) (cur : past)) $ do writeFlow gControlFlow $ ReceivedAnnounce addr dgst return Nothing | otherwise -> do atomically $ gLog $ show addr <> ": dropping packet " <> show header return Nothing where addr = either id cAddress econn mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions channelCurrentCookie :: ChannelState -> Maybe Cookie channelCurrentCookie = \case ChannelCookieReceived cookie -> Just cookie ChannelCookieConfirmed cookie -> Just cookie ChannelOurRequest mbcookie _ -> mbcookie ChannelPeerRequest mbcookie _ -> mbcookie ChannelOurAccept mbcookie _ _ -> mbcookie _ -> Nothing cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> Cookie -> STM () cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid cookieSet = do readTVar cChannel >>= \case ChannelNone -> newConn ChannelCookieWait -> newConn ChannelCookieReceived {} -> update _ -> return () where update = do writeTVar cChannel $ ChannelCookieConfirmed cookieSet newConn = do update writeFlow gControlFlow (NewConnection conn mbpid) generateCookieHeaders :: GlobalState addr -> addr -> ChannelState -> IO [TransportHeaderItem] generateCookieHeaders gs addr ch = catMaybes <$> sequence [ echoHeader, setHeader ] where echoHeader = return $ CookieEcho <$> channelCurrentCookie ch setHeader = case ch of ChannelCookieWait {} -> Just . CookieSet <$> createCookie gs addr ChannelCookieReceived {} -> Just . CookieSet <$> createCookie gs addr _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie resendBytes :: GlobalState addr -> Connection addr -> SentPacket -> IO () resendBytes GlobalState {..} Connection {..} sp = do now <- getTime MonotonicRaw atomically $ do when (isJust $ spAckedBy sp) $ do modifyTVar' cSentPackets $ (:) sp { spTime = now , spRetryCount = spRetryCount sp + 1 } writeFlow gDataFlow (cAddress, spData sp) sendBytes :: GlobalState addr -> Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO () sendBytes gs conn bs ackedBy = resendBytes gs conn SentPacket { spTime = undefined , spRetryCount = -1 , spAckedBy = ackedBy , spData = bs } processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) processOutgoing gs@GlobalState {..} = do let sendNextPacket :: Connection addr -> STM (IO ()) sendNextPacket conn@Connection {..} = do channel <- readTVar cChannel let mbch = case channel of ChannelEstablished ch -> Just ch _ -> Nothing let checkOutstanding | isJust mbch = readTQueue cSecureOutQueue | otherwise = retry checkAcknowledgements | isJust mbch = do acks <- readTVar cToAcknowledge if null acks then retry else return (True, TransportPacket (TransportHeader []) [], []) | otherwise = retry (secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy) <- checkOutstanding <|> readFlow cDataInternal <|> checkAcknowledgements when (isNothing mbch && secure) $ do writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) acknowledge <- case mbch of Nothing -> return [] Just _ -> swapTVar cToAcknowledge [] return $ do cookieHeaders <- generateCookieHeaders gs cAddress channel let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ cookieHeaders ++ hitems let plain = BL.concat $ (serializeObject $ transportToObject gStorage header) : map lazyLoadBytes content mbs <- case mbch of Just ch -> do runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err return Nothing Nothing | secure -> return Nothing | otherwise -> return $ Just (BL.toStrict plain, plainAckedBy) case mbs of Just (bs, ackedBy) -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy) Nothing -> return () let retransmitPacket :: Connection addr -> STM (IO ()) retransmitPacket conn@Connection {..} = do now <- readTVar gNowVar (sp, rest) <- readTVar cSentPackets >>= \case sps@(_:_) -> return (last sps, init sps) _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 if now < nextTry then do nextTimeout <- readTVar gNextTimeout if nextTimeout <= now || nextTry < nextTimeout then do writeTVar gNextTimeout nextTry return $ return () else retry else do writeTVar cSentPackets rest return $ resendBytes gs conn sp let handleControlRequests = readFlow gControlFlow >>= \case RequestConnection addr -> do conn@Connection {..} <- getConnection gs addr identity <- fst <$> readTVar gIdentity readTVar cChannel >>= \case ChannelNone -> do let packet = BL.toStrict $ BL.concat [ serializeObject $ transportToObject gStorage $ TransportHeader $ [ Initiation $ refDigest gInitConfig , AnnounceSelf $ refDigest $ storedRef $ idData identity ] ++ map ProtocolVersion protocolVersions , lazyLoadBytes gInitConfig ] writeTVar cChannel ChannelCookieWait return $ sendBytes gs conn packet $ Just $ \case CookieSet {} -> True; _ -> False _ -> return $ return () SendAnnounce addr -> do identity <- fst <$> readTVar gIdentity let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $ [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] ++ map ProtocolVersion protocolVersions writeFlow gDataFlow (addr, packet) return $ return () UpdateSelfIdentity nid -> do (cur, past) <- readTVar gIdentity writeTVar gIdentity (nid, cur : past) return $ return () conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns , [ handleControlRequests ] ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM () processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do modifyTVar' cSentPackets $ filter $ \sp -> not (fromJust (spAckedBy sp) hitem)