module System.Network.ZMQ.MDP.Worker where
import Data.ByteString.Char8
import Data.Int
import Prelude hiding (putStr, putStrLn)
import qualified Prelude
import qualified System.ZMQ as Z
import System.ZMQ hiding(receive)
import Control.Applicative
import Control.Exception
import Control.Monad
import Data.Time.Clock
import Data.Time.Format
import System.Locale
import Control.Concurrent
import System.Timeout
import System.Network.ZMQ.MDP.Util
data Protocol = WORKER_PROTOCOL
instance Show Protocol where
show WORKER_PROTOCOL = "MDPW01"
type Address = ByteString
data Response = Response { envelope :: ! [Address],
body :: ! [ByteString] }
data ResponseCode = REPLY | READY | WORKER_HEARTBEAT
instance Show ResponseCode where
show REPLY = "\003"
show READY = "\001"
show WORKER_HEARTBEAT = "\004"
data CommandCode = REQUEST | HEARTBEAT | DISCONNECT
instance Show CommandCode where
show REQUEST = "\002"
show HEARTBEAT = "\004"
show DISCONNECT = "\005"
parseCommand :: ByteString -> Maybe CommandCode
parseCommand "\002" = Just REQUEST
parseCommand "\004" = Just HEARTBEAT
parseCommand "\005" = Just DISCONNECT
parseCommand _ = Nothing
type MDError = ByteString
sendToBroker :: Socket a -> ResponseCode -> [ByteString] ->
[ByteString] -> IO ()
sendToBroker sock cmd option message =
sendAll sock $ ["",
pack $ show WORKER_PROTOCOL,
pack $ show cmd] ++ option ++ message
sendResponse :: Socket a -> Response -> IO ()
sendResponse sock resp = sendToBroker sock REPLY (envelope resp) (body resp)
whileJust :: Monad m => (b -> m (Maybe b)) -> b -> m b
whileJust action seed = action seed >>= maybe (return seed) (whileJust action)
start :: WorkerState a -> IO ()
start worker = forever (withBroker readTillDrop worker)
readTillDrop :: Socket a -> WorkerState a1 -> IO (WorkerState a1)
readTillDrop sock worker = whileJust (receive sock) worker
data WorkerState a = WorkerState { heartbeat_at :: ! UTCTime,
liveness :: ! Int,
heartbeat :: ! Int64,
reconnect :: ! Int,
broker :: String,
context :: System.ZMQ.Context,
svc :: ByteString,
handler :: [ByteString] -> IO [ByteString]
}
epoch :: UTCTime
epoch = buildTime defaultTimeLocale []
lIVENESS :: Int
lIVENESS = 3
withWorker :: String -> ByteString -> ([ByteString] -> IO [ByteString]) -> IO ()
withWorker broker_ service_ io =
withContext 1 $ \c ->
start WorkerState { broker = broker_,
context = c,
svc = service_,
handler = io,
liveness = 1,
heartbeat_at = epoch,
heartbeat = 2,
reconnect = 2
}
withBroker :: (Socket XReq -> WorkerState a -> IO b) -> WorkerState t -> IO b
withBroker go worker =
withSocket (context worker) XReq $ \sock -> do
loggedPut ( "connecting to broker " ++ broker worker)
connect sock (broker worker)
sendToBroker sock READY [svc worker] []
now <- getCurrentTime
let time = addUTCTime (fromIntegral $ heartbeat worker) now
loggedPut ("beat at:" ++ show time)
go sock worker { liveness = lIVENESS,
heartbeat_at = time
}
loggedPut :: String -> IO ()
loggedPut _res = return ()
receive :: Socket a -> WorkerState a1 -> IO (Maybe (WorkerState a2))
receive sock worker = do loggedPut "polling"
next <- getMessage
case next of
Nothing -> loggedPut "no message" >> return Nothing
Just w -> loggedPut "message!" >> postCheck w
where
getMessage = do
[S _ polled] <- poll [S sock In] $ 1000000 * heartbeat worker
case polled of
None -> noMessage
In -> Z.receive sock [] >>= handleEvent
noMessage :: IO (Maybe (WorkerState b))
noMessage = do
let live = liveness worker 1
if liveness worker == 0
then loggedPut "reconnecting" >> threadDelay (1000000 * reconnect worker) >> return Nothing
else return $ Just worker { liveness = live }
postCheck :: WorkerState a -> IO (Maybe (WorkerState a))
postCheck worker = do
time <- getCurrentTime
if time > heartbeat_at worker
then do
sendToBroker sock WORKER_HEARTBEAT [] []
return $ Just $ updateWorkerTime worker time
else return $ Just worker
updateWorkerTime w time =
w { heartbeat_at = addUTCTime (fromIntegral $! heartbeat w) time}
handleEvent header = do
let zrecv = Z.receive sock []
assert (header == "") (return ())
prot <- zrecv
assert (prot == "MDPW01") (return ())
command <- parseCommand <$> zrecv
let new_worker = worker { liveness = lIVENESS }
case command of
Just REQUEST -> do
addresses <- receiveUntilEmpty sock
msgs <- receiveUntilEnd sock
replyString <- handler worker msgs
sendResponse sock Response { envelope = addresses,
body = replyString }
return $ Just new_worker
Just HEARTBEAT -> do
return $ Just new_worker
Just DISCONNECT -> do
return Nothing
Nothing -> error "borked"