-- | A TCP server that multiplexes the incoming and outgoing messages of many
-- connected peers onto a single pair of input and output channels. The socket
-- address of a peer identifies the source of each incoming message and the
-- destination of each outgoing message.
--
-- This interface introduces a new error condition: a message on the outgoing
-- channel may carry a socket address for which no socket exists. This can
-- happen through incorrect usage of this library, or because a peer
-- disconnected after the client placed the message on the outgoing channel
-- but before that message was sent. Currently, the server does not notify its
-- caller when this error occurs.
module Nettle.Servers.MultiplexedTCPServer
  ( muxedTCPServer
  , MultiplexedProcess
  , TCPMessage(..)
  ) where

import Prelude hiding (interact, catch)
import Nettle.Servers.TwoWayChannel
import Nettle.Servers.TCPServer
import Control.Concurrent
import Control.Exception
import Control.Monad
import qualified Data.Map as Map
import Data.Binary

-- | A multiplexed process has inputs and outputs that are tagged with the
-- @SockAddr@ of the sending or receiving peer, and carries connection start
-- and connection end events.
type MultiplexedProcess a b = Process (TCPMessage a) (SockAddr, b) IOException

-- | The externally visible events that may occur for the multiplexed TCP server.
data TCPMessage a
  = ConnectionEstablished SockAddr
    -- ^ A connection to a peer with the given address is established.
  | ConnectionTerminated SockAddr IOException
    -- ^ A connection with the given address is terminated, due to the given exception.
  | PeerMessage SockAddr a
    -- ^ A message of type @a@ has been received from the peer with the given address.
  deriving (Show, Eq)

-- | Runs a TCP server returning a process that outputs messages of type @a@
-- from connected peers, tagged with their @SockAddr@, and accepts messages of
-- type @b@ for peers, again tagged with their @SockAddr@.
muxedTCPServer
  :: Show a
  => ServerPortNumber      -- ^ Port, passed unchanged to 'tcpServer'.
  -> TCPMessageDriver a b  -- ^ Message driver, passed unchanged to 'tcpServer'.
  -> IO (MultiplexedProcess a b)
muxedTCPServer pstring driver = do
  resultCh         <- newChan2
  peers            <- tcpServer pstring driver
  addrToChanMapVar <- newMVar Map.empty
  -- Start one 'multiplex' thread per peer. The peer list is consumed in its
  -- own thread, so this function returns without waiting for the list to be
  -- exhausted.
  _ <- forkIO $ forM_ peers $ \(addr, process) ->
    forkIO (multiplex addrToChanMapVar resultCh addr process)
  -- A single thread routes every outgoing message to its peer.
  _ <- forkIO $ demultiplex addrToChanMapVar resultCh
  let ch' = theOtherEnd2 resultCh
  -- NOTE(review): nothing in this function ever fills @d@, so 'whenDeadP' of
  -- the returned process never yields a value. Confirm this is intended.
  d <- newEmptyMVar
  return (Process { readP = readChan2 ch', tellP = writeChan2 ch', whenDeadP = readMVar d })
  where
    -- Registers the peer, announces the connection, then forwards every
    -- message read from the peer onto the shared channel until reading fails.
    --
    -- The map is forced with '$!' before it is stored: an 'MVar' holds its
    -- contents lazily, so a plain @return . Map.insert@ would stack one thunk
    -- per connect/disconnect until the next lookup in 'demultiplex'.
    multiplex addrToChanMapVar resultCh addr process = do
      modifyMVar_ addrToChanMapVar (\m -> return $! Map.insert addr process m)
      writeChan2 resultCh (ConnectionEstablished addr)
      -- NOTE(review): the handler only sees 'IOException' (the type that
      -- 'ConnectionTerminated' carries). Any other exception ends this thread
      -- without removing the peer from the map or emitting an event.
      catch
        (forever (readP process >>= writeChan2 resultCh . PeerMessage addr))
        (\e -> do
            -- Unregister first, then announce, as in the original ordering.
            modifyMVar_ addrToChanMapVar (\m -> return $! Map.delete addr m)
            writeChan2 resultCh (ConnectionTerminated addr e))

    -- Takes address-tagged messages off the shared channel and hands each one
    -- to the process registered for that address. The map stays locked while
    -- the message is handed over.
    demultiplex addrToChanMapVar resultCh = forever $ do
      (addr, msg) <- readChan2 resultCh
      withMVar addrToChanMapVar (lookupAndSend addr msg)
      where
        lookupAndSend addr msg addrToChanMap =
          case Map.lookup addr addrToChanMap of
            -- No process is registered for this address (for example the peer
            -- has disconnected); the message is dropped silently.
            Nothing      -> return ()
            Just process -> tellP process msg