distributed-process- Cloud Haskell: Erlang-style concurrency in Haskell

Copyright: (c) Well-Typed / Tim Watson
License: BSD3 (see the file LICENSE)
Maintainer: Tim Watson <watson.timothy@gmail.com>
Portability: non-portable (requires concurrency)
Safe Haskell: None




Management Extensions API

This module presents an API for creating Management Agents: special processes that are capable of receiving and responding to a node's internal system events. These system events are delivered by the management event bus: an internal subsystem maintained for each running node, to which all agents are automatically subscribed.

Agents are defined in terms of event sinks, each taking a particular Serializable type and evaluating to an action in the MxAgent monad in response. Each MxSink evaluates to an MxAction that specifies whether the agent should continue processing its inputs or stop. If the type of a message cannot be matched to any of the agent's sinks, the message is discarded. A sink can also deliberately skip processing a message, deferring to the remaining handlers. This is the only way that more than one event sink can handle the same data type, since otherwise the first type match will win every time a message arrives. See mxSkip for details.
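The following sketch shows two sinks for the same input type, where the first defers to the second via mxSkip. It assumes the network-transport-inmemory package for a throwaway local node; the agent id "triage", the (ProcessId, String) request shape and the helpers runLocal and demo are hypothetical names chosen for illustration.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Distributed.Process
import Control.Distributed.Process.Management
import Control.Distributed.Process.Node
  (closeLocalNode, initRemoteTable, newLocalNode, runProcess)
import Control.Monad.IO.Class (liftIO)
import Network.Transport.InMemory (createTransport)

-- Hypothetical agent: both sinks accept (ProcessId, String). The first only
-- handles "urgent" requests (leading '!') and skips everything else, so the
-- second sink gets a chance to match the same type.
triageAgent :: Process ProcessId
triageAgent =
  mxAgent (MxAgentId "triage") ()
    [ mxSink $ \(from :: ProcessId, msg :: String) ->
        case msg of
          ('!' : _) -> liftMX (send from ("urgent: " ++ msg)) >> mxReady
          _         -> mxSkip -- not urgent: defer to the remaining sinks
    , mxSink $ \(from :: ProcessId, msg :: String) -> do
        liftMX (send from ("normal: " ++ msg))
        mxReady
    ]

-- Run a Process on a throwaway in-memory node and return its result.
runLocal :: Process a -> IO a
runLocal p = do
  transport <- createTransport
  node <- newLocalNode transport initRemoteTable
  result <- newEmptyMVar
  runProcess node (p >>= liftIO . putMVar result)
  closeLocalNode node
  takeMVar result

-- Send two requests straight to the agent's mailbox and collect the replies.
demo :: IO [String]
demo = runLocal $ do
  agent <- triageAgent
  self <- getSelfPid
  send agent (self, "!disk full")
  r1 <- expect
  send agent (self, "heartbeat")
  r2 <- expect
  return [r1, r2]
```

Here the requests go straight to the agent's mailbox: "!disk full" is claimed by the first sink, while "heartbeat" is skipped by it and handled by the second.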

Various events are published to the management event bus automatically, the full list of which can be found in the definition of the MxEvent data type. Additionally, clients of the Management API can publish arbitrary Serializable data to the event bus using mxNotify. All running agents receive all events (from the primary event bus to which they're subscribed).

Agent processes are automatically registered on the local node, and can receive messages via their mailbox just like ordinary processes. Unlike ordinary Process code however, it is unnecessary (though possible) for agents to use the base expect and receiveX primitives to do this, since the management infrastructure will continuously read from both the primary event bus and the process' own mailbox. Messages are transparently passed to the agent's event sinks from both sources, so an agent need only concern itself with how to respond to its inputs.

Some agents may wish to prioritise messages from their mailbox over traffic on the management event bus, or vice versa. The mxReceive and mxReceiveChan API calls do this for the mailbox and event bus, respectively. The prioritisation these APIs offer is simply that the chosen data stream will be checked first. No blocking will occur if the chosen (prioritised) source is devoid of input messages; instead, the agent handling code will revert to switching between the alternatives in round-robin fashion as usual. If messages exist in one or more channels, they will be consumed as soon as they're available; priority is effectively a hint about which channel to consume from, should messages be available in both.

Prioritisation, then, is a hint about the preference of data source from which the next input should be chosen. No guarantee can be made that the chosen source will in fact be selected at runtime.
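To make the "checked first, never blocks" rule concrete, here is a small pure model of how the next input might be chosen. It is not the library's implementation: Source, Choice, checkOrder and nextInput are hypothetical names, with Ready, PreferMailbox and PreferBus standing in for mxReady, mxReceive and mxReceiveChan.

```haskell
-- Hypothetical model types: where an input came from, and what the
-- last sink asked for (Ready ~ mxReady, PreferMailbox ~ mxReceive,
-- PreferBus ~ mxReceiveChan).
data Source = Mailbox | EventBus deriving (Eq, Show)
data Choice = Ready | PreferMailbox | PreferBus deriving (Eq, Show)

-- Order in which sources are checked, given the sink's choice and the
-- source that supplied the previous input. Ready alternates sources;
-- the two preferences only change which source is checked first.
checkOrder :: Choice -> Source -> [Source]
checkOrder PreferMailbox _ = [Mailbox, EventBus]
checkOrder PreferBus     _ = [EventBus, Mailbox]
checkOrder Ready Mailbox   = [EventBus, Mailbox]
checkOrder Ready EventBus  = [Mailbox, EventBus]

-- Take the next message from the first non-empty source in check order.
-- An empty preferred source is simply passed over; nothing blocks.
nextInput :: Choice -> Source -> ([a], [a]) -> Maybe (Source, a, ([a], [a]))
nextInput choice prev (mailbox, bus) = go (checkOrder choice prev)
  where
    go [] = Nothing -- both sources empty
    go (Mailbox : rest) = case mailbox of
      (m : ms) -> Just (Mailbox, m, (ms, bus))
      []       -> go rest
    go (EventBus : rest) = case bus of
      (b : bs) -> Just (EventBus, b, (mailbox, bs))
      []       -> go rest
```

For example, nextInput PreferMailbox EventBus ([], [2]) still yields the bus message: the preference decides only the order of the checks.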

Management API Semantics

The management API provides no guarantees whatsoever regarding:

  • The ordering of messages delivered to the event bus.
  • The order in which agents will be executed.
  • Whether messages will be taken from the mailbox first, or the event bus.

Management Data API

Both management agents and clients of the API have access to a variety of data storage capabilities, to facilitate publishing and consuming useful system information. Agents maintain their own internal state privately (via a state transformer - see mxGetLocal et al); however, it is possible for agents to share additional data with each other (and the outside world) using data tables.

Each agent is assigned its own data table, which acts as a shared map, where the keys are Strings and the values are Serializable data of whatever type the agent or its clients store.

Because an agent's data table stores its values in raw Message format, it works effectively as an un-typed dictionary, into which data of varying types can be fed and later retrieved. The upside of this is that different keys can be mapped to various types without any additional work on the part of the developer. The downside is that the code reading these values must know in advance what type(s) to expect, and the API provides no additional support for handling that.

Publishing is accomplished using the mxPublish and mxSet APIs, querying with mxGet, and deletion with mxClear (a single key), mxPurgeTable (every key in the table) and mxDropTable (the table itself).
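Because values are stored untyped, a reader that asks for the wrong type simply gets Nothing back. The following model illustrates this with Data.Dynamic standing in for the raw Message values the real table holds; Table, tableSet, tableGet, tableClear and tablePurge are hypothetical analogues of mxSet, mxGet, mxClear and mxPurgeTable, not part of the API.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import qualified Data.Map.Strict as Map
import Data.Typeable (Typeable)

-- Model of an agent's data table: String keys mapped to untyped values.
-- Dynamic stands in for the raw Message values the real table stores.
type Table = Map.Map String Dynamic

-- Analogue of mxSet: any (Typeable) value can be stored under any key.
tableSet :: Typeable a => String -> a -> Table -> Table
tableSet key value = Map.insert key (toDyn value)

-- Analogue of mxGet: Nothing if the key is absent *or* if the stored
-- value is not of the type the caller asked for.
tableGet :: Typeable a => String -> Table -> Maybe a
tableGet key table = Map.lookup key table >>= fromDynamic

-- Analogues of mxClear (one key) and mxPurgeTable (every key).
tableClear :: String -> Table -> Table
tableClear = Map.delete

tablePurge :: Table -> Table
tablePurge = const Map.empty
```

Storing an Int under "uptime" and then asking for a String under the same key returns Nothing rather than failing: the reader has to know the type in advance.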

When a management agent terminates, its table is left in memory, so that the agent may resume its role (by restarting), or another agent may later take over its MxAgentId, with the data originally captured still in place.

Defining Agents

New agents are defined with mxAgent and require a unique MxAgentId, an initial state - MxAgent runs in a state transformer - and a list of the agent's event sinks. Each MxSink is defined in terms of a specific Serializable type, via the mxSink function, binding the event handler expression to inputs of only that type.

Apart from modifying its own local state, an agent can execute arbitrary Process a code via lifting (see liftMX) and even publish its own messages back to the primary event bus (see mxBroadcast).
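As a sketch of both facilities, assuming the network-transport-inmemory package for a local node, the hypothetical agent below keeps a counter in its local state, runs ordinary Process code through liftMX, and publishes the running total with mxBroadcast. The agent id "counter" and the helpers runLocal and demo are illustrative names.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Distributed.Process
import Control.Distributed.Process.Management
import Control.Distributed.Process.Node
  (closeLocalNode, initRemoteTable, newLocalNode, runProcess)
import Control.Monad.IO.Class (liftIO)
import Network.Transport.InMemory (createTransport)

-- Hypothetical agent: counts String inputs, logs each one via liftMX and
-- re-publishes the running total on the event bus. Sending it a ProcessId
-- asks for the current total.
counterAgent :: Process ProcessId
counterAgent =
  mxAgent (MxAgentId "counter") (0 :: Int)
    [ mxSink $ \(msg :: String) -> do
        mxUpdateLocal (+ 1)
        total <- mxGetLocal
        liftMX $ say ("counter saw: " ++ msg)
        mxBroadcast total -- an Int, so our own String sink ignores it
        mxReady
    , mxSink $ \(client :: ProcessId) -> do
        total <- mxGetLocal
        liftMX $ send client total
        mxReady
    ]

-- Run a Process on a throwaway in-memory node and return its result.
runLocal :: Process a -> IO a
runLocal p = do
  transport <- createTransport
  node <- newLocalNode transport initRemoteTable
  result <- newEmptyMVar
  runProcess node (p >>= liftIO . putMVar result)
  closeLocalNode node
  takeMVar result

-- Two Strings, then a request for the total (mailbox order is preserved).
demo :: IO Int
demo = runLocal $ do
  agent <- counterAgent
  send agent "first"
  send agent "second"
  getSelfPid >>= send agent
  expect
```

The total is broadcast as an Int rather than a String: since every running agent receives bus traffic, an agent that broadcast values of its own input type could end up consuming its own output.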

Since messages are delivered to agents from both the management event bus and the agent process's own mailbox, agents (i.e., event sinks) will generally have no idea as to their origin. An agent can, however, choose to prioritise the choice of input (source) each time one of its event sinks runs. The standard way for an event sink to indicate that the agent is ready for its next input is to evaluate mxReady. When this happens, the management infrastructure will obtain data from the event bus and the process' mailbox in a round-robin fashion, i.e., alternating between the two sources.

Example Code

What follows is a grossly over-simplified example of a management agent that provides a basic name monitoring facility. Whenever a process name is registered or unregistered, clients are informed of the fact.

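{-# LANGUAGE DeriveGeneric, ScopedTypeVariables #-}
import Control.Distributed.Process
import Control.Distributed.Process.Management
import Data.Binary (Binary)
import qualified Data.Foldable as Foldable
import qualified Data.Set as Set
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

-- simple notification data type
data Registration = Reg { added  :: Bool
                        , procId :: ProcessId
                        , name   :: String
                        }
  deriving (Typeable, Generic)

instance Binary Registration

-- start a /name monitoring agent/
nameMonitorAgent :: Process ProcessId
nameMonitorAgent =
  mxAgent (MxAgentId "name-monitor") Set.empty [
        -- a client sent us its pid: remember it
        (mxSink $ \(pid :: ProcessId) -> do
           mxUpdateLocal $ Set.insert pid
           mxReady)
        -- a name was (un)registered: notify every known client
      , (mxSink $ \(ev :: MxEvent) -> do
           case ev of
             MxRegistered   p n -> notify True  p n
             MxUnRegistered p n -> notify False p n
             _                  -> return ()
           mxReady)
    ]
  where
    notify a p n = do
      clients <- mxGetLocal
      Foldable.mapM_ (\c -> liftMX $ send c (Reg a p n)) clients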

The client interface (for sending their pid) can take one of two forms:

monitorNames = getSelfPid >>= nsend "name-monitor"
monitorNames2 = getSelfPid >>= mxNotify

For some real-world examples, see the distributed-process-platform package.

Performance, Stability and Scalability

Management Agents offer numerous advantages over regular processes: broadcast communication with them can have lower latency, they offer simplified message (i.e., input type) handling, and they have access to internal system information that would otherwise be unobtainable.

Do not be tempted to implement everything (e.g., the kitchen sink) using the management API, though. There are overheads associated with management agents, which is why they're presented as tools for consuming low-level system information rather than as application-level development tools.

Agents that rely heavily on a busy mailbox can cause the management event bus to backlog un-GC'ed data, leading to increased heap space. Producers that do not take care to avoid passing unevaluated thunks to the API can crash all the agents in the system. Agents are not monitored or managed in any way, and those that crash will not be restarted.

The management event bus can receive a great deal of traffic. Every time a message is sent and/or received, an event is passed to the agent controller and broadcast to all agents (plus the trace controller, if tracing is enabled for the node). This is already a significant overhead - though profiling and benchmarks have demonstrated that it does not adversely affect performance if few agents are installed. Agents will typically use more cycles than plain processes, since they perform additional work: selecting input data from both the event bus and their own mailboxes, plus searching through the set of event sinks (for each agent) to determine the right handler for the event.

Each management agent requires not only its own Process (in which the agent code is run), but also a peer process that provides its data table. These data tables also have to be coordinated and managed on each agent's behalf.

Architecture Overview

The architecture of the management event bus is internal and subject to change without prior notice. The description that follows is provided for informational purposes only.

When a node initially starts, two special, internal system processes are started to support the management infrastructure. The first, known as the trace controller, is responsible for consuming MxEvents and forwarding them to the configured tracer - see Control.Distributed.Process.Debug for further details. The second is the management agent controller, and is the primary worker process underpinning the management infrastructure. All published management events are routed to this process, which places them onto a system-wide event bus and additionally passes them directly to the trace controller.

There are several reasons for segregating the tracing and management control planes in this fashion. Tracing can be enabled or disabled by clients, whilst the management event bus cannot, since in addition to providing runtime instrumentation, its intended use-cases include node monitoring, peer discovery (via topology providing backends) and other essential system services that require knowledge of otherwise hidden system internals. Tracing is also subject to trace flags that limit the specific MxEvents delivered to trace clients - an overhead/complexity not shared by management agents. Finally, tracing and management agents are implemented using completely different signalling techniques - more on this later - which would introduce considerable complexity if they shared the same event loop.

The management control plane is driven by a shared broadcast channel, which is written to by the agent controller and subscribed to by all agent processes. Agents are spawned as regular processes, whose primary implementation (i.e., server loop) is responsible for consuming messages from both the broadcast channel and their own mailbox. Once consumed, messages are applied to the agent's event sinks until one matches the input, at which point it is applied and the loop continues. The implementation chooses from the event bus and the mailbox in a round-robin fashion, until a message is received. This polling activity would lead to management agents consuming considerable system resources if left unchecked; therefore, the implementation will poll for a limited number of retries, after which it will perform a blocking read on the event bus.
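The bounded polling strategy described above can be modelled with STM, purely for illustration: a broadcast TChan plays the event bus (each reader works on its own dupTChan copy) and a TQueue plays the mailbox. Neither is how distributed-process actually represents a mailbox, and Input, pollOnce and nextInput are hypothetical names.

```haskell
import Control.Concurrent.STM

-- Where a (modelled) input came from.
data Input a = FromMailbox a | FromBus a deriving (Eq, Show)

-- One non-blocking attempt at both sources, in the given order.
pollOnce :: Bool -> TQueue a -> TChan a -> STM (Maybe (Input a))
pollOnce mailboxFirst mailbox bus
  | mailboxFirst = firstOf fromMailbox fromBus
  | otherwise    = firstOf fromBus fromMailbox
  where
    fromMailbox = fmap FromMailbox <$> tryReadTQueue mailbox
    fromBus     = fmap FromBus <$> tryReadTChan bus
    -- run the first check; only if it found nothing, run the second
    firstOf x y = x >>= maybe y (return . Just)

-- Alternate between the sources for up to 'retries' attempts, then fall
-- back to a blocking read on the bus so an idle agent stops spinning.
nextInput :: Int -> TQueue a -> TChan a -> IO (Input a)
nextInput retries mailbox bus = go retries True
  where
    go 0 _ = FromBus <$> atomically (readTChan bus)
    go n mailboxFirst = do
      r <- atomically (pollOnce mailboxFirst mailbox bus)
      case r of
        Just input -> return input
        Nothing    -> go (n - 1) (not mailboxFirst)
```

Blocking on the bus alone means that, in this model, a message arriving only in the mailbox waits for the next bus event; STM's orElse over readTChan and readTQueue would block on both, but that option exists here only because both sources are modelled as STM structures.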



data MxEvent

This is the default management event, fired for various internal events around the NT connection and Process lifecycle. All published events that conform to this type are eligible for tracing, i.e., they will be delivered to the trace controller.


MxSpawned ProcessId

fired whenever a local process is spawned

MxRegistered ProcessId String

fired whenever a process/name is registered (locally)

MxUnRegistered ProcessId String

fired whenever a process/name is unregistered (locally)

MxProcessDied ProcessId DiedReason

fired whenever a process dies

MxNodeDied NodeId DiedReason

fired whenever a node dies (i.e., the connection is broken/disconnected)

MxSent ProcessId ProcessId Message

fired whenever a message is sent from a local process

MxReceived ProcessId Message

fired whenever a message is received by a local process

MxConnected ConnectionId EndPointAddress

fired when a network-transport connection is first established

MxDisconnected ConnectionId EndPointAddress

fired when a network-transport connection is broken/disconnected

MxUser Message

a user defined trace event

MxLog String

a logging event - used for debugging purposes only

MxTraceTakeover ProcessId

notifies a trace listener that all subsequent traces will be sent to pid


notifies a trace listener that it has been disabled/removed
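Since MxEvent is an ordinary Serializable data type, an agent can pattern match on the constructors listed above. The sketch below uses only this API; describeEvent, lifecycleLogger and the agent id "lifecycle-logger" are hypothetical.

```haskell
import Control.Distributed.Process
import Control.Distributed.Process.Management

-- Hypothetical helper: a one-line summary of the lifecycle events we care
-- about, and Nothing for everything else (message traffic, logging, ...).
describeEvent :: MxEvent -> Maybe String
describeEvent ev = case ev of
  MxSpawned pid        -> Just ("spawned " ++ show pid)
  MxRegistered pid n   -> Just (n ++ " registered by " ++ show pid)
  MxUnRegistered pid n -> Just (n ++ " unregistered by " ++ show pid)
  MxProcessDied pid r  -> Just (show pid ++ " died: " ++ show r)
  MxNodeDied nid r     -> Just (show nid ++ " died: " ++ show r)
  _                    -> Nothing

-- Hypothetical agent that logs those summaries. Its single sink matches
-- every MxEvent; events that describeEvent ignores are simply dropped.
lifecycleLogger :: Process ProcessId
lifecycleLogger =
  mxAgent (MxAgentId "lifecycle-logger") ()
    [ mxSink $ \ev -> do
        mapM_ (liftMX . say) (describeEvent ev)
        mxReady
    ]
```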

Firing Arbitrary Mx Events

mxNotify :: Serializable a => a -> Process ()

Publishes an arbitrary Serializable message to the management event bus. Note that no attempt is made to force the argument; it is therefore very important that you do not pass unevaluated thunks that might crash the receiving process via this API, since all registered agents will gain access to the data structure once it is broadcast by the agent controller.
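One way to follow this advice is to evaluate the value fully in the publishing process before handing it over. The sketch assumes the deepseq package for NFData and force; forceFully and mxNotifyStrict are hypothetical helpers, not part of the management API.

```haskell
import Control.DeepSeq (NFData, force)
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Management (mxNotify)
import Control.Distributed.Process.Serializable (Serializable)
import qualified Control.Exception as E
import Control.Monad.IO.Class (liftIO)

-- Fully evaluate a value in IO, rethrowing any error hidden inside it.
forceFully :: NFData a => a -> IO a
forceFully = E.evaluate . force

-- Hypothetical wrapper around mxNotify: a bottom or lazily thrown
-- exception now surfaces in the publishing process, before any agent
-- ever sees the value.
mxNotifyStrict :: (Serializable a, NFData a) => a -> Process ()
mxNotifyStrict x = liftIO (forceFully x) >>= mxNotify
```

A value such as [1, undefined] now fails in the caller rather than in whichever agent first inspects it. The same approach applies to mxSet.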

Constructing Mx Agents

data MxAction

Represents the actions a management agent can take when evaluating an event sink.

newtype MxAgentId

A newtype wrapper for an agent id (which is a string).




agentId :: String

data MxAgent s a

Monad for management agents.

mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId

Activates a new agent.

mxAgentWithFinalize :: MxAgentId -> s -> [MxSink s] -> MxAgent s () -> Process ProcessId

Activates a new agent. This variant takes a finalizer expression, which is run once the agent shuts down (even in case of failure/exceptions). The finalizer expression runs in the mx monad - MxAgent s () - such that the agent's internal state remains accessible to the shutdown/cleanup code.
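A sketch of a finalizer that reports the agent's final state, assuming network-transport-inmemory for a local node. The agent id "collector", the unit "stop" message and the helpers runLocal and demo are hypothetical; the example relies on the finalizer running after mxDeactivate, as described above.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Distributed.Process
import Control.Distributed.Process.Management
import Control.Distributed.Process.Node
  (closeLocalNode, initRemoteTable, newLocalNode, runProcess)
import Control.Monad.IO.Class (liftIO)
import Network.Transport.InMemory (createTransport)

-- Hypothetical agent: collects Strings in its local state; a () message
-- asks it to stop. The finalizer sends the collected list to 'reportTo'.
collector :: ProcessId -> Process ProcessId
collector reportTo =
  mxAgentWithFinalize (MxAgentId "collector") ([] :: [String])
    [ mxSink $ \s -> mxUpdateLocal (s :) >> mxReady
    , mxSink $ \() -> mxDeactivate "stop requested"
    ]
    -- finalizer: runs in MxAgent, so the collected state is still visible
    (mxGetLocal >>= liftMX . send reportTo . reverse)

-- Run a Process on a throwaway in-memory node and return its result.
runLocal :: Process a -> IO a
runLocal p = do
  transport <- createTransport
  node <- newLocalNode transport initRemoteTable
  result <- newEmptyMVar
  runProcess node (p >>= liftIO . putMVar result)
  closeLocalNode node
  takeMVar result

demo :: IO [String]
demo = runLocal $ do
  self <- getSelfPid
  agent <- collector self
  send agent "alpha"
  send agent "beta"
  send agent () -- ask the agent to deactivate
  expect        -- the finalizer's report
```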

type MxSink s = Message -> MxAgent s (Maybe MxAction)

Type of a management agent's event sink.

mxSink :: forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s

Create an MxSink from an expression taking a Serializable type m, that yields an MxAction in the MxAgent monad.

mxGetId :: MxAgent s MxAgentId

Return the MxAgentId for the currently executing agent.

mxDeactivate :: forall s. String -> MxAgent s MxAction

Gracefully terminate an agent.

mxReady :: forall s. MxAgent s MxAction

Continue executing (i.e., receiving and processing messages).

mxSkip :: forall s. MxAgent s MxAction

Causes the currently executing event sink to be skipped. The remaining declared event sinks will be evaluated to find a matching handler. Can be used to allow multiple event sinks to process data of the same type.

mxReceive :: forall s. MxAgent s MxAction

Continue executing, prioritising inputs from the process' own mailbox ahead of data from the management event bus.

mxReceiveChan :: forall s. MxAgent s MxAction

Continue executing, prioritising inputs from the management event bus over the process' own mailbox.

mxBroadcast :: Serializable m => m -> MxAgent s ()

The MxAgent version of mxNotify.

mxSetLocal :: s -> MxAgent s ()

Set the agent's local state.

mxGetLocal :: MxAgent s s

Fetch the agent's local state.

mxUpdateLocal :: (s -> s) -> MxAgent s ()

Update the agent's local state.

liftMX :: Process a -> MxAgent s a

Lift a Process action.

Mx Data API

mxPublish :: MxAgentId -> String -> Message -> Process ()

Publish an arbitrary Message as a property in the management database.

For publishing Serializable data, use mxSet instead.

mxSet :: Serializable a => MxAgentId -> String -> a -> Process ()

Sets an arbitrary Serializable datum against a key in the management database. Note that no attempt is made to force the argument; it is therefore very important that you do not pass unevaluated thunks that might crash some other, arbitrary process (or management agent!) that obtains and attempts to force the value later on.

mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a)

Fetches a property from the management database for the given key. If the property is not set, or does not match the expected type when typechecked (at runtime), returns Nothing.

mxClear :: MxAgentId -> String -> Process ()

Clears a property from the management database using the given key. If the key does not exist in the database, this is a no-op.

mxPurgeTable :: MxAgentId -> Process ()

Purges a table in the management database of all its stored properties.

mxDropTable :: MxAgentId -> Process ()

Deletes a table from the management database.