Copyright | (c) Tim Watson 2012 - 2014 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell98 |
- Message Exchanges
The concept of a message exchange is borrowed from the world of messaging and enterprise integration. The exchange acts like a kind of mailbox, accepting inputs from producers and forwarding these messages to one or more consumers, depending on the implementation's semantics.
This module provides some basic types of message exchange and exposes an API for defining your own custom exchange types.
- Broadcast Exchanges
The broadcast exchange type, started via broadcastExchange
, forward their
inputs to all registered consumers (as the name suggests). This exchange type
is highly optimised for local (intra-node) traffic and provides two different
kinds of client binding, one which causes messages to be delivered directly
to the client's mailbox (viz bindToBroadcaster
), the other providing a
separate stream of messages that can be obtained using the expect
and
receiveX
family of messaging primitives (and thus composed with other forms
of input selection, such as typed channels and selective reads on the process
mailbox).
Important: When a ProcessId
is registered via bindToBroadcaster
, only
the payload of the Message
(i.e., the underlying Serializable
datum) is
forwarded to the consumer, not the whole Message
itself.
- Router Exchanges
The router API provides a means to selectively route messages to one or
more clients, depending on the content of the Message
. Two modes of binding
(and client selection) are provided out of the box, one of which matches the
message key
, the second of which matches on a name and value from the
headers
. Alternative mechanisms for content based routing can be derived
by modifying the BindingSelector
expression passed to router
See messageKeyRouter
and headerContentRouter
for the built-in routing
exchanges, and router
for the extensible routing API.
- Custom Exchange Types
Both the broadcast and router exchanges are implemented as custom
exchange types. The mechanism for defining custom exchange behaviours
such as these is very simple. Raw exchanges are started by evaluating
startExchange
with a specific ExchangeType
record. This type is
parameterised by the internal state it holds, and defines two API callbacks
in its configureEx
and routeEx
fields. The former is evaluated whenever a
client process evaluates configureExchange
, the latter whenever a client
evaluates post
or postMessage
. The configureEx
callback takes a raw
Message
(from Control.Distributed.Process) and is responsible for
decoding the message and updating its own state (if required). It is via
this callback that custom exchange types can receive information about
clients and handle it in their own way. The routeEx
callback is evaluated
with the exchange type's own internal state and the Message
originally
sent to the exchange process (via post
) and is responsible for delivering
the message to its clients in whatever way makes sense for that exchange
type.
- data Exchange
- data Message = Message {}
- startExchange :: forall s. ExchangeType s -> Process Exchange
- startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange
- startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- post :: Serializable a => Exchange -> a -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- broadcastClient :: Exchange -> Process (InputStream Message)
- bindToBroadcaster :: Exchange -> Process ()
- type BroadcastExchange = ExchangeType BroadcastEx
- type HeaderName = String
- data Binding
- = BindKey {
- bindingKey :: !String
- | BindHeader {
- bindingKey :: !String
- headerName :: !HeaderName
- | BindNone
- = BindKey {
- class (Hashable k, Eq k, Serializable k) => Bindable k
- type BindingSelector k = Message -> Process k
- data RelayType
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- messageKeyRouter :: RelayType -> Process Exchange
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- data ExchangeType s = ExchangeType {}
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
Fundamental API
Opaque handle to an exchange.
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Starting/Running an Exchange
startExchange :: forall s. ExchangeType s -> Process Exchange Source
Starts an exchange process with the given ExchangeType
.
startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange Source
Starts an exchange as part of a supervision tree.
Example: > childSpec = toChildStart $ startSupervised exType
startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source
Starts an exchange as part of a supervision tree.
Example: > childSpec = toChildStart $ startSupervisedRef exType
runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source
Client Facing API
post :: Serializable a => Exchange -> a -> Process () Source
Posts an arbitrary Serializable
datum to an exchange. The raw datum is
wrapped in the Message
data type, with its key
set to ""
and its
headers
to []
.
configureExchange :: Serializable m => Exchange -> m -> Process () Source
Sends an arbitrary Serializable
datum to an exchange, for use as a
configuration change - see configureEx
for details.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source
Broadcast Exchange
broadcastExchange :: Process Exchange Source
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange Source
The ExchangeType
of a broadcast exchange. Can be combined with the
startSupervisedRef
and startSupervised
APIs.
broadcastClient :: Exchange -> Process (InputStream Message) Source
Create a binding to the given broadcast exchange for the calling process
and return an InputStream
that can be used in the expect
and
receiveWait
family of messaging primitives. This form of client interaction
helps avoid cluttering the caller's mailbox with Message
data, since the
InputChannel
provides a separate input stream (in a similar fashion to
a typed channel).
Example:
is <- broadcastClient ex msg <- receiveWait [ matchInputStream is ] handleMessage (payload msg)
bindToBroadcaster :: Exchange -> Process () Source
type BroadcastExchange = ExchangeType BroadcastEx Source
Routing (Content Based)
type HeaderName = String Source
The binding key used by the built-in key and header based routers.
BindKey | |
| |
BindHeader | |
| |
BindNone |
class (Hashable k, Eq k, Serializable k) => Bindable k Source
Things that can be used as binding keys in a router.
(Hashable k, Eq k, Serializable k) => Bindable k Source |
type BindingSelector k = Message -> Process k Source
Starting a Router
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source
Defines a router exchange. The BindingSelector
is used to construct
a binding (i.e., an instance of the Bindable
type k
) for each incoming
Message
. Such bindings are matched against bindings stored in the exchange.
Clients of a router exchange are identified by a binding, mapped to
one or more ProcessId
s.
The format of the bindings, nature of their storage and mechanism for
submitting new bindings is implementation dependent (i.e., will vary by
exchange type). For example, the messageKeyRouter
and headerContentRouter
implementations both use the Binding
data type, which can represent a
Message
key or a HeaderName
and content. As with all custom exchange
types, bindings should be submitted by evaluating configureExchange
with
a suitable data type.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source
Defines a router that can be used in a supervision tree.
Routing (Publishing) API
route :: Serializable m => Exchange -> m -> Process () Source
Send a Serializable
message to the supplied Exchange
. The given datum
will be converted to a Message
, with the key
set to ""
and the
headers
to []
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
routeMessage :: Exchange -> Message -> Process () Source
Send a Message
to the supplied Exchange
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
Routing via message/binding keys
bindKey :: String -> Exchange -> Process () Source
Add a binding (for the calling process) to a messageKeyRouter
exchange.
Routing via message headers
headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source
A router that matches on a specific (named) header. To bind a client
Process
to such an exchange, use the bindHeader
function.
bindHeader :: HeaderName -> String -> Exchange -> Process () Source
Add a binding (for the calling process) to a headerContentRouter
exchange.
Defining Custom Exchange Types
data ExchangeType s Source
Different exchange types are defined using record syntax.
The configureEx
and routeEx
API functions are called during the exchange
lifecycle when incoming traffic arrives. Configuration messages are
completely arbitrary types and the exchange type author is entirely
responsible for decoding them. Messages posted to the exchange (see the
Message
data type) are passed to the routeEx
API function along with the
exchange type's own internal state. Both API functions return a new
(potentially updated) state and run in the Process
monad.