{-# LANGUAGE StandaloneDeriving, ScopedTypeVariables #-} -- | Provides a generic TCP server, parameterized over the -- server's listening port and the types of messages received from -- and sent to clients. module Nettle.Servers.TCPServer ( tcpServer, ServerPortNumber, TCPMessageDriver (..), Peer , Process (..), readAll, writeAll, SockAddr ) where import Prelude hiding (catch) import Control.Monad import Control.Concurrent import Control.Exception import Network.Socket import System.IO import Data.Word type ServerPortNumber = Word16 -- | A Peer is a TCP peer. It consists of a @SockAddr@ value giving the -- the socket address of the peer, as well as a @Process@ value which provides methods to access -- messages received from the peer and send messages to a peer. type Peer a b = (SockAddr, Process a b IOException) deriving instance Ord SockAddr {- data SockAddr = SockAddrInet PortNumber HostAddress | SockAddrInet6 PortNumber FlowInfo HostAddress6 ScopeID | SockAddrUnix String newtype PortNumber = PortNum Word16 deriving ( Eq, Ord ) type HostAddress = Word32 -} -- | A @Process a b c@ represents a process with an API that -- allows another IO computation to observe its outputs (of type @a@), to -- supply it with inputs (of type @b@), and to observe when it terminates. data Process a b c = Process { readP :: IO a, -- ^ interact with the process by receiving one of its outputs; should block until the process emits a value. tellP :: b -> IO (), -- ^ interact with the process by sending it an input; should be non-blocking. whenDeadP :: IO c -- ^ should block until the process terminates, carrying an output value of type @c@. } -- | Read all of the output from the process as a lazy stream. readAll :: Process a b c -> IO [a] readAll p = do ch <- newChan forkIO $ forever (readP p >>= writeChan ch) as <- getChanContents ch return as -- | Write a list to the process; does not return until every element of the list has been sent to the process. writeAll :: Process a b c -> [b] -> IO () writeAll p bs = sequence_ [tellP p b | b <- bs] -- | A @TCPMessageDriver a b@ is used by the @tcpServer@ to read messages of type @a@ from the underlying TCP sockets with -- peers as well as write messages of type @b@ to the TCP sockets. data TCPMessageDriver a b = TCPMessageDriver { getMessage :: Handle -> IO (Maybe a) -- ^ Method to read a value from the handle. Returns @Nothing@ if the read failed. , putMessage :: b -> Handle -> IO () -- ^ Method to write a value to a handle. } -- | tcpServer starts a TCP server at the given port number, waiting for new connections. -- Whenever a new connection is established, new threads are created for reading and writing -- to the thread, using the given tcp message driver. The socket address of the other side of -- the connection, along with a pair of channels for the incoming messages and outgoing messages -- is placed on the result channel. This method returns immediately. tcpServer :: (Show a) => ServerPortNumber -> TCPMessageDriver a b -> IO ([Peer a b]) tcpServer pstring driver = do addrinfos <- getAddrInfo (Just (defaultHints {addrFlags = [AI_PASSIVE]})) Nothing (Just $ show pstring) let serveraddr = head addrinfos sock <- socket (addrFamily serveraddr) Stream defaultProtocol setSocketOption sock ReuseAddr 1 bindSocket sock (addrAddress serveraddr) listen sock queueLength resultChan <- newChan forkIO $ finally (procRequests sock resultChan) (sClose sock) getChanContents resultChan where queueLength = maxListenQueue procRequests masterSock resultChan = forever acceptConnection where acceptConnection = do (connsock, clientaddr) <- accept masterSock connhdl <- socketToHandle connsock ReadWriteMode hSetBuffering connhdl (BlockBuffering Nothing) deadVar <- newEmptyMVar let readMessage = do msg <- getMessage driver connhdl case msg of Nothing -> fail "read returned nothing" Just msg' -> return msg' writeMessage outMsg = do putMessage driver outMsg connhdl hFlush connhdl peerProcess = Process { readP = readMessage , tellP = writeMessage, whenDeadP = readMVar deadVar } writeChan resultChan (clientaddr, peerProcess)