module System.ZMQ (
Size,
Context,
Socket,
Flag(..),
SocketOption(..),
Poll(..),
PollEvent(..),
P2P(..),
Pub(..),
Sub(..),
Req(..),
Rep(..),
XReq(..),
XRep(..),
Up(..),
Down(..),
init,
term,
socket,
close,
setOption,
System.ZMQ.subscribe,
System.ZMQ.unsubscribe,
bind,
connect,
send,
flush,
receive,
poll
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Exception
import Data.Int
import Data.Maybe
import System.ZMQ.Base
import qualified System.ZMQ.Base as B
import Foreign
import Foreign.C.Error
import Foreign.C.String
import Foreign.C.Types (CInt, CShort)
import qualified Data.ByteString as SB
import qualified Data.ByteString.Unsafe as UB
import System.Posix.Types (Fd(..))
newtype Context = Context { ctx :: ZMQCtx }
newtype Socket a = Socket { sock :: ZMQSocket }
newtype Message = Message { msgPtr :: ZMQMsgPtr }
type Timeout = Int64
type Size = Word
class SType a where
zmqSocketType :: a -> ZMQSocketType
data P2P = P2P
instance SType P2P where
zmqSocketType = const p2p
data Pub = Pub
instance SType Pub where
zmqSocketType = const pub
data Sub = Sub
instance SType Sub where
zmqSocketType = const sub
data Req = Req
instance SType Req where
zmqSocketType = const request
data Rep = Rep
instance SType Rep where
zmqSocketType = const response
data XReq = Xreq
instance SType XReq where
zmqSocketType = const xrequest
data XRep = Xrep
instance SType XRep where
zmqSocketType = const xresponse
data Up = Up
instance SType Up where
zmqSocketType = const upstream
data Down = Down
instance SType Down where
zmqSocketType = const downstream
class SubsType a
instance SubsType Sub
data SocketOption =
HighWM Int64
| LowWM Int64
| Swap Int64
| Affinity Int64
| Identity String
| Rate Word64
| RecoveryIVL Word64
| McastLoop Word64
| SendBuf Word64
| ReceiveBuf Word64
deriving (Eq, Ord, Show)
data Flag =
NoBlock
| NoFlush
deriving (Eq, Ord, Show)
data PollEvent =
In
| Out
| InOut
deriving (Eq, Ord, Show)
data Poll =
forall a. S (Socket a) PollEvent
| F Fd PollEvent
init :: Size -> Size -> Bool -> IO Context
init appThreads ioThreads doPoll = do
c <- throwErrnoIfNull "init" $
c_zmq_init (fromIntegral appThreads)
(fromIntegral ioThreads)
(if doPoll then usePoll else 0)
return (Context c)
term :: Context -> IO ()
term = throwErrnoIfMinus1_ "term" . c_zmq_term . ctx
socket :: SType a => Context -> a -> IO (Socket a)
socket (Context c) t =
let zt = typeVal . zmqSocketType $ t
in Socket <$> throwErrnoIfNull "socket" (c_zmq_socket c zt)
close :: Socket a -> IO ()
close = throwErrnoIfMinus1_ "close" . c_zmq_close . sock
setOption :: Socket a -> SocketOption -> IO ()
setOption s (HighWM o) = setIntOpt s highWM o
setOption s (LowWM o) = setIntOpt s lowWM o
setOption s (Swap o) = setIntOpt s swap o
setOption s (Affinity o) = setIntOpt s affinity o
setOption s (Identity o) = setStrOpt s identity o
setOption s (Rate o) = setIntOpt s rate o
setOption s (RecoveryIVL o) = setIntOpt s recoveryIVL o
setOption s (McastLoop o) = setIntOpt s mcastLoop o
setOption s (SendBuf o) = setIntOpt s sendBuf o
setOption s (ReceiveBuf o) = setIntOpt s receiveBuf o
subscribe :: SubsType a => Socket a -> String -> IO ()
subscribe s = setStrOpt s B.subscribe
unsubscribe :: SubsType a => Socket a -> String -> IO ()
unsubscribe s = setStrOpt s B.unsubscribe
bind :: Socket a -> String -> IO ()
bind (Socket s) str = throwErrnoIfMinus1_ "bind" $
withCString str (c_zmq_bind s)
connect :: Socket a -> String -> IO ()
connect (Socket s) str = throwErrnoIfMinus1_ "connect" $
withCString str (c_zmq_connect s)
send :: Socket a -> SB.ByteString -> [Flag] -> IO ()
send (Socket s) val fls = bracket (messageOf val) messageClose $ \m ->
throwErrnoIfMinus1_ "send" $ c_zmq_send s (msgPtr m) (combine fls)
flush :: Socket a -> IO ()
flush = throwErrnoIfMinus1_ "flush" . c_zmq_flush . sock
receive :: Socket a -> [Flag] -> IO (SB.ByteString)
receive (Socket s) fls = bracket messageInit messageClose $ \m -> do
throwErrnoIfMinus1_ "receive" $ c_zmq_recv s (msgPtr m) (combine fls)
data_ptr <- c_zmq_msg_data (msgPtr m)
size <- c_zmq_msg_size (msgPtr m)
SB.packCStringLen (data_ptr, fromIntegral size)
poll :: [Poll] -> Timeout -> IO [Poll]
poll fds to = do
let len = length fds
ps = map createZMQPoll fds
withArray ps $ \ptr -> do
throwErrnoIfMinus1_ "poll" $
c_zmq_poll ptr (fromIntegral len) (fromIntegral to)
ps' <- peekArray len ptr
createPoll ps' []
where
createZMQPoll :: Poll -> ZMQPoll
createZMQPoll (S (Socket s) e) =
ZMQPoll s 0 (fromEvent e) 0
createZMQPoll (F (Fd s) e) =
ZMQPoll nullPtr (fromIntegral s) (fromEvent e) 0
createPoll :: [ZMQPoll] -> [Poll] -> IO [Poll]
createPoll [] fd = return fd
createPoll (p:pp) fd = do
let s = pSocket p;
f = pFd p;
r = toEvent $ pRevents p
if isJust r
then createPoll pp (newPoll s f r:fd)
else createPoll pp fd
newPoll :: ZMQSocket -> CInt -> Maybe PollEvent -> Poll
newPoll s 0 r = S (Socket s) (fromJust r)
newPoll _ f r = F (Fd f) (fromJust r)
fromEvent :: PollEvent -> CShort
fromEvent In = fromIntegral . pollVal $ pollIn
fromEvent Out = fromIntegral . pollVal $ pollOut
fromEvent InOut = fromIntegral . pollVal $ pollInOut
toEvent :: CShort -> Maybe PollEvent
toEvent e | e == (fromIntegral . pollVal $ pollIn) = Just In
| e == (fromIntegral . pollVal $ pollOut) = Just Out
| e == (fromIntegral . pollVal $ pollInOut) = Just InOut
| otherwise = Nothing
messageOf :: SB.ByteString -> IO Message
messageOf b = UB.unsafeUseAsCStringLen b $ \(cstr, len) -> do
msg <- messageInitSize (fromIntegral len)
data_ptr <- c_zmq_msg_data (msgPtr msg)
copyBytes data_ptr cstr len
return msg
messageClose :: Message -> IO ()
messageClose (Message ptr) = do
throwErrnoIfMinus1_ "messageClose" $ c_zmq_msg_close ptr
free ptr
messageInit :: IO Message
messageInit = do
ptr <- new (ZMQMsg nullPtr)
throwErrnoIfMinus1_ "messageInit" $ c_zmq_msg_init ptr
return (Message ptr)
messageInitSize :: Size -> IO Message
messageInitSize s = do
ptr <- new (ZMQMsg nullPtr)
throwErrnoIfMinus1_ "messageInitSize" $
c_zmq_msg_init_size ptr (fromIntegral s)
return (Message ptr)
setIntOpt :: (Storable b, Integral b) => Socket a -> ZMQOption -> b -> IO ()
setIntOpt (Socket s) (ZMQOption o) i = throwErrnoIfMinus1_ "setIntOpt" $
bracket (newStablePtr i) freeStablePtr $ \ptr ->
c_zmq_setsockopt s (fromIntegral o)
(castStablePtrToPtr ptr)
(fromIntegral . sizeOf $ i)
setStrOpt :: Socket a -> ZMQOption -> String -> IO ()
setStrOpt (Socket s) (ZMQOption o) str = throwErrnoIfMinus1_ "setStrOpt" $
withCStringLen str $ \(cstr, len) ->
c_zmq_setsockopt s (fromIntegral o) (castPtr cstr) (fromIntegral len)
toZMQFlag :: Flag -> ZMQFlag
toZMQFlag NoBlock = noBlock
toZMQFlag NoFlush = noFlush
combine :: [Flag] -> CInt
combine = fromIntegral . foldr ((.|.) . flagVal . toZMQFlag) 0