Copyright | (c) Tim Watson 2012 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell98 |
This module provides a high(er) level API for building complex Process
implementations by abstracting out the management of the process' mailbox,
reply/response handling, timeouts, process hibernation, error handling
and shutdown/stop procedures. It is modelled along similar lines to OTP's
gen_server API - http://www.erlang.org/doc/man/gen_server.html.
In particular, a managed process will interoperate cleanly with the supervisor API in distributed-process-supervision.
- API Overview
Once started, a managed process will consume messages from its mailbox and
pass them on to user defined handlers based on the types received (mapped
to those accepted by the handlers) and optionally by also evaluating user
supplied predicates to determine which handler(s) should run.
Each handler returns a ProcessAction
which specifies how we should proceed.
If none of the handlers is able to process a message (because their types are
incompatible), then the unhandledMessagePolicy
will be applied.
The ProcessAction
type defines the ways in which our process can respond
to its inputs, whether by continuing to read incoming messages, setting an
optional timeout, sleeping for a while or stopping. The optional timeout
behaves a little differently to the other process actions. If no messages
are received within the specified time span, a user defined timeoutHandler
will be called in order to determine the next action.
The ProcessDefinition type also defines a shutdownHandler,
which is called whenever the process exits, whether because a callback has
returned stop as the next action, or as the result of an unhandled exit signal
or similar asynchronous exception thrown in (or to) the process itself.
The other handlers are split into two groups: apiHandlers and infoHandlers.
The former contains handlers for the cast
and call
protocols, whilst the
latter contains handlers that deal with input messages which are not sent
via these API calls (i.e., messages sent using bare send
or signals put
into the process mailbox by the node controller, such as
ProcessMonitorNotification
and the like).
- The Cast/Call Protocol
Deliberate interactions with a managed process usually fall into one of
two categories. A cast
interaction involves a client sending a message
asynchronously and the server handling this input. No reply is sent to
the client. On the other hand, a call
is a remote procedure call,
where the client sends a message and waits for a reply from the server.
All expressions given to apiHandlers
have to conform to the cast|call
protocol. The protocol (messaging) implementation is hidden from the user;
API functions for creating user defined apiHandlers
are given instead,
which take expressions (i.e., a function or lambda expression) and create the
appropriate Dispatcher
for handling the cast (or call).
These cast/call protocols are for dealing with expected inputs. They will usually form the explicit public API for the process, and be exposed by providing module level functions that defer to the cast/call API, giving the author an opportunity to enforce the correct types. For example:

    {- Ask the server to add two numbers -}
    add :: ProcessId -> Double -> Double -> Process Double
    add pid x y = call pid (Add x y)

Note here that the return type from the call is inferred and will not be enforced by the type system. If the server sent a different type back in the reply, then the caller might be blocked indefinitely! In fact, the result of mismatching the expected return type (in the client facing API) with the actual type returned by the server is more severe in practice. The underlying types that implement the call protocol carry information about the expected return type. If there is a mismatch between the input and output types that the client API uses and those which the server declares it can handle, then the message will be considered unroutable: no handler will be executed against it and the unhandled message policy will be applied. You should, therefore, take great care to align these types since the default unhandled message policy is to terminate the server! That might seem pretty extreme, but you can alter the unhandled message policy and/or use the various overloaded versions of the call API in order to detect errors on the server such as this.
The cost of potential type mismatches between the client and server is the main disadvantage of this looser coupling between them. This mechanism does, however, allow servers to handle a variety of messages without specifying the entire protocol to be supported in excruciating detail.
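To make the type alignment concrete, here is a minimal sketch of a matching client and server pair. The Add request type and the startAdder helper are hypothetical names introduced for illustration, and the import locations for ExitReason and Delay are assumed to be those of distributed-process-extras. safeCall is assumed to be one of the call variants re-exported from the Client module, returning Left when the server fails.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

import Control.Distributed.Process (Process, ProcessId, spawnLocal)
import Control.Distributed.Process.Extras (ExitReason)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- Hypothetical request type shared by the client and server code.
data Add = Add Double Double
  deriving (Typeable, Generic)

instance Binary Add

-- Server side: the handler accepts Add and replies with a Double.
startAdder :: Process ProcessId
startAdder = spawnLocal $ serve () (statelessInit Infinity) adder
  where
    adder = statelessProcess
      { apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ] }

-- Client side: the signature pins the reply type to Double, matching the
-- handler above. safeCall (assumed signature) reports a server failure
-- as Left instead of leaving the caller blocked.
add :: ProcessId -> Double -> Double -> Process (Either ExitReason Double)
add pid x y = safeCall pid (Add x y)
```

Keeping the request type and both functions in one module is one way to stop the client and server types drifting apart.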
- Handling Unexpected/Info Messages
An explicit protocol for communicating with the process can be
configured using cast and call, but it is not possible to prevent
other kinds of messages from being sent to the process mailbox. When
any message arrives for which there are no handlers able to process
its content, the UnhandledMessagePolicy will be applied. Sometimes
it is desirable to process incoming messages which aren't part of the
protocol, rather than let the policy deal with them. This is particularly
true when incoming messages are important to the process, but their point
of origin is outside the author's control. Handling signals such as
ProcessMonitorNotification is a typical example of this:
handleInfo (\st (ProcessMonitorNotification _ _ r) -> say (show r) >> continue st)
- Handling Process State
The ProcessDefinition is parameterised by the type of state it maintains.
A process that has no state will have the type ProcessDefinition () and can
be bootstrapped by evaluating statelessProcess.
All call/cast handlers come in two flavours, those which take the process state as an input and those which do not. Handlers that ignore the process state have to return a function that takes the state and returns the required action. Versions of the various action generating functions ending in an underscore are provided to simplify this:

    statelessProcess {
        apiHandlers = [
            handleCall_   (\(n :: Int) -> return (n * 2))
          , handleCastIf_ (input (\(c :: String, _ :: Delay) -> c == "timeout"))
                          (\("timeout", (d :: Delay)) -> timeoutAfter_ d)
          ]
      , timeoutHandler = \_ _ -> stop $ ExitOther "timeout"
      }

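For comparison, a stateful definition threads the state through each handler explicitly. The following is a small sketch of a counter server; the Increment and Fetch message types and the helper names are hypothetical, and the import location for Delay is assumed to be distributed-process-extras.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

import Control.Distributed.Process (Process, ProcessId, spawnLocal)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- Hypothetical protocol messages.
data Increment = Increment deriving (Typeable, Generic)
instance Binary Increment

data Fetch = Fetch deriving (Typeable, Generic)
instance Binary Fetch

-- The state is an Int; each handler receives it and returns the next state.
counter :: ProcessDefinition Int
counter = defaultProcess
  { apiHandlers =
      [ handleCast (\n Increment -> continue (n + 1)) -- update state, no reply
      , handleCall (\n Fetch -> reply n n)            -- reply with n, keep n
      ]
  }

-- Start the server with an initial count and no idle timeout.
startCounter :: Int -> Process ProcessId
startCounter n = spawnLocal $ serve n (\i -> return (InitOk i Infinity)) counter

-- Client API with explicit types.
increment :: ProcessId -> Process ()
increment pid = cast pid Increment

fetch :: ProcessId -> Process Int
fetch pid = call pid Fetch
```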
- Avoiding Side Effects
If you wish to only write side-effect free code in your server definition, then there is an explicit API for doing so. Instead of using the handler definition functions in this module, import the pure server module, which provides a StateT based monad for building referentially transparent callbacks.
See Control.Distributed.Process.ManagedProcess.Server.Restricted for details and API documentation.
- Handling Errors
Error handling appears in several contexts and process definitions can hook into these with relative ease. Only process failures as a result of asynchronous exceptions are supported by the API, which provides several scopes for error handling.
Catching exceptions inside handler functions is no different to ordinary exception handling in monadic code.

    handleCall (\st req ->
      catch (hereBeDragons st req)
            (\(e :: SmaugTheTerribleException) -> reply (Left (show e)) st))

The caveats mentioned in Control.Distributed.Process.Extras about exit signal handling obviously apply here as well.
- Structured Exit Handling
Because Control.Distributed.Process.ProcessExitException is a ubiquitous
signalling mechanism in Cloud Haskell, it is treated unlike other
asynchronous exceptions. The ProcessDefinition exitHandlers field
accepts a list of handlers that, for a specific exit reason, can decide
how the process should respond. If none of these handlers matches the
type of reason then the process will exit with DiedException why. In
addition, a private exit handler is installed for exit signals where
reason :: ExitReason, which is a form of exit signal used explicitly
by the supervision APIs. This behaviour, which cannot be overridden, is to
gracefully shut down the process, calling the shutdownHandler as usual,
before stopping with reason given as the final outcome.
Example: handling custom data in a ProcessExitException
handleExit (\state from (sigExit :: SomeExitData) -> continue state)
Under some circumstances, handling exit signals is perfectly legitimate. Handling of other forms of asynchronous exception (e.g., exceptions not generated by an exit signal) is not supported by this API. Cloud Haskell's primitives for exception handling will work normally in managed process callbacks, however.
If any asynchronous exception goes unhandled, the process will immediately
exit without running the shutdownHandler. It is very important to note
that in Cloud Haskell, link failures generate asynchronous exceptions in
the target and these will NOT be caught by the API and will therefore
cause the process to exit without running the termination handler
callback. If your termination handler is set up to do important work
(such as resource cleanup) then you should avoid linking your process
and use monitors instead.
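The exit handling hooks described above could be wired together roughly as follows. This is a sketch only: the Flush payload type and the buffering state are hypothetical, the shutdownHandler follows the ShutdownHandler type shown on this page (s -> ExitReason -> Process ()), and ExitReason is assumed to have a Show instance.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

import Control.Distributed.Process (Process, ProcessId, exit, say)
import Control.Distributed.Process.ManagedProcess
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- Hypothetical exit payload: asks the server to discard its buffer.
data Flush = Flush deriving (Typeable, Generic)
instance Binary Flush

-- The state is a buffer of pending lines.
buffered :: ProcessDefinition [String]
buffered = defaultProcess
  { exitHandlers =
      -- An exit signal carrying Flush is absorbed: clear the buffer, keep running.
      [ handleExit (\_buf _from Flush -> continue []) ]
  , shutdownHandler = \buf reason ->
      -- Runs on an orderly stop; not on an unhandled asynchronous exception.
      say ("stopping: " ++ show reason ++ ", "
           ++ show (length buf) ++ " lines still buffered")
  }

-- Client side: deliver the custom exit signal to the server.
flush :: ProcessId -> Process ()
flush pid = exit pid Flush
```

Exit signals whose payload is not a Flush fall through to the default behaviour described above.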
- Prioritised Mailboxes
Many processes need to prioritise certain classes of message over others, so two subsets of the API are given over to supporting those cases.
A PrioritisedProcessDefinition combines the usual ProcessDefinition
(containing the cast/call API, error, termination and info handlers) with a
list of Priority entries, which are used at runtime to prioritise the
server's inputs. Note that it is only messages which are prioritised; the
server's various handlers are still evaluated in insertion order.
Prioritisation does not guarantee that a prioritised message/type will be processed before other traffic (indeed, doing so in a multi-threaded runtime would be very hard), but in the absence of races between multiple processes, if two messages are both present in the process' own mailbox, they will be applied to the ProcessDefinition's handlers in priority order. This is achieved by draining the real mailbox into a priority queue and processing each message in turn.
A prioritised process must be configured with a Priority
list to be of
any use. Creating a prioritised process without any priorities would be a
big waste of computational resources, and it is worth thinking carefully
about whether or not prioritisation is truly necessary in your design before
choosing to use it.
Using a prioritised process is as simple as calling pserve instead of
serve, and passing an initialised PrioritisedProcessDefinition.
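As a rough sketch of the above, a prioritised definition might be assembled like this. The Urgent and Routine message types are hypothetical. prioritiseCast_ and setPriority are not documented on this page; they are assumed to come from the re-exported Server.Priority module with the shapes (a -> Priority ()) -> DispatchPriority s and Int -> Priority m, so check them against the version you are using.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

import Control.Distributed.Process (Process, ProcessId, spawnLocal)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- Hypothetical message types.
newtype Urgent = Urgent String deriving (Typeable, Generic)
instance Binary Urgent

newtype Routine = Routine String deriving (Typeable, Generic)
instance Binary Routine

-- Ordinary handlers; the state records messages in processing order.
base :: ProcessDefinition [String]
base = defaultProcess
  { apiHandlers =
      [ handleCast (\seen (Urgent m)  -> continue (seen ++ ["urgent: " ++ m]))
      , handleCast (\seen (Routine m) -> continue (seen ++ ["routine: " ++ m]))
      ]
  }

-- Urgent casts get a priority; other messages keep the default ordering.
-- (prioritiseCast_ and setPriority are assumed signatures, see above.)
urgentFirst :: PrioritisedProcessDefinition [String]
urgentFirst = prioritised base
  [ prioritiseCast_ (\(Urgent _) -> setPriority 10) ]

-- Note the use of pserve rather than serve.
startServer :: Process ProcessId
startServer = spawnLocal $ pserve () (\() -> return (InitOk [] Infinity)) urgentFirst
```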
- Control Channels
For advanced users and those requiring very low latency, a prioritised
process definition might not be suitable, since it performs considerable
work behind the scenes. There are also designs that need to segregate a
process' control plane from other kinds of traffic it is expected to
receive. For such use cases, a control channel may prove a better choice,
since typed channels are already prioritised during the mailbox scans that
the base receiveWait and receiveTimeout primitives from
distributed-process provide.
In order to utilise a control channel in a server, it must be passed to the
corresponding handleControlChan function (or its stateless variant). The
control channel is created by evaluating newControlChan, in the same way
that we create regular typed channels.
In order for clients to communicate with a server via its control channel,
however, they must pass a handle to a ControlPort, which can be obtained by
evaluating channelControlPort on the ControlChannel. A ControlPort is
Serializable, so it can alternatively be sent to other processes.
Control channel traffic will only be prioritised over other traffic if the
handlers using it are present before others (e.g., handleInfo, handleCast,
etc) in the process definition. It is not possible to combine prioritised
processes with control channels. Attempting to do so will satisfy the
compiler, but crash with a runtime error once you attempt to evaluate the
prioritised server loop (i.e., pserve).
Since the primary purpose of control channels is to simplify and optimise
client-server communication over a single channel, this module provides an
alternate server loop in the form of chanServe. Instead of passing an
initialised ProcessDefinition, this API takes an expression from a
ControlChannel to ProcessDefinition, operating in the Process monad.
Providing the opaque reference in this fashion is useful, since the type of
messages the control channel carries will not correlate directly to the
inter-process traffic we use internally.
Although control channels are intended for use as a single control plane
(via chanServe), it is possible to use them as a more strictly typed
communications backbone, since they do enforce absolute type safety in client
code, being bound to a particular type on creation. For rpc (i.e., call)
interaction however, it is not possible to have the server reply to a control
channel, since they're a one way pipe. It is possible to alleviate this
situation by passing a request type that contains a typed channel bound to
the expected reply type, enabling client and server to match on both the input
and output types as specifically as possible. Note that this still does not
guarantee an agreement on types between all parties at runtime however.
An example of how to do this follows:

    data Request = Request String (SendPort String)
      deriving (Typeable, Generic)
    instance Binary Request where

    -- note that our initial caller needs an mvar to obtain the control port...
    echoServer :: MVar (ControlPort Request) -> Process ()
    echoServer mv = do
      cc <- newControlChan :: Process (ControlChannel Request)
      liftIO $ putMVar mv $ channelControlPort cc
      let s = statelessProcess
                { apiHandlers =
                    [ handleControlChan_ cc
                        -- echo the payload back on the caller's reply channel
                        (\(Request m sp) st -> sendChan sp m >> continue st)
                    ]
                }
      serve () (statelessInit Infinity) s

    echoClient :: String -> ControlPort Request -> Process String
    echoClient str cp = do
      (sp, rp) <- newChan
      sendControlMessage cp $ Request str sp
      receiveChan rp

- Performance Considerations
The various server loops are fairly optimised, but there is a definite cost associated with scanning the mailbox to match on protocol messages, plus additional costs in space and time due to mapping over all available info handlers for non-protocol (i.e., neither call nor cast) messages. These are exacerbated significantly when using prioritisation, whilst using a single control channel is very fast and carries little overhead.
From the client perspective, it's important to remember that the call
protocol will wait for a reply in most cases, triggering a full O(n) scan of
the caller's mailbox. If the mailbox is extremely full and calls are
regularly made, this may have a significant impact on the caller. The
callChan family of client API functions can alleviate this, by using (and
matching on) a private typed channel instead, but the server must be written
to accommodate this. Similar gains can be had using a control channel and
providing a typed reply channel in the request data; however, the call
mechanism does not support this notion, so not only are we unable
to use the various reply functions, client code should also consider
monitoring the server's pid and handling server failures whilst waiting on
the reply channel.
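A minimal sketch of the typed-channel variant might look like this. handleRpcChan_ and replyChan are taken from the signatures on this page; callChan is assumed to have the shape s -> a -> Process (ReceivePort b), and the doubling server is a hypothetical example.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Distributed.Process (Process, ProcessId, receiveChan, spawnLocal)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess

-- Server: replies on the caller's private SendPort instead of its mailbox.
doubler :: ProcessDefinition ()
doubler = statelessProcess
  { apiHandlers =
      [ handleRpcChan_ (\port (n :: Int) -> replyChan port (n * 2) >> continue ()) ]
  }

startDoubler :: Process ProcessId
startDoubler = spawnLocal $ serve () (statelessInit Infinity) doubler

-- Client: callChan (assumed signature) hands back a ReceivePort, so waiting
-- for the reply does not scan the caller's mailbox. This blocks if the
-- server dies before replying; real code should monitor the server too.
double :: ProcessId -> Int -> Process Int
double pid n = callChan pid n >>= receiveChan
```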
- data InitResult s
- = InitOk s Delay
- | InitStop String
- | InitIgnore
- type InitHandler a s = a -> Process (InitResult s)
- serve :: a -> InitHandler a s -> ProcessDefinition s -> Process ()
- pserve :: a -> InitHandler a s -> PrioritisedProcessDefinition s -> Process ()
- chanServe :: Serializable b => a -> InitHandler a s -> (ControlChannel b -> Process (ProcessDefinition s)) -> Process ()
- runProcess :: (s -> Delay -> Process ExitReason) -> a -> InitHandler a s -> Process ()
- prioritised :: ProcessDefinition s -> [DispatchPriority s] -> PrioritisedProcessDefinition s
- module Control.Distributed.Process.ManagedProcess.Client
- data ProcessDefinition s = ProcessDefinition {
- apiHandlers :: [Dispatcher s]
- infoHandlers :: [DeferredDispatcher s]
- exitHandlers :: [ExitSignalDispatcher s]
- timeoutHandler :: TimeoutHandler s
- shutdownHandler :: ShutdownHandler s
- unhandledMessagePolicy :: UnhandledMessagePolicy
- data PrioritisedProcessDefinition s = PrioritisedProcessDefinition {}
- data RecvTimeoutPolicy
- newtype Priority a = Priority {}
- data DispatchPriority s
- data Dispatcher s
- data DeferredDispatcher s
- type ShutdownHandler s = s -> ExitReason -> Process ()
- type TimeoutHandler s = s -> Delay -> Process (ProcessAction s)
- data ProcessAction s
- data ProcessReply r s
- data Condition s m
- type CallHandler s a b = s -> a -> Process (ProcessReply b s)
- type CastHandler s a = s -> a -> Process (ProcessAction s)
- data UnhandledMessagePolicy
- = Terminate
- | DeadLetter ProcessId
- | Log
- | Drop
- data CallRef a
- data ControlChannel m
- data ControlPort m
- defaultProcess :: ProcessDefinition s
- defaultProcessWithPriorities :: [DispatchPriority s] -> PrioritisedProcessDefinition s
- statelessProcess :: ProcessDefinition ()
- statelessInit :: Delay -> InitHandler () ()
- handleCall :: (Serializable a, Serializable b) => (s -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCallIf :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (s -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCallFrom :: forall s a b. (Serializable a, Serializable b) => (s -> CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCallFromIf :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (s -> CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCast :: Serializable a => (s -> a -> Process (ProcessAction s)) -> Dispatcher s
- handleCastIf :: forall s a. Serializable a => Condition s a -> (s -> a -> Process (ProcessAction s)) -> Dispatcher s
- handleInfo :: forall s a. Serializable a => (s -> a -> Process (ProcessAction s)) -> DeferredDispatcher s
- handleRaw :: forall s. (s -> Message -> Process (ProcessAction s)) -> DeferredDispatcher s
- handleRpcChan :: forall s a b. (Serializable a, Serializable b) => (s -> SendPort b -> a -> Process (ProcessAction s)) -> Dispatcher s
- handleRpcChanIf :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (s -> SendPort b -> a -> Process (ProcessAction s)) -> Dispatcher s
- action :: forall s a. Serializable a => (a -> s -> Process (ProcessAction s)) -> Dispatcher s
- handleDispatch :: forall s a. Serializable a => (s -> a -> Process (ProcessAction s)) -> Dispatcher s
- handleExit :: forall s a. Serializable a => (s -> ProcessId -> a -> Process (ProcessAction s)) -> ExitSignalDispatcher s
- handleCall_ :: (Serializable a, Serializable b) => (a -> Process b) -> Dispatcher s
- handleCallFrom_ :: forall s a b. (Serializable a, Serializable b) => (CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCallIf_ :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (a -> Process b) -> Dispatcher s
- handleCallFromIf_ :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s
- handleCast_ :: Serializable a => (a -> s -> Process (ProcessAction s)) -> Dispatcher s
- handleCastIf_ :: forall s a. Serializable a => Condition s a -> (a -> s -> Process (ProcessAction s)) -> Dispatcher s
- handleRpcChan_ :: forall a b. (Serializable a, Serializable b) => (SendPort b -> a -> Process (ProcessAction ())) -> Dispatcher ()
- handleRpcChanIf_ :: forall a b. (Serializable a, Serializable b) => Condition () a -> (SendPort b -> a -> Process (ProcessAction ())) -> Dispatcher ()
- newControlChan :: Serializable m => Process (ControlChannel m)
- channelControlPort :: Serializable m => ControlChannel m -> ControlPort m
- handleControlChan :: forall s a. Serializable a => ControlChannel a -> (s -> a -> Process (ProcessAction s)) -> Dispatcher s
- handleControlChan_ :: forall s a. Serializable a => ControlChannel a -> (a -> s -> Process (ProcessAction s)) -> Dispatcher s
- module Control.Distributed.Process.ManagedProcess.Server.Priority
- condition :: forall a b. (Serializable a, Serializable b) => (a -> b -> Bool) -> Condition a b
- state :: forall s m. Serializable m => (s -> Bool) -> Condition s m
- input :: forall s m. Serializable m => (m -> Bool) -> Condition s m
- reply :: Serializable r => r -> s -> Process (ProcessReply r s)
- replyWith :: Serializable r => r -> ProcessAction s -> Process (ProcessReply r s)
- noReply :: Serializable r => ProcessAction s -> Process (ProcessReply r s)
- noReply_ :: forall s r. Serializable r => s -> Process (ProcessReply r s)
- haltNoReply_ :: Serializable r => ExitReason -> Process (ProcessReply r s)
- continue :: s -> Process (ProcessAction s)
- continue_ :: s -> Process (ProcessAction s)
- timeoutAfter :: Delay -> s -> Process (ProcessAction s)
- timeoutAfter_ :: Delay -> s -> Process (ProcessAction s)
- hibernate :: TimeInterval -> s -> Process (ProcessAction s)
- hibernate_ :: TimeInterval -> s -> Process (ProcessAction s)
- stop :: ExitReason -> Process (ProcessAction s)
- stopWith :: s -> ExitReason -> Process (ProcessAction s)
- stop_ :: ExitReason -> s -> Process (ProcessAction s)
- replyTo :: Serializable m => CallRef m -> m -> Process ()
- replyChan :: Serializable m => SendPort m -> m -> Process ()
Starting/Running server processes
data InitResult s
Return type for an InitHandler expression.
type InitHandler a s = a -> Process (InitResult s)
An expression used to initialise a process with its state.
serve :: a -> InitHandler a s -> ProcessDefinition s -> Process ()
Starts the message handling loop for a managed process configured with the supplied process definition, after calling the init handler with its initial arguments. Note that this function does not return until the server exits.
pserve :: a -> InitHandler a s -> PrioritisedProcessDefinition s -> Process ()
Starts the message handling loop for a prioritised managed process, configured with the supplied process definition, after calling the init handler with its initial arguments. Note that this function does not return until the server exits.
chanServe :: Serializable b => a -> InitHandler a s -> (ControlChannel b -> Process (ProcessDefinition s)) -> Process ()
Starts the message handling loop for a managed process, configured with
a typed control channel. The caller supplied expression is evaluated with
an opaque reference to the channel, which must be passed when calling
handleControlChan. The meaning and behaviour of the init handler and
initial arguments are the same as those given to serve. Note that this
function does not return until the server exits.
runProcess :: (s -> Delay -> Process ExitReason) -> a -> InitHandler a s -> Process ()
Wraps any process loop and ensures that it adheres to the
managed process start/stop semantics, i.e., evaluating the
InitHandler with an initial state and delay will either
die due to InitStop, exit silently (due to InitIgnore)
or evaluate the process' loop. The supplied loop must evaluate
to ExitNormal, otherwise the calling process will die with
whatever ExitReason is given.
prioritised :: ProcessDefinition s -> [DispatchPriority s] -> PrioritisedProcessDefinition s
Turns a standard ProcessDefinition into a PrioritisedProcessDefinition,
by virtue of the supplied list of DispatchPriority expressions.
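To illustrate how an InitHandler feeds into serve, here is a small sketch that validates its start argument. The names initCounter, runCounter and startCounter are hypothetical, and the import location for Delay is assumed.

```haskell
import Control.Distributed.Process (Process, ProcessId, spawnLocal)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Control.Distributed.Process.ManagedProcess

-- Validate the start argument before the server loop begins.
initCounter :: InitHandler Int Int
initCounter start
  | start < 0 = return (InitStop "start value must be non-negative")
  | otherwise = return (InitOk start Infinity) -- initial state, no idle timeout

-- serve does not return until the server exits, so this blocks the caller.
runCounter :: Int -> Process ()
runCounter start = serve start initCounter (defaultProcess :: ProcessDefinition Int)

-- Usually the loop is run in its own process.
startCounter :: Int -> Process ProcessId
startCounter = spawnLocal . runCounter
```

Because defaultProcess has no handlers and a Terminate policy, this server stops on the first message it receives; a real definition would add handlers.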
Client interactions
Defining server processes
data ProcessDefinition s
Stores the functions that determine runtime behaviour in response to incoming messages and a policy for responding to unhandled messages.
data PrioritisedProcessDefinition s
A ProcessDefinition decorated with DispatchPriority for certain
input domains.
data RecvTimeoutPolicy
For a PrioritisedProcessDefinition, this policy determines for how long
the receive loop should continue draining the process' mailbox before
processing its received mail (in priority order).
If a prioritised managed process is receiving a lot of messages (into its real mailbox), the server might never get around to actually processing its inputs. This (mandatory) policy provides a guarantee that eventually (i.e., after a specified number of received messages or time interval), the server will stop removing messages from its mailbox and process those it has already received.
data DispatchPriority s
data Dispatcher s
Provides dispatch from cast and call messages to a typed handler.
data DeferredDispatcher s
Provides dispatch for any input, returns Nothing for unhandled messages.
type ShutdownHandler s = s -> ExitReason -> Process ()
An expression used to handle process termination.
type TimeoutHandler s = s -> Delay -> Process (ProcessAction s)
An expression used to handle process timeouts.
data ProcessAction s
The action taken by a process after a handler has run and its updated state.
See continue, timeoutAfter, hibernate, stop and stopWith.
ProcessContinue s | continue with (possibly new) state |
ProcessTimeout Delay s | timeout if no messages are received |
ProcessHibernate TimeInterval s | hibernate for delay |
ProcessStop ExitReason | stop the process, giving |
ProcessStopping s ExitReason | stop the process with |
data ProcessReply r s
Returned from handlers for the synchronous call protocol, encapsulates
the reply data and the action to take after sending the reply. A handler
can return NoReply if it wishes to ignore the call.
data Condition s m
Wraps a predicate that is used to determine whether or not a handler is valid based on some combination of the current process state, the type and/or value of the input message or both.
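As an illustration, conditions built with input and state (both listed in the synopsis) can guard handlers as in the sketch below. The running-total server is a hypothetical example.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Distributed.Process.ManagedProcess

-- The state is a running total.
guarded :: ProcessDefinition Int
guarded = defaultProcess
  { apiHandlers =
      [ -- Only answer calls whose input is non-negative.
        handleCallIf (input (\(n :: Int) -> n >= 0))
                     (\total n -> reply (total + n) (total + n))
        -- Only accept casts while the total is below 100.
      , handleCastIf (state (< (100 :: Int)))
                     (\total (n :: Int) -> continue (total + n))
      ]
  }
```

A message that fails its condition is treated like any other message with no matching handler, so the unhandled message policy decides what happens to it.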
type CallHandler s a b = s -> a -> Process (ProcessReply b s)
An expression used to handle a call message.
type CastHandler s a = s -> a -> Process (ProcessAction s)
An expression used to handle a cast message.
data UnhandledMessagePolicy
Policy for handling unexpected messages, i.e., messages which are not
sent using the call or cast APIs, and which are not handled by any of the
handleInfo handlers.
Terminate | stop immediately, giving |
DeadLetter ProcessId | forward the message to the given recipient |
Log | log messages, then behave identically to |
Drop | dequeue and then drop/ignore the message |
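Since the policy is an ordinary record field, it can be changed with a record update. A brief sketch, in which the helper names are hypothetical:

```haskell
import Control.Distributed.Process (ProcessId)
import Control.Distributed.Process.ManagedProcess

-- Forward anything the handlers cannot process to another process.
withDeadLetters :: ProcessId -> ProcessDefinition s -> ProcessDefinition s
withDeadLetters sink def = def { unhandledMessagePolicy = DeadLetter sink }

-- Silently discard unexpected messages instead of terminating.
quiet :: ProcessDefinition ()
quiet = statelessProcess { unhandledMessagePolicy = Drop }
```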
data ControlChannel m
Provides a means for servers to listen on a separate, typed control channel, thereby segregating the channel from their regular (and potentially busy) mailbox.
data ControlPort m
The writable end of a ControlChannel.
Instances: Eq (ControlPort m), Show (ControlPort m), Serializable m => Binary (ControlPort m)
defaultProcess :: ProcessDefinition s
A default ProcessDefinition, with no api, info or exit handler.
The default timeoutHandler simply continues, the shutdownHandler
is a no-op and the unhandledMessagePolicy is Terminate.
defaultProcessWithPriorities :: [DispatchPriority s] -> PrioritisedProcessDefinition s
Creates a default PrioritisedProcessDefinition from a list of
DispatchPriority. See defaultProcess for the underlying definition.
statelessProcess :: ProcessDefinition ()
A basic, stateless ProcessDefinition. See defaultProcess for the
default field values.
statelessInit :: Delay -> InitHandler () ()
A default, state unaware InitHandler that can be used with
statelessProcess. This simply returns InitOk with the empty
state (i.e., unit) and the given Delay.
Server side callbacks
handleCall :: (Serializable a, Serializable b) => (s -> a -> Process (ProcessReply b s)) -> Dispatcher s
Constructs a call handler from a function in the Process monad.
> handleCall = handleCallIf (const True)
handleCallIf
:: (Serializable a, Serializable b) | |
=> Condition s a | predicate that must be satisfied for the handler to run |
-> (s -> a -> Process (ProcessReply b s)) | a reply yielding function over the process state and input message |
-> Dispatcher s |
Constructs a call handler from an ordinary function in the Process
monad. Given a function f :: (s -> a -> Process (ProcessReply b s)),
the expression handleCallIf cond f will yield a Dispatcher for inclusion
in a ProcessDefinition. Messages are only dispatched to the handler if
the supplied condition evaluates to True.
handleCallFrom :: forall s a b. (Serializable a, Serializable b) => (s -> CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s
As handleCall but passes the CallRef to the handler function.
This can be useful if you wish to reply later to the caller by, e.g.,
spawning a process to do some work and have it replyTo caller response
out of band. In this case the callback can pass the CallRef to the
worker (or stash it away itself) and return noReply.
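The deferred reply pattern described above could be sketched as follows, using handleCallFrom, replyTo and noReply_ as listed in the synopsis. The reversing worker is a hypothetical stand-in for slow work.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Distributed.Process (spawnLocal)
import Control.Distributed.Process.ManagedProcess

-- Hand the CallRef to a worker and return immediately, so the server
-- can keep processing other messages while the work is in progress.
slowEcho :: ProcessDefinition ()
slowEcho = statelessProcess
  { apiHandlers =
      [ handleCallFrom (\st ref (msg :: String) -> do
          -- The worker sends the reply out of band once it is done.
          _ <- spawnLocal (replyTo ref (reverse msg))
          -- Tell the server loop not to reply itself.
          noReply_ st)
      ]
  }
```

If the worker dies before replying, the caller keeps waiting, so client code may want one of the call variants that detects failures or times out.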
handleCallFromIf
:: (Serializable a, Serializable b) | |
=> Condition s a | predicate that must be satisfied for the handler to run |
-> (s -> CallRef b -> a -> Process (ProcessReply b s)) | a reply yielding function over the process state, sender and input message |
-> Dispatcher s |
As handleCallFrom but only runs the handler if the supplied Condition
evaluates to True.
handleCast :: Serializable a => (s -> a -> Process (ProcessAction s)) -> Dispatcher s
Constructs a cast handler from an ordinary function in the Process monad.
> handleCast = handleCastIf (const True)
handleCastIf
:: Serializable a | |
=> Condition s a | predicate that must be satisfied for the handler to run |
-> (s -> a -> Process (ProcessAction s)) | an action yielding function over the process state and input message |
-> Dispatcher s |
Constructs a cast handler from an ordinary function in the Process
monad. Given a function f :: (s -> a -> Process (ProcessAction s)),
the expression handleCastIf cond f will yield a Dispatcher for inclusion
in a ProcessDefinition.
handleInfo :: forall s a. Serializable a => (s -> a -> Process (ProcessAction s)) -> DeferredDispatcher s
Creates a generic input handler (i.e., for received messages that are not
sent using the cast or call APIs) from an ordinary function in the
Process monad.
handleRaw :: forall s. (s -> Message -> Process (ProcessAction s)) -> DeferredDispatcher s
Handle completely raw input messages.
handleRpcChan :: forall s a b. (Serializable a, Serializable b) => (s -> SendPort b -> a -> Process (ProcessAction s)) -> Dispatcher s
Creates a handler for a typed channel RPC style interaction. The
handler takes a SendPort b to reply to, the initial input and evaluates
to a ProcessAction. It is the handler code's responsibility to send the
reply to the SendPort.
handleRpcChanIf :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (s -> SendPort b -> a -> Process (ProcessAction s)) -> Dispatcher s
As handleRpcChan, but only evaluates the handler if the supplied
condition is met.
action
:: Serializable a | |
=> (a -> s -> Process (ProcessAction s)) | a function from the input message to a stateless action, cf |
-> Dispatcher s |
Constructs an action handler. Like handleDispatch this can handle both
cast and call messages, but you won't know which you're dealing with.
This can be useful where certain inputs require a definite action, such as
stopping the server, without concern for the state (e.g., when stopping we
need only decide to stop, as the terminate handler can deal with state
cleanup etc). For example:
action (\MyCriticalSignal -> stop_ ExitNormal)
handleDispatch :: forall s a. Serializable a => (s -> a -> Process (ProcessAction s)) -> Dispatcher s
Constructs a handler for both call and cast messages.
handleDispatch = handleDispatchIf (const True)
handleExit :: forall s a. Serializable a => (s -> ProcessId -> a -> Process (ProcessAction s)) -> ExitSignalDispatcher s
Creates an exit handler scoped to the execution of any and all the registered call, cast and info handlers for the process.
Stateless callbacks
handleCall_ :: (Serializable a, Serializable b) => (a -> Process b) -> Dispatcher s Source
handleCallFrom_ :: forall s a b. (Serializable a, Serializable b) => (CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s Source
A variant of handleCallFrom that ignores the state argument.
:: (Serializable a, Serializable b) | |
=> Condition s a | predicate that must be satisfied for the handler to run |
-> (a -> Process b) | a function from an input message to a reply |
-> Dispatcher s |
Constructs a call handler from an ordinary function in the Process monad. This variant ignores the state argument present in handleCall and handleCallIf and is therefore useful in a stateless server. Messages are only dispatched to the handler if the supplied condition evaluates to True.
See handleCall
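For illustration, a stateless call handler guarded by a predicate on the input might look like the sketch below. It assumes the module path Control.Distributed.Process.ManagedProcess plus defaultProcess and apiHandlers; halve and definition are hypothetical names.

```haskell
import Control.Distributed.Process (Process)
import Control.Distributed.Process.ManagedProcess

-- Hypothetical stateless call: the result is sent back as the reply.
halve :: Int -> Process Int
halve n = return (n `div` 2)

-- Only even inputs are dispatched to halve; input builds a Condition
-- that inspects the message and ignores the state.
definition :: ProcessDefinition ()
definition = defaultProcess
  { apiHandlers = [handleCallIf_ (input even) halve] }
```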
handleCallFromIf_ :: forall s a b. (Serializable a, Serializable b) => Condition s a -> (CallRef b -> a -> Process (ProcessReply b s)) -> Dispatcher s Source
A variant of handleCallFromIf that ignores the state argument.
handleCast_ :: Serializable a => (a -> s -> Process (ProcessAction s)) -> Dispatcher s Source
Version of handleCast that ignores the server state.
:: Serializable a | |
=> Condition s a | predicate that must be satisfied for the handler to run |
-> (a -> s -> Process (ProcessAction s)) | a function from the input message to a stateless action, cf |
-> Dispatcher s |
Version of handleCastIf that ignores the server state.
handleRpcChan_ :: forall a b. (Serializable a, Serializable b) => (SendPort b -> a -> Process (ProcessAction ())) -> Dispatcher () Source
A variant of handleRpcChan that ignores the state argument.
handleRpcChanIf_ :: forall a b. (Serializable a, Serializable b) => Condition () a -> (SendPort b -> a -> Process (ProcessAction ())) -> Dispatcher () Source
A variant of handleRpcChanIf that ignores the state argument.
Control channels
newControlChan :: Serializable m => Process (ControlChannel m) Source
Creates a new ControlChannel.
channelControlPort :: Serializable m => ControlChannel m -> ControlPort m Source
Obtain an opaque expression for communicating with a ControlChannel.
:: Serializable a | |
=> ControlChannel a | the receiving end of the control channel |
-> (s -> a -> Process (ProcessAction s)) | an action yielding function over the process state and input message |
-> Dispatcher s |
Constructs a control channel handler from a function in the Process monad. The handler expression returns no reply, and the control message is treated in the same fashion as a cast.
handleControlChan = handleControlChanIf $ input (const True)
handleControlChan_ :: forall s a. Serializable a => ControlChannel a -> (a -> s -> Process (ProcessAction s)) -> Dispatcher s Source
Version of handleControlChan that ignores the server state.
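The control channel functions above could be combined roughly as follows. This is a sketch only: it assumes the module path Control.Distributed.Process.ManagedProcess and the existence of defaultProcess and apiHandlers, and it does not show how the process is started or how messages are sent to the ControlPort. The names setState and mkDefinition are hypothetical.

```haskell
import Control.Distributed.Process (Process)
import Control.Distributed.Process.ManagedProcess

-- Hypothetical handler: replace the Int state with whatever value
-- arrives on the control channel. No reply is sent, as with a cast.
setState :: Int -> Int -> Process (ProcessAction Int)
setState _old new = continue new

-- Create the channel first, then hand its receiving end to the handler
-- and give the opaque ControlPort to whoever needs to talk to us.
mkDefinition :: Process (ControlPort Int, ProcessDefinition Int)
mkDefinition = do
  chan <- newControlChan
  let definition =
        defaultProcess { apiHandlers = [handleControlChan chan setState] }
  return (channelControlPort chan, definition)
```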
Prioritised mailboxes
Constructing handler results
condition :: forall a b. (Serializable a, Serializable b) => (a -> b -> Bool) -> Condition a b Source
state :: forall s m. Serializable m => (s -> Bool) -> Condition s m Source
input :: forall s m. Serializable m => (m -> Bool) -> Condition s m Source
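Reading the three signatures above, condition takes a predicate over both the state and the message, state looks only at the state, and input looks only at the message. The sketch below builds one of each and uses the first to guard a channel handler. It assumes the module path Control.Distributed.Process.ManagedProcess plus defaultProcess and apiHandlers; all other names are hypothetical.

```haskell
import Control.Distributed.Process (Process, SendPort)
import Control.Distributed.Process.ManagedProcess

-- Predicate over state and message together.
withinBudget :: Condition Int Int
withinBudget = condition (\total n -> total + n <= 100)

-- Predicate over the state only.
hasCapacity :: Condition Int Int
hasCapacity = state (< 100)

-- Predicate over the message only.
positive :: Condition Int Int
positive = input (> 0)

-- Hypothetical handler: add the request to the running total and
-- report the new total on the supplied SendPort.
spend :: Int -> SendPort Int -> Int -> Process (ProcessAction Int)
spend total port n = do
  let total' = total + n
  replyChan port total'
  continue total'

definition :: ProcessDefinition Int
definition = defaultProcess
  { apiHandlers = [handleRpcChanIf withinBudget spend] }
```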
reply :: Serializable r => r -> s -> Process (ProcessReply r s) Source
Instructs the process to send a reply and continue running.
replyWith :: Serializable r => r -> ProcessAction s -> Process (ProcessReply r s) Source
Instructs the process to send a reply and evaluate the ProcessAction.
noReply :: Serializable r => ProcessAction s -> Process (ProcessReply r s) Source
Instructs the process to skip sending a reply and evaluate a ProcessAction.
noReply_ :: forall s r. Serializable r => s -> Process (ProcessReply r s) Source
Continue without giving a reply to the caller - equivalent to continue, but usable in a callback passed to the handleCall family of functions.
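The following sketch shows how reply, replyWith and noReply_ might appear together in one call handler. It assumes the module path Control.Distributed.Process.ManagedProcess, that ExitReason is importable from Control.Distributed.Process.Extras, and that handleCall takes a function of the state and the input (its signature is not listed in this section). The names addToTotal and definition are hypothetical.

```haskell
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Extras (ExitReason(..))
import Control.Distributed.Process.ManagedProcess

-- Hypothetical call handler: the state is an Int total, the reply a String.
addToTotal :: Int -> Int -> Process (ProcessReply String Int)
addToTotal total n
  -- Negative input: carry on with the old state and send no reply.
  | n < 0     = noReply_ total
  -- Zero: reply once more, then evaluate the stop action.
  | n == 0    = stop ExitNormal >>= replyWith ("final total: " ++ show total)
  -- Otherwise: reply and continue running with the updated state.
  | otherwise = reply ("total: " ++ show total') total'
  where
    total' = total + n

definition :: ProcessDefinition Int
definition = defaultProcess { apiHandlers = [handleCall addToTotal] }
```

Because replyWith takes a ProcessAction rather than a Process (ProcessAction s), the result of stop is bound first and then passed along.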
haltNoReply_ :: Serializable r => ExitReason -> Process (ProcessReply r s) Source
Halt process execution during a call handler, without paying any attention to the expected return type.
continue :: s -> Process (ProcessAction s) Source
Instructs the process to continue running and receiving messages.
continue_ :: s -> Process (ProcessAction s) Source
Version of continue that can be used in handlers that ignore process state.
timeoutAfter :: Delay -> s -> Process (ProcessAction s) Source
Instructs the process loop to wait for incoming messages until Delay is exceeded. If no messages are handled during this period, the timeout handler will be called. Note that this alters the process timeout permanently such that the given Delay will remain in use until changed.
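As a rough sketch of how the timeout and its handler fit together, the definition below arms a 30 second timeout when a unit message arrives and stops the process if that timeout fires. It assumes the module path Control.Distributed.Process.ManagedProcess, that Delay and seconds come from Control.Distributed.Process.Extras.Time, that ExitReason comes from Control.Distributed.Process.Extras, and that timeoutHandler accepts a function of the state and the elapsed Delay. The names idleLimit, armIdleTimeout and onIdle are hypothetical.

```haskell
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Extras (ExitReason(..))
import Control.Distributed.Process.Extras.Time (Delay(..), seconds)
import Control.Distributed.Process.ManagedProcess

-- Assumed constructors: Delay wraps a TimeInterval built with seconds.
idleLimit :: Delay
idleLimit = Delay (seconds 30)

-- A plain () message arms the timeout; it stays in force until changed.
armIdleTimeout :: Int -> () -> Process (ProcessAction Int)
armIdleTimeout st () = timeoutAfter idleLimit st

-- Called when no message was handled within the current timeout.
onIdle :: Int -> Delay -> Process (ProcessAction Int)
onIdle st _ = stop_ ExitNormal st

definition :: ProcessDefinition Int
definition = defaultProcess
  { infoHandlers   = [handleInfo armIdleTimeout]
  , timeoutHandler = onIdle
  }
```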
timeoutAfter_ :: Delay -> s -> Process (ProcessAction s) Source
Version of timeoutAfter that can be used in handlers that ignore process state.
action (\(TimeoutPlease duration) -> timeoutAfter_ duration)
hibernate :: TimeInterval -> s -> Process (ProcessAction s) Source
Instructs the process to hibernate for the given TimeInterval. Note that no messages will be removed from the mailbox until after hibernation has ceased. This is equivalent to calling threadDelay.
hibernate_ :: TimeInterval -> s -> Process (ProcessAction s) Source
Version of hibernate that can be used in handlers that ignore process state.
action (\(HibernatePlease delay) -> hibernate_ delay)
stop :: ExitReason -> Process (ProcessAction s) Source
Instructs the process to terminate, giving the supplied reason. If a valid shutdownHandler is installed, it will be called with the ExitReason returned from this call, along with the process state.
stopWith :: s -> ExitReason -> Process (ProcessAction s) Source
As stop, but provides an updated state for the shutdown handler.
stop_ :: ExitReason -> s -> Process (ProcessAction s) Source
Version of stop that can be used in handlers that ignore process state.
action (\ClientError -> stop_ ExitNormal)
replyTo :: Serializable m => CallRef m -> m -> Process () Source
Sends a reply explicitly to a caller.
replyTo = sendTo
replyChan :: Serializable m => SendPort m -> m -> Process () Source
Sends a reply to a SendPort (for use in handleRpcChan et al).
replyChan = sendChan