module Network.Server.ScalableServer (
runServer,
RequestPipeline(..), RequestCreator,
RequestProcessor) where
import Network.Socket
import qualified Network.Socket.ByteString as BinSock
import Network.BSD
import Control.Exception (finally)
import Control.Monad (forever, liftM, replicateM, void)
import Control.Monad.Trans (liftIO)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (newTChanIO, writeTChan, readTChan, atomically, TChan)
import Data.Enumerator (($$), run_)
import qualified Data.Enumerator as E
import Data.Enumerator.Binary (enumHandle)
import Data.Attoparsec.Enumerator (iterParser)
import System.IO (hSetBuffering, hClose, BufferMode(..),
IOMode(..), Handle)
import Blaze.ByteString.Builder (toByteStringIO, Builder)
import Data.Enumerator (yield, continue, Iteratee, Stream(..))
import qualified Data.Attoparsec as Atto
import qualified Data.Attoparsec.Char8 as AttoC
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString as WS
import qualified Data.ByteString.Lazy.Char8 as B
data RequestPipeline a = RequestPipeline (RequestCreator a) (RequestProcessor a)
type RequestCreator a = Atto.Parser a
type RequestProcessor a = a -> IO Builder
type RequestChannel a = TChan (Maybe a)
runServer :: RequestPipeline a -> PortNumber -> IO ()
runServer pipe port = do
proto <- getProtocolNumber "tcp"
s <- socket AF_INET Stream proto
setSocketOption s ReuseAddr 1
bindSocket s (SockAddrInet port iNADDR_ANY)
serverListenLoop pipe s
serverListenLoop :: RequestPipeline a -> Socket -> IO ()
serverListenLoop pipe s = do
listen s 100
forever $ do
(c, _) <- accept s
h <- socketToHandle c ReadMode
hSetBuffering h NoBuffering
forkIO $ connHandler pipe c h
connHandler :: RequestPipeline a -> Socket -> Handle -> IO ()
connHandler (RequestPipeline reqParse reqProc) s h = do
chan <- newTChanIO
(do
let enum = enumHandle 32768 h
let parser = iterParser reqParse
void $ forkIO $ processRequests chan reqProc s
void $ run_ (enum $$ E.sequence parser $$ requestHandler chan s)
) `finally` ( (atomically $ writeTChan chan Nothing) >> hClose h )
requestHandler :: RequestChannel a -> Socket -> Iteratee a IO ()
requestHandler chan s = do
continue requestConsume
where
requestConsume (Chunks mrs) = do
liftIO $ mapM_ (\m -> atomically $ writeTChan chan $ Just m) mrs
continue requestConsume
requestConsume EOF = do
yield () EOF
processRequests :: RequestChannel a -> RequestProcessor a -> Socket -> IO ()
processRequests chan proc s = do
next <- atomically $ readTChan chan
case next of
Just a -> do
resp <- proc a
toByteStringIO (BinSock.sendAll s) $ resp
processRequests chan proc s
Nothing -> return ()