Portability | non-portable |
---|---|
Stability | experimental |
Safe Haskell | None |
Stream processing services
- withStreams :: Context -> Service -> Timeout -> [PollEntry] -> StreamAction -> OnError_ -> StreamSink -> Control a -> IO a
- runReceiver :: Socket a -> Timeout -> SinkR (Maybe o) -> IO (Maybe o)
- runSender :: Socket a -> Source -> IO ()
- data PollEntry = Poll {
- pollId :: Identifier
- pollAdd :: String
- pollType :: AccessType
- pollLink :: LinkType
- pollSub :: [Service]
- pollOs :: [SocketOption]
- data AccessType
- parseAccess :: String -> Maybe AccessType
- data LinkType
- link :: LinkType -> Socket a -> String -> [SocketOption] -> IO ()
- parseLink :: String -> Maybe LinkType
- data Streamer
- type StreamConduit = Streamer -> Conduit ByteString ()
- type StreamSink = Streamer -> Sink
- type StreamAction = Streamer -> IO ()
- filterStreams :: Streamer -> (Identifier -> Bool) -> [Identifier]
- getSource :: Streamer -> Identifier
- stream :: Streamer -> [Identifier] -> [ByteString] -> Sink
- part :: Streamer -> [Identifier] -> [ByteString] -> Sink
- passAll :: Streamer -> [Identifier] -> Sink
- pass1 :: Streamer -> [Identifier] -> Sink
- passN :: Streamer -> [Identifier] -> Int -> Sink
- passWhile :: Streamer -> [Identifier] -> (ByteString -> Bool) -> Sink
- ignoreStream :: Sink
- data Controller
- type Control a = Controller -> IO a
- internal :: Identifier
- stop :: Controller -> IO ()
- pause :: Controller -> IO ()
- resume :: Controller -> IO ()
- send :: Controller -> [Identifier] -> Source -> IO ()
- receive :: Controller -> Timeout -> SinkR (Maybe a) -> IO (Maybe a)
Processing Streams
This module provides functions to automate stream processing,
in particular the function withStreams
that starts
a background action polling on a set of streams.
The function uses uses application-defined callbacks
to manipulate streams.
The functions runReceiver
and runSender
are intended mainly
for testing. They send or receive respectively streams,
which are handled or created by a conduit Sink
and Source
.
withStreams :: Context -> Service -> Timeout -> [PollEntry] -> StreamAction -> OnError_ -> StreamSink -> Control a -> IO aSource
Starts polling on a set of streams. The actual polling will be run in another thread. The current thread continues with the action passed in. When this action terminates, the streamer stops polling.
Parameters:
-
Context
- The ZMQ context -
Service
- The service name indicated for instance in error messages. -
Timeout
- The polling timeout: < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, theStreamAction
is invoked. -
PollEntry
- List ofPollEntry
; the streamer will poll over all list members. When input is available, it is directed to theStreamSink
. -
StreamAction
- Invoked when timeout expires. -
OnError_
- Error handler -
StreamSink
- The sink, to which the stream is sent. Note that the sink must terminate the outgoing stream (using one of the terminating sinks described below). Not terminating the stream properly will result in a zeromq socket error. -
Control
a - The action to invoke, when the streamer has been started; TheControl
is used to control the device.
runReceiver :: Socket a -> Timeout -> SinkR (Maybe o) -> IO (Maybe o)Source
Receiver Sink: Internally a zeromq socket is waiting for input; when input is available, it is send to the sink.
- 'Z.Socket a' - The source socket
-
Timeout
- receiver timeout < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, the stream terminates and the return value is Nothing.
A poll entry describes how to access and identify a socket
Poll | |
|
data AccessType Source
Defines the type of a PollEntry
;
the names of the constructors are similar
to the corresponding ZMQ socket types.
ServerT | Represents a server and expects connections from clients;
corresponds to ZMQ Socket Type |
ClientT | Represents a client and connects to a server;
corresponds to ZMQ Socket Type |
RouterT | Represents a load balancer,
expecting connections from clients;
corresponds to ZMQ Socket Type |
DealerT | Represents a router
expecting connections from servers;
corresponds to ZMQ Socket Type |
PubT | Represents a publisher;
corresponds to ZMQ Socket Type |
SubT | Represents a subscriber;
corresponds to ZMQ Socket Type |
PipeT | Represents a Pipe;
corresponds to ZMQ Socket Type |
PullT | Represents a Puller;
corresponds to ZMQ Socket Type |
PeerT | Represents a Peer;
corresponds to ZMQ Socket Type |
parseAccess :: String -> Maybe AccessTypeSource
Safely read AccessType
;
ignores the case of the input string
(e.g. "servert" -> ServerT
)
A zeromq AccessPoint
can be bound or connected to its address.
Only one peer can bind the address,
all other parties have to connect.
link :: LinkType -> Socket a -> String -> [SocketOption] -> IO ()Source
Binds or connects a socket to an address
parseLink :: String -> Maybe LinkTypeSource
Safely read LinkType
;
ignores the case of the input string
and, besides "bind" and "connect",
also accepts "bin", "con" and "conn";
intended for use with command line parameters
Streamer
A streamer represents the current state of the streaming device
started by means of withStreams
. It is passed in to
application-defined callbacks, namely the timeout action
(StreamAction
) and the Sink
(StreamSink
).
There is a bunch of useful sinks that receive a streamer as input (see below).
Holds information on streams and the current state of the streamer, i.e. the current source. Streamers are passed to processing conduits.
type StreamConduit = Streamer -> Conduit ByteString ()Source
Conduit with Streamer
type StreamSink = Streamer -> SinkSource
Sink with Streamer
type StreamAction = Streamer -> IO ()Source
IO Action with Streamer (e.g. Timeout action)
filterStreams :: Streamer -> (Identifier -> Bool) -> [Identifier]Source
Filter subset of streams; usually you want to filter a subset of streams to which to relay an incoming stream. Note that the result is just a list of stream identifiers, which of course could be used directly in the first place. A meaningful use of filterstreams would be, for instance:
let targets = filterStreams s (/= getSource s)
Where all streams but the source are selected.
getSource :: Streamer -> IdentifierSource
Get current source
StreamSinks
To manipulate and relay incoming streams,
the application passes a StreamSink
to withStreams
.
The following sinks are building blocks for more
application-focused manipulations.
The peculiarities of the zeromq library,
in particular the fact that messages are sent
entirely, i.e. with all segments belonging to the same message,
or not at all,
require some care in designing zeromq sinks.
The sink must ensure to mark the last segment sent
(see SndMore
).
Also, the incoming stream should be exhausted
to avoid message segements lingering around in the pipe.
Applications can construct new sinks by either calling a building block in the their own sink code, e.g.:
example :: [B.ByteString] -> StreamSink example headers s is = do mbX <- C.await case mbX of Nothing -> return () Just x -> do stream s is headers passAll s is
or by combining a sink with a conduit forming a more complex sink, e.g.:
example :: StreamSink example s is = sourceList headers =$ passAll s is
stream :: Streamer -> [Identifier] -> [ByteString] -> SinkSource
Send the ByteString
segments to the outgoing streams
identified by [Identifier
].
The stream is terminated.
part :: Streamer -> [Identifier] -> [ByteString] -> SinkSource
Send the ByteString
segments to the outgoing streams
identified by [Identifier
]
without terminating the stream,
i.e. more segments must be sent.
passAll :: Streamer -> [Identifier] -> SinkSource
Pass all segments of an incoming stream to a list of outgoing streams. The stream is terminated.
pass1 :: Streamer -> [Identifier] -> SinkSource
Pass one segment and ignore the remainder of the stream. The stream is terminated.
passN :: Streamer -> [Identifier] -> Int -> SinkSource
Pass n segments and ignore the remainder of the stream. The stream is terminated.
passWhile :: Streamer -> [Identifier] -> (ByteString -> Bool) -> SinkSource
Pass while condition is true and ignore the remainder of the stream. The stream is terminated.
Ignore an incoming stream
Controller
The controller is passed in to the control action of withStreams
.
It allows the application to control the polling device.
Through the controller, the device can be stopped, restarted,
paused and resumed and it is possible to send and receive
streams through the controler.
To relay streams to the controller
(i.e. directly to application code) the internal
stream,
which is identified by the string "_internal"
can be used.
data Controller Source
Controller
type Control a = Controller -> IO aSource
Control Action
The internal stream that represents the Controller
.
StreamSinks can write to this stream, e.g.:
passAll s [internal]
And the streamer may also receive from this stream, e.g.:
if getSource s == internal
stop :: Controller -> IO ()Source
Stop streams
pause :: Controller -> IO ()Source
Pause streams
resume :: Controller -> IO ()Source
Resume streams
send :: Controller -> [Identifier] -> Source -> IO ()Source
Send a stream through the controller
receive :: Controller -> Timeout -> SinkR (Maybe a) -> IO (Maybe a)Source
Receive a stream through the controller
that was sink'd to the target internal
.
Complete Example
The following code implements a ping pong communication using two streamers. The code is somewhat simplistic; it does not use timeout, ignores errors and does not provide means for clean shutdown. It focuses instead on demonstrating the core of the streamer functionality.
For more examples on how to use streams, you may want to refer to the MDP Broker code in Network.Mom.Patterns.Broker.Broker.
import Control.Monad.Trans import Control.Monad (forever) import Control.Concurrent import qualified Data.Conduit as C import qualified Data.ByteString.Char8 as B import Network.Mom.Patterns.Streams import qualified System.ZMQ as Z
main :: IO () main = Z.withContext 1 $ \ctx -> do ready <- newEmptyMVar _ <- forkIO (ping ctx ready) _ <- forkIO (pong ctx ready) forever $ threadDelay 100000
ping :: Z.Context -> MVar () -> IO () ping ctx ready = withStreams ctx "pong" (-1) [Poll "ping" "inproc://ping" PeerT Bind [] []] (\_ -> return ()) -- no timeout (\_ _ _ -> return ()) -- ignore errors pinger $ \c -> do putMVar ready () -- ping is ready putStrLn "starting game!" send c ["ping"] startPing -- send through controller -- to initialise ping pong putStrLn "game started!" forever $ threadDelay 100000 where startPing = C.yield $ B.pack "ping"
pong :: Z.Context -> MVar () -> IO () pong ctx ready = do _ <- takeMVar ready -- wait for ping getting ready withStreams ctx "ping" (-1) [Poll "pong" "inproc://ping" PeerT Connect [] []] (\_ -> return ()) (\_ _ _ -> return ()) pinger $ \_ -> forever $ threadDelay 100000
pinger :: StreamSink pinger s = C.awaitForever $ \i -> let x = B.unpack i in do liftIO $ putStrLn x liftIO $ threadDelay 500000 case x of "ping" -> stream s ["pong"] [B.pack "pong"] "pong" -> stream s ["ping"] [B.pack "ping"] _ -> return ()