Copyright | (c) 2010-2013 Toralf Wittner |
---|---|
License | MIT |
Maintainer | Toralf Wittner <tw@dtex.org> |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell98 |
0MQ Haskell binding. The API closely follows the C-API of 0MQ, with the main difference being that sockets are typed.
Notes
Many option settings use a Restriction to further constrain the
range of possible values of their integral types. For example,
the maximum message size can be given as -1, which means no limit,
or as a greater value, which denotes the message size in bytes. The
type of setMaxMessageSize is therefore:
setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO ()
which means any integral value in the range of -1 to
(maxBound :: Int64) can be given. To create a restricted
value from a plain value, use toRestricted or restrict.
- data Pair = Pair
- data Pub = Pub
- data Sub = Sub
- data XPub = XPub
- data XSub = XSub
- data Req = Req
- data Rep = Rep
- data Dealer = Dealer
- data Router = Router
- type XReq = Dealer
- type XRep = Router
- data Pull = Pull
- data Push = Push
- data Stream = Stream
- class SocketType a
- class Sender a
- class Receiver a
- class Subscriber a
- class SocketLike s
- class Conflatable a
- class SendProbe a
- type Size = Word
- data Context
- data Socket a
- data Flag
- data Switch
- type Timeout = Int64
- data Event
- data EventType
- data EventMsg
- = Connected !ByteString !Fd
- | ConnectDelayed !ByteString
- | ConnectRetried !ByteString !Int
- | Listening !ByteString !Fd
- | BindFailed !ByteString !Int
- | Accepted !ByteString !Fd
- | AcceptFailed !ByteString !Int
- | Closed !ByteString !Fd
- | CloseFailed !ByteString !Int
- | Disconnected !ByteString !Fd
- | MonitorStopped !ByteString !Int
- data Poll s m where
- data KeyFormat a where
- data SecurityMechanism
- withContext :: (Context -> IO a) -> IO a
- withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
- bind :: Socket a -> String -> IO ()
- unbind :: Socket a -> String -> IO ()
- connect :: Socket a -> String -> IO ()
- disconnect :: Socket a -> String -> IO ()
- send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- sendMulti :: Sender a => Socket a -> NonEmpty ByteString -> IO ()
- receive :: Receiver a => Socket a -> IO ByteString
- receiveMulti :: Receiver a => Socket a -> IO [ByteString]
- version :: IO (Int, Int, Int)
- monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
- socketMonitor :: [EventType] -> String -> Socket a -> IO ()
- poll :: (SocketLike s, MonadIO m) => Timeout -> [Poll s m] -> m [[Event]]
- subscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- ioThreads :: Context -> IO Word
- maxSockets :: Context -> IO Word
- setIoThreads :: Word -> Context -> IO ()
- setMaxSockets :: Word -> Context -> IO ()
- affinity :: Socket a -> IO Word64
- backlog :: Socket a -> IO Int
- conflate :: Conflatable a => Socket a -> IO Bool
- curvePublicKey :: KeyFormat f -> Socket a -> IO ByteString
- curveSecretKey :: KeyFormat f -> Socket a -> IO ByteString
- curveServerKey :: KeyFormat f -> Socket a -> IO ByteString
- delayAttachOnConnect :: Socket a -> IO Bool
- events :: Socket a -> IO [Event]
- fileDescriptor :: Socket a -> IO Fd
- identity :: Socket a -> IO ByteString
- immediate :: Socket a -> IO Bool
- ipv4Only :: Socket a -> IO Bool
- ipv6 :: Socket a -> IO Bool
- lastEndpoint :: Socket a -> IO String
- linger :: Socket a -> IO Int
- maxMessageSize :: Socket a -> IO Int64
- mcastHops :: Socket a -> IO Int
- mechanism :: Socket a -> IO SecurityMechanism
- moreToReceive :: Socket a -> IO Bool
- plainServer :: Socket a -> IO Bool
- plainPassword :: Socket a -> IO ByteString
- plainUserName :: Socket a -> IO ByteString
- rate :: Socket a -> IO Int
- receiveBuffer :: Socket a -> IO Int
- receiveHighWM :: Socket a -> IO Int
- receiveTimeout :: Socket a -> IO Int
- reconnectInterval :: Socket a -> IO Int
- reconnectIntervalMax :: Socket a -> IO Int
- recoveryInterval :: Socket a -> IO Int
- sendBuffer :: Socket a -> IO Int
- sendHighWM :: Socket a -> IO Int
- sendTimeout :: Socket a -> IO Int
- tcpKeepAlive :: Socket a -> IO Switch
- tcpKeepAliveCount :: Socket a -> IO Int
- tcpKeepAliveIdle :: Socket a -> IO Int
- tcpKeepAliveInterval :: Socket a -> IO Int
- zapDomain :: Socket a -> IO ByteString
- setAffinity :: Word64 -> Socket a -> IO ()
- setBacklog :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setConflate :: Conflatable a => Bool -> Socket a -> IO ()
- setCurveServer :: Bool -> Socket a -> IO ()
- setCurvePublicKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setCurveSecretKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setCurveServerKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
- setIdentity :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setImmediate :: Bool -> Socket a -> IO ()
- setIpv4Only :: Bool -> Socket a -> IO ()
- setIpv6 :: Bool -> Socket a -> IO ()
- setLinger :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO ()
- setMcastHops :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
- setPlainServer :: Bool -> Socket a -> IO ()
- setPlainPassword :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setPlainUserName :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setProbeRouter :: SendProbe a => Bool -> Socket a -> IO ()
- setRate :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
- setReceiveBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReceiveHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReceiveTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setReconnectInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReconnectIntervalMax :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setRecoveryInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReqCorrelate :: Bool -> Socket Req -> IO ()
- setReqRelaxed :: Bool -> Socket Req -> IO ()
- setRouterMandatory :: Bool -> Socket Router -> IO ()
- setSendBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setSendHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setSendTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()
- setTcpKeepAlive :: Switch -> Socket a -> IO ()
- setTcpKeepAliveCount :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpKeepAliveIdle :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpKeepAliveInterval :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setXPubVerbose :: Bool -> Socket XPub -> IO ()
- restrict :: Restriction r v => v -> Restricted r v
- toRestricted :: Restriction r v => v -> Maybe (Restricted r v)
- data ZMQError
- errno :: ZMQError -> Int
- source :: ZMQError -> String
- message :: ZMQError -> String
- init :: Size -> IO Context
- term :: Context -> IO ()
- shutdown :: Context -> IO ()
- context :: IO Context
- socket :: SocketType a => Context -> a -> IO (Socket a)
- close :: Socket a -> IO ()
- waitRead :: Socket a -> IO ()
- waitWrite :: Socket a -> IO ()
- z85Encode :: MonadIO m => Restricted Div4 ByteString -> m ByteString
- z85Decode :: MonadIO m => Restricted Div5 ByteString -> m ByteString
- proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
- curveKeyPair :: MonadIO m => m (Restricted Div5 ByteString, Restricted Div5 ByteString)
Type Definitions
Socket Types
Socket type-classes
class SocketType a Source #
Socket types.
Sockets which can send.
Sockets which can receive.
class Conflatable a Source #
Sockets which can be conflated.
Sockets which can send probes (cf. setProbeRouter).
Various type definitions
Flags to apply on send operations (cf. man zmq_send)
Configuration switch
Socket events.
Event types to monitor.
Event Message to receive when monitoring socket events.
data SecurityMechanism Source #
General Operations
withContext :: (Context -> IO a) -> IO a Source #
Run an action with a 0MQ context. The Context supplied to your
action will not be valid after the action either returns or
throws an exception.
withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b Source #
Run an action with a 0MQ socket. The socket will be closed after running the supplied action even if an error occurs. The socket supplied to your action will not be valid after the action terminates.
unbind :: Socket a -> String -> IO () Source #
Unbind the socket from the given address (cf. zmq_unbind).
connect :: Socket a -> String -> IO () Source #
Connect the socket to the given address (cf. zmq_connect).
disconnect :: Socket a -> String -> IO () Source #
Disconnect the socket from the given endpoint (cf. zmq_disconnect).
send :: Sender a => Socket a -> [Flag] -> ByteString -> IO () Source #
Send the given ByteString over the socket (cf. zmq_sendmsg).
Note: This function always calls zmq_sendmsg in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT flag, as it is used
by default. Still, send blocks the calling thread (using GHC's
threadWaitWrite) for as long as the message cannot be queued on the socket.
send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO () Source #
Send the given ByteString over the socket (cf. zmq_sendmsg).
This is operationally identical to
send socket (Strict.concat (Lazy.toChunks lbs)) flags
but may be more efficient.
Note: This function always calls zmq_sendmsg in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT flag, as it is used
by default. Still, send' blocks the calling thread (using GHC's
threadWaitWrite) for as long as the message cannot be queued on the socket.
sendMulti :: Sender a => Socket a -> NonEmpty ByteString -> IO () Source #
Send a multi-part message.
This function applies the SendMore Flag between all message parts.
0MQ guarantees atomic delivery of a multi-part message
(cf. zmq_sendmsg).
receive :: Receiver a => Socket a -> IO ByteString Source #
Receive a ByteString from the socket (cf. zmq_recvmsg).
Note: This function always calls zmq_recvmsg in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT flag, as it is used
by default. Still, receive blocks the calling thread (using GHC's
threadWaitRead) for as long as no data is available.
receiveMulti :: Receiver a => Socket a -> IO [ByteString] Source #
Receive a multi-part message.
This function collects all message parts sent via sendMulti.
version :: IO (Int, Int, Int) Source #
Return the runtime version of the underlying 0MQ library as a (major, minor, patch) triple.
monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg)) Source #
Monitor socket events (cf. zmq_socket_monitor).
This function returns a function which can be invoked to retrieve
the next socket event, potentially blocking until the next one becomes
available. When applied to False, monitoring will terminate, i.e.
internal monitoring resources will be disposed. Consequently, after
monitor has been invoked, the returned function must be applied
once to False.
subscribe :: Subscriber a => Socket a -> ByteString -> IO () Source #
Subscribe Socket to given subscription.
unsubscribe :: Subscriber a => Socket a -> ByteString -> IO () Source #
Unsubscribe Socket from given subscription.
Context Options (Read)
Context Options (Write)
Socket Options (Read)
conflate :: Conflatable a => Socket a -> IO Bool Source #
Restricts the outgoing and incoming socket buffers to a single message.
curvePublicKey :: KeyFormat f -> Socket a -> IO ByteString Source #
curveSecretKey :: KeyFormat f -> Socket a -> IO ByteString Source #
curveServerKey :: KeyFormat f -> Socket a -> IO ByteString Source #
Socket Options (Write)
setBacklog :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setConflate :: Conflatable a => Bool -> Socket a -> IO () Source #
Restrict the outgoing and incoming socket buffers to a single message.
setCurvePublicKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source #
setCurveSecretKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source #
setCurveServerKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source #
setIdentity :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source #
setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO () Source #
setMcastHops :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO () Source #
setPlainPassword :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source #
setPlainUserName :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source #
setReceiveBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setReceiveHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setReceiveTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source #
setReconnectInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setReconnectIntervalMax :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setRecoveryInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setSendBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setSendHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source #
setSendTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source #
setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO () Source #
setTcpKeepAliveCount :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source #
setTcpKeepAliveIdle :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source #
setTcpKeepAliveInterval :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source #
Restrictions
restrict :: Restriction r v => v -> Restricted r v Source #
Create a restricted value. If the given value does not satisfy the restrictions, a modified variant is used instead, e.g. if an integer is larger than the upper bound, the upper bound value is used.
toRestricted :: Restriction r v => v -> Maybe (Restricted r v) Source #
Create a restricted value. Returns Nothing if
the given value does not satisfy all restrictions.
Error Handling
ZMQError encapsulates information about errors, which occur when using the native 0MQ API, such as error number and message.
Low-level Functions
socket :: SocketType a => Context -> a -> IO (Socket a) Source #
Create a new 0MQ socket within the given context. withSocket provides
automatic socket closing and may be safer to use.
close :: Socket a -> IO () Source #
Close a 0MQ socket. withSocket provides automatic socket closing and may
be safer to use.
waitRead :: Socket a -> IO () Source #
Wait until data is available for reading from the given Socket.
After this function returns, a call to receive will essentially be
non-blocking.
waitWrite :: Socket a -> IO () Source #
Wait until data can be written to the given Socket.
After this function returns, a call to send will essentially be
non-blocking.
z85Encode :: MonadIO m => Restricted Div4 ByteString -> m ByteString Source #
z85Decode :: MonadIO m => Restricted Div5 ByteString -> m ByteString Source #
Utils
proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO () Source #
Starts the built-in 0MQ proxy (cf. zmq_proxy).
The proxy connects the frontend socket to the backend socket.
Before calling proxy, all sockets should be bound.
If the capture socket is not Nothing, the proxy shall send all messages, received on both frontend and backend, to the capture socket.
curveKeyPair :: MonadIO m => m (Restricted Div5 ByteString, Restricted Div5 ByteString) Source #
Generate a new curve key pair. (cf. zmq_curve_keypair)