rio-process-pool-1.0.1: A library for process pools coupled with asynchronous message queues
Safe HaskellNone
LanguageHaskell2010

RIO.ProcessPool

Description

Launch- and Dispatch messages to processes.

A pool has an Input for Multiplexed messages, and dispatches incoming messges to concurrent processes using user defined MessageBoxes.

The pool starts and stops the processes and creates the message boxes.

The user supplied PoolWorkerCallback usually runs a loop that receives messages from the MessageBox created by the pool for that worker.

When a worker process dies, e.g. because the PoolWorkerCallback returns, the pool process will also cancel the process (just to make sure...) and cleanup the internal Broker.

Synopsis

Documentation

A process that receives messages and dispatches them to a callback. Each message must contain a key that identifies a resource. That resource is created and cleaned by user supplied callback functions.

type ResourceCleaner k a m = k -> a -> RIO m () Source #

User supplied callback called _with exceptions masked_ when the MessageHandler returns RemoveResource (Sync-) Exceptions thrown from this function are caught, and do not prevent the removal of the resource, also the broker continues.

  • k is the key for the resource associated to an incoming message
  • a specifies the resource type.
  • m is the monad of the returned action.

type ResourceCreator k w a m = k -> Maybe w -> RIO m a Source #

User supplied callback to create and initialize a resource. (Sync-) Exceptions thrown from this function are caught, and the broker continues.

  • k is the key for the resource associated to an incoming message
  • w is the type of the demultiplexed messages.
  • a specifies the resource type.
  • m is the monad of the returned action.

data Multiplexed k w Source #

The action that the broker has to take for in incoming message.

  • k is the key for the resource associated to an incoming message
  • w is the type of the demultiplexed messages.

Constructors

Initialize k !(Maybe w)

The message is an initialization message, that requires the creation of a new resouce for the given key. When the resource is created, then maybe additionally a message will also be dispatched.

Dispatch k w

Dispatch a message using an existing resource. Silently ignore if no resource for the key exists.

data ResourceUpdate a Source #

This value indicates in what state a worker is in after the MessageHandler action was executed.

Constructors

KeepResource

The resources is still required.

UpdateResource a

The resource is still required but must be updated.

RemoveResource !(Maybe a)

The resource is obsolete and can be removed from the broker. The broker will call ResourceCleaner either on the current, or an updated resource value.

type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a) Source #

User supplied callback to use the Multiplexed message and the associated resource. (Sync-) Exceptions thrown from this function are caught and lead to immediate cleanup of the resource but the broker continues.

  • Type k is the key for the resource associated to an incoming message
  • Type w is the type of incoming, demultiplexed, messages.
  • Type a specifies the resource type.
  • Type m is the base monad

type Demultiplexer w' k w = w' -> Multiplexed k w Source #

User supplied callback to extract the key and the Multiplexed from a message. (Sync-) Exceptions thrown from this function are caught and lead to dropping of the incoming message, while the broker continues.

  • k is the key for the resource associated to an incoming message
  • w' is the type of incoming messages.
  • w is the type of the demultiplexed messages.

data BrokerConfig k w' w a m Source #

The broker configuration, used by spawnBroker.

  • k is the key for the resource associated to an incoming message
  • w' is the type of incoming messages.
  • w is the type of the demultiplexed messages.
  • a specifies the resource type.
  • m is the base monad

data BrokerResult Source #

This is just what the Async returned from spawnBroker returns, it's current purpose is to make code easier to read.

Instead of some Async () that could be anything, there is Async BrokerResult.

Constructors

MkBrokerResult 

Instances

Instances details
Eq BrokerResult Source # 
Instance details

Defined in RIO.ProcessPool.Broker

Show BrokerResult Source # 
Instance details

Defined in RIO.ProcessPool.Broker

spawnBroker :: forall brokerBoxArg k w' w a m. (HasLogFunc m, Ord k, Display k, IsMessageBoxArg brokerBoxArg) => brokerBoxArg -> BrokerConfig k w' w a m -> RIO m (Either SomeException (Input (MessageBox brokerBoxArg) w', Async BrokerResult)) Source #

Spawn a broker with a new MessageBox, and return its message Input channel as well as the Async handle of the spawned process, needed to stop the broker process.

  • k is the key for the resource associated to an incoming message
  • w' is the type of incoming messages.
  • w is the type of the demultiplexed messages.
  • a specifies the resource type.
  • m is the base monad

A process that receives messages and dispatches them to other processes. Building directly on RIO.ProcessPool.Broker, it provides a central message box Input, from which messages are are delivered to the corresponding message box Inputs.

data Pool poolBox k w Source #

A record containing the message box Input of the Broker and the Async value required to cancel the pools broker process.

Constructors

MkPool 

Fields

newtype PoolWorkerCallback workerBox w k m Source #

The function that processes a MessageBox of a worker for a specific key.

Constructors

MkPoolWorkerCallback 

Fields

spawnPool :: forall k w poolBox workerBox m. (IsMessageBoxArg poolBox, IsMessageBoxArg workerBox, Ord k, Display k, HasLogFunc m) => poolBox -> workerBox -> PoolWorkerCallback workerBox w k m -> RIO m (Either SomeException (Pool poolBox k w)) Source #

Start a Pool.

Start a process that receives messages sent to the poolInput and dispatches them to the Input of pool member processes. If necessary the pool worker processes are started.

Each pool worker process is started using async and executes the PoolWorkerCallback.

When the callback returns, the process will exit.

Internally the pool uses the async function to wrap the callback.

When a Multiplixed Dispatch message is received with a Nothing then the worker is cancelled and the worker is removed from the map.

Such a message is automatically sent after the PoolWorkerCallback has returned, even when an exception was thrown. See finally.

removePoolWorkerMessage :: k -> Multiplexed k (Maybe w) Source #

This message will cancel the worker with the given key. If the PoolWorkerCallback wants to do cleanup it should use finally or onException.

Re-export.