distributed-process-execution-0.1.2.1: Execution Framework for The Cloud Haskell Application Platform

Copyright(c) Tim Watson 2012 - 2014
LicenseBSD3 (see the file LICENSE)
MaintainerTim Watson <watson.timothy@gmail.com>
Stabilityexperimental
Portabilitynon-portable (requires concurrency)
Safe HaskellNone
LanguageHaskell98

Control.Distributed.Process.Execution.Exchange

Contents

Description

Message Exchanges

The concept of a message exchange is borrowed from the world of messaging and enterprise integration. The exchange acts like a kind of mailbox, accepting inputs from producers and forwarding these messages to one or more consumers, depending on the implementation's semantics.

This module provides some basic types of message exchange and exposes an API for defining your own custom exchange types.

Broadcast Exchanges

The broadcast exchange type, started via broadcastExchange, forward their inputs to all registered consumers (as the name suggests). This exchange type is highly optimised for local (intra-node) traffic and provides two different kinds of client binding, one which causes messages to be delivered directly to the client's mailbox (viz bindToBroadcaster), the other providing a separate stream of messages that can be obtained using the expect and receiveX family of messaging primitives (and thus composed with other forms of input selection, such as typed channels and selective reads on the process mailbox).

Important: When a ProcessId is registered via bindToBroadcaster, only the payload of the Message (i.e., the underlying Serializable datum) is forwarded to the consumer, not the whole Message itself.

Router Exchanges

The router API provides a means to selectively route messages to one or more clients, depending on the content of the Message. Two modes of binding (and client selection) are provided out of the box, one of which matches the message key, the second of which matches on a name and value from the headers. Alternative mechanisms for content based routing can be derived by modifying the BindingSelector expression passed to router

See messageKeyRouter and headerContentRouter for the built-in routing exchanges, and router for the extensible routing API.

Custom Exchange Types

Both the broadcast and router exchanges are implemented as custom exchange types. The mechanism for defining custom exchange behaviours such as these is very simple. Raw exchanges are started by evaluating startExchange with a specific ExchangeType record. This type is parameterised by the internal state it holds, and defines two API callbacks in its configureEx and routeEx fields. The former is evaluated whenever a client process evaluates configureExchange, the latter whenever a client evaluates post or postMessage. The configureEx callback takes a raw Message (from Control.Distributed.Process) and is responsible for decoding the message and updating its own state (if required). It is via this callback that custom exchange types can receive information about clients and handle it in their own way. The routeEx callback is evaluated with the exchange type's own internal state and the Message originally sent to the exchange process (via post) and is responsible for delivering the message to its clients in whatever way makes sense for that exchange type.

Synopsis

Fundamental API

data Message Source

Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.

Constructors

Message 

Fields

key :: !String

a routing key for the payload

headers :: ![(String, String)]

arbitrary key-value headers

payload :: !Message

the underlying Message payload

Starting/Running an Exchange

startExchange :: forall s. ExchangeType s -> Process Exchange Source

Starts an exchange process with the given ExchangeType.

startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange Source

Starts an exchange as part of a supervision tree.

Example: > childSpec = toChildStart $ startSupervised exType

startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source

Starts an exchange as part of a supervision tree.

Example: > childSpec = toChildStart $ startSupervisedRef exType

runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source

Client Facing API

post :: Serializable a => Exchange -> a -> Process () Source

Posts an arbitrary Serializable datum to an exchange. The raw datum is wrapped in the Message data type, with its key set to "" and its headers to [].

postMessage :: Exchange -> Message -> Process () Source

Posts a Message to an exchange.

configureExchange :: Serializable m => Exchange -> m -> Process () Source

Sends an arbitrary Serializable datum to an exchange, for use as a configuration change - see configureEx for details.

createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source

Utility for creating a Message datum from its key, headers and payload.

Broadcast Exchange

broadcastExchange :: Process Exchange Source

Start a new broadcast exchange and return a handle to the exchange.

broadcastExchangeT :: Process BroadcastExchange Source

The ExchangeType of a broadcast exchange. Can be combined with the startSupervisedRef and startSupervised APIs.

broadcastClient :: Exchange -> Process (InputStream Message) Source

Create a binding to the given broadcast exchange for the calling process and return an InputStream that can be used in the expect and receiveWait family of messaging primitives. This form of client interaction helps avoid cluttering the caller's mailbox with Message data, since the InputChannel provides a separate input stream (in a similar fashion to a typed channel). Example:

is <- broadcastClient ex
msg <- receiveWait [ matchInputStream is ]
handleMessage (payload msg)

bindToBroadcaster :: Exchange -> Process () Source

Bind the calling process to the given broadcast exchange. For each Message the exchange receives, only the payload will be sent to the calling process' mailbox.

Example:

(producer) > post ex Hello

(consumer) > bindToBroadcaster ex > expect >>= liftIO . putStrLn

Routing (Content Based)

data Binding Source

The binding key used by the built-in key and header based routers.

class (Hashable k, Eq k, Serializable k) => Bindable k Source

Things that can be used as binding keys in a router.

Instances

type BindingSelector k = Message -> Process k Source

Used to convert a Message into a Bindable routing key.

data RelayType Source

Given to a router to indicate whether clients should receive Message payloads only, or the whole Message object itself.

Constructors

PayloadOnly 
WholeMessage 

Starting a Router

router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source

Defines a router exchange. The BindingSelector is used to construct a binding (i.e., an instance of the Bindable type k) for each incoming Message. Such bindings are matched against bindings stored in the exchange. Clients of a router exchange are identified by a binding, mapped to one or more ProcessIds.

The format of the bindings, nature of their storage and mechanism for submitting new bindings is implementation dependent (i.e., will vary by exchange type). For example, the messageKeyRouter and headerContentRouter implementations both use the Binding data type, which can represent a Message key or a HeaderName and content. As with all custom exchange types, bindings should be submitted by evaluating configureExchange with a suitable data type.

supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source

Defines a router that can be used in a supervision tree.

Routing (Publishing) API

route :: Serializable m => Exchange -> m -> Process () Source

Send a Serializable message to the supplied Exchange. The given datum will be converted to a Message, with the key set to "" and the headers to [].

The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

routeMessage :: Exchange -> Message -> Process () Source

Send a Message to the supplied Exchange. The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.

Routing via message/binding keys

messageKeyRouter :: RelayType -> Process Exchange Source

A router that matches on a Message key. To bind a client Process to such an exchange, use the bindKey function.

bindKey :: String -> Exchange -> Process () Source

Add a binding (for the calling process) to a messageKeyRouter exchange.

Routing via message headers

headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source

A router that matches on a specific (named) header. To bind a client Process to such an exchange, use the bindHeader function.

bindHeader :: HeaderName -> String -> Exchange -> Process () Source

Add a binding (for the calling process) to a headerContentRouter exchange.

Defining Custom Exchange Types

data ExchangeType s Source

Different exchange types are defined using record syntax. The configureEx and routeEx API functions are called during the exchange lifecycle when incoming traffic arrives. Configuration messages are completely arbitrary types and the exchange type author is entirely responsible for decoding them. Messages posted to the exchange (see the Message data type) are passed to the routeEx API function along with the exchange type's own internal state. Both API functions return a new (potentially updated) state and run in the Process monad.

Constructors

ExchangeType 

Fields

name :: String
 
state :: s
 
configureEx :: s -> Message -> Process s
 
routeEx :: s -> Message -> Process s
 

applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a Source

Utility for custom exchange type authors - evaluates a set of primitive message handlers from left to right, returning the first which evaluates to Just a, or the initial e value if all the handlers yield Nothing.