distributed-process- Cloud Haskell: Erlang-style concurrency in Haskell

Safe HaskellNone




type AgentConfig = (Tracer, Weak (CQueue Message), ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId) Source

A triple containing a configured tracer, weak pointer to the agent controller's mailbox (CQueue) and an expression used to instantiate new agents on the current node.

mxAgentController :: Fork -> MVar AgentConfig -> Process () Source

Starts a management agent for the current node. The agent process must not crash or be killed, so we generally avoid publishing its ProcessId where possible.

Our process is also responsible for forwarding messages to the trace controller, since having two special processes handled via the LocalNode would be inelegant. We forward messages directly to the trace controller's message queue, just as the MxEventBus that's set up on the LocalNode forwards messages directly to us. This optimises the code path for tracing and avoids overloading the node node controller's internal control plane with additional routing, at the cost of a little more complexity and two cases where we break encapsulation.

mxStartAgent :: Fork -> TChan Message -> ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId Source

Forks a new process in which an mxAgent is run.

startTracing :: Fork -> IO Tracer Source

Start the tracer controller.

startDeadLetterQueue :: TChan Message -> IO () Source

Start a dead letter (agent) queue.

If no agents are registered on the system, the management event bus will fill up and its data won't be GC'ed until someone comes along and reads from the broadcast channel (via dupTChan of course). This is effectively a leak, so to mitigate it, we start a dead letter queue that drains the event bus continuously, thus ensuring if there are no other consumers that we won't use up heap space unnecessarily.