{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE StandaloneDeriving         #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE PatternGuards              #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE UndecidableInstances       #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Management
-- Copyright   :  (c) Well-Typed / Tim Watson
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- [Management Extensions API]
--
-- This module presents an API for creating /Management Agents/:
-- special processes that are capable of receiving and responding to
-- a node's internal system events. These /system events/ are delivered by
-- the management event bus: An internal subsystem maintained for each
-- running node, to which all agents are automatically subscribed.
--
-- /Agents/ are defined in terms of /event sinks/, taking a particular
-- @Serializable@ type and evaluating to an action in the 'MxAgent' monad in
-- response. Each 'MxSink' evaluates to an 'MxAction' that specifies whether
-- the agent should continue processing its inputs or stop. If the type of a
-- message cannot be matched to any of the agent's sinks, it will be discarded.
-- A sink can also deliberately skip processing a message, deferring to the
-- remaining handlers. This is the /only/ way that more than one event sink
-- can handle the same data type, since otherwise the first /type match/ will
-- /win/ every time a message arrives. See 'mxSkip' for details.
--
-- Various events are published to the management event bus automatically,
-- the full list of which can be found in the definition of the 'MxEvent' data
-- type. Additionally, clients of the /Management API/ can publish arbitrary
-- @Serializable@ data to the event bus using 'mxNotify'. All running agents
-- receive all events (from the primary event bus to which they're subscribed).
--
-- Agent processes are automatically registered on the local node, and can
-- receive messages via their mailbox just like ordinary processes. Unlike
-- ordinary @Process@ code however, it is unnecessary (though possible) for
-- agents to use the base @expect@ and @receiveX@ primitives to do this, since
-- the management infrastructure will continuously read from both the primary
-- event bus /and/ the process' own mailbox. Messages are transparently passed
-- to the agent's event sinks from both sources, so an agent need only concern
-- itself with how to respond to its inputs.
--
-- Some agents may wish to prioritise messages from their mailbox over traffic
-- on the management event bus, or vice versa. The 'mxReceive' and
-- 'mxReceiveChan' API calls do this for the mailbox and event bus,
-- respectively. The /prioritisation/ these APIs offer is simply that the chosen
-- data stream will be checked first. No blocking will occur if the chosen
-- (prioritised) source is devoid of input messages, instead the agent handling
-- code will revert to switching between the alternatives in /round-robin/ as
-- usual. If messages exist in one or more channels, they will be consumed as
-- soon as they're available, priority is effectively a hint about which
-- channel to consume from, should messages be available in both.
--
-- Prioritisation then, is a /hint/ about the preference of data source from
-- which the next input should be chosen. No guarantee can be made that the
-- chosen source will in fact be selected at runtime.
--
-- [Management API Semantics]
--
-- The management API provides /no guarantees whatsoever/, viz:
--
--  * The ordering of messages delivered to the event bus.
--
--  * The order in which agents will be executed.
--
--  * Whether messages will be taken from the mailbox first, or the event bus.
--
-- Since the event bus uses STM broadcast channels to communicate with agents,
-- no message written to the bus successfully can be lost.
--
-- Agents can also receive messages via their mailboxes - these are subject to
-- the same guarantees as all inter-process message sending.
--
-- Messages dispatched on an STM broadcast channel (i.e., management event bus)
-- are guaranteed to be delivered with the same FIFO ordering guarantees that
-- exist between two communicating processes, such that communication from the
-- node controller's threads (i.e., MxEvent's) will never be re-ordered, but
-- messages dispatched to the event bus by other processes (including, but not
-- limited to agents) are only guaranteed to be ordered between one sender and
-- one receiver.
--
-- No guarantee exists for the ordering in which messages sent to an agent's
-- mailbox will be delivered, vs messages dispatched via the event bus.
--
-- Because of the above, there are no ordering guarantees for messages sent
-- between agents, or for processes to agents, except for those that apply to
-- messages sent between regular processes, since agents are
-- implemented as such.
--
-- The event bus is serial and single threaded. Anything that is published by
-- the node controller will be seen in FIFO order. There are no ordering
-- guarantees pertaining to entries published to the event bus by other
-- processes or agents.
--
-- It should not be possible to see, for example, an @MxReceived@ before the
-- corresponding @MxSent@ event, since the places where we issue the @MxSent@
-- write directly to the event bus (using STM) in the calling (green) thread,
-- before dispatching instructions to the node controller to perform the
-- necessary routing to deliver the message to a process (or registered name,
-- or typed channel) locally or remotely.
--
-- [Management Data API]
--
-- Both management agents and clients of the API have access to a variety of
-- data storage capabilities, to facilitate publishing and consuming useful
-- system information. Agents maintain their own internal state privately (via a
-- state transformer - see 'mxGetLocal' et al), however it is possible for
-- agents to share additional data with each other (and the outside world)
-- using whatever mechanism the user wishes, e.g., acidstate, or shared memory
-- primitives.
--
-- [Defining Agents]
--
-- New agents are defined with 'mxAgent' and require a unique 'MxAgentId', an
-- initial state - 'MxAgent' runs in a state transformer - and a list of the
-- agent's event sinks. Each 'MxSink' is defined in terms of a specific
-- @Serializable@ type, via the 'mxSink' function, binding the event handler
-- expression to inputs of only that type.
--
-- Apart from modifying its own local state, an agent can execute arbitrary
-- @Process a@ code via lifting (see 'liftMX') and even publish its own messages
-- back to the primary event bus (see 'mxBroadcast').
--
-- Since messages are delivered to agents from both the management event bus and
-- the agent process' mailbox, agents (i.e., event sinks) will generally have
-- no idea as to their origin. An agent can, however, choose to prioritise the
-- choice of input (source) each time one of its event sinks runs. The /standard/
-- way for an event sink to indicate that the agent is ready for its next input
-- is to evaluate 'mxReady'. When this happens, the management infrastructure
-- will obtain data from the event bus and process' mailbox in a round-robin
-- fashion, i.e., one after the other, changing each time.
--
-- [Example Code]
--
-- What follows is a grossly over-simplified example of a management agent that
-- provides a basic name monitoring facility. Whenever a process name is
-- registered or unregistered, clients are informed of the fact.
--
-- > -- simple notification data type
-- >
-- > data Registration = Reg { added  :: Bool
-- >                         , procId :: ProcessId
-- >                         , name   :: String
-- >                         }
-- >
-- > -- start a /name monitoring agent/
-- > nameMonitorAgent = do
-- >   mxAgent (MxAgentId "name-monitor") Set.empty [
-- >         (mxSink $ \(pid :: ProcessId) -> do
-- >            mxUpdateLocal $ Set.insert pid
-- >            mxReady)
-- >       , (mxSink $ \(ev :: MxEvent) -> do
-- >             let act =
-- >                   case ev of
-- >                     (MxRegistered   p n) -> notify True  n p
-- >                     (MxUnRegistered p n) -> notify False n p
-- >                     _                    -> return ()
-- >             act >> mxReady)
-- >     ]
-- >   where
-- >     notify a n p = do
-- >       Foldable.mapM_ (liftMX . deliver (Reg a p n)) =<< mxGetLocal
-- >
--
-- The client interface (for sending their pid) can take one of two forms:
--
-- > monitorNames = getSelfPid >>= nsend "name-monitor"
-- > monitorNames2 = getSelfPid >>= mxNotify
--
-- [Performance, Stability and Scalability]
--
-- /Management Agents/ offer numerous advantages over regular processes:
-- broadcast communication with them can have a lower latency, they offer
-- simplified message (i.e., input type) handling and they have access to
-- internal system information that would be otherwise unobtainable.
--
-- Do not be tempted to implement everything (e.g., the kitchen sink) using the
-- management API though. There are overheads associated with management agents
-- which is why they're presented as tools for consuming low level system
-- information, instead of as /application level/ development tools.
--
-- Agents that rely heavily on a busy mailbox can cause the management event
-- bus to backlog un-GC'ed data, leading to increased heap space. Producers that
-- do not take care to avoid passing unevaluated thunks to the API can crash
-- /all/ the agents in the system. Agents are not monitored or managed in any
-- way, and those that crash will not be restarted.
--
-- The management event bus can receive a great deal of traffic. Every time
-- a message is sent and/or received, an event is passed to the agent controller
-- and broadcast to all agents (plus the trace controller, if tracing is enabled
-- for the node). This is already a significant overhead - though profiling and
-- benchmarks have demonstrated that it does not adversely affect performance
-- if few agents are installed. Agents will typically use more cycles than plain
-- processes, since they perform additional work: selecting input data from both
-- the event bus /and/ their own mailboxes, plus searching through the set of
-- event sinks (for each agent) to determine the right handler for the event.
--
-- [Architecture Overview]
--
-- The architecture of the management event bus is internal and subject to
-- change without prior notice. The description that follows is provided for
-- informational purposes only.
--
-- When a node initially starts, two special, internal system processes are
-- started to support the management infrastructure. The first, known as the
-- /trace controller/, is responsible for consuming 'MxEvent's and forwarding
-- them to the configured tracer - see "Control.Distributed.Process.Debug" for
-- further details. The second is the /management agent controller/, and is the
-- primary worker process underpinning the management infrastructure. All
-- published management events are routed to this process, which places them
-- onto a system wide /event bus/ and additionally passes them directly to the
-- /trace controller/.
--
-- There are several reasons for segregating the tracing and management control
-- planes in this fashion. Tracing can be enabled or disabled by clients, whilst
-- the management event bus cannot, since in addition to providing
-- runtime instrumentation, its intended use-cases include node monitoring, peer
-- discovery (via topology providing backends) and other essential system
-- services that require knowledge of otherwise hidden system internals. Tracing
-- is also subject to /trace flags/ that limit the specific 'MxEvent's delivered
-- to trace clients - an overhead/complexity not shared by management agents.
-- Finally, tracing and management agents are implemented using completely
-- different signalling techniques - more on this later - which would introduce
-- considerable complexity if they shared the same /event loop/.
--
-- The management control plane is driven by a shared broadcast channel, which
-- is written to by the agent controller and subscribed to by all agent
-- processes. Agents are spawned as regular processes, whose primary
-- implementation (i.e., /server loop/) is responsible for consuming
-- messages from both the broadcast channel and their own mailbox. Once
-- consumed, messages are applied to the agent's /event sinks/ until one
-- matches the input, at which point it is applied and the loop continues.
-- The implementation chooses from the event bus and the mailbox in a
-- round-robin fashion, until a message is received. This polling activity would
-- lead to management agents consuming considerable system resources if left
-- unchecked, therefore the implementation will poll for a limited number of
-- retries, after which it will perform a blocking read on the event bus.
--
-----------------------------------------------------------------------------
module Control.Distributed.Process.Management
  (
    MxEvent(..)
    -- * Firing Arbitrary /Mx Events/
  , mxNotify
    -- * Constructing Mx Agents
  , MxAction()
  , MxAgentId(..)
  , MxAgent()
  , mxAgent
  , mxAgentWithFinalize
  , MxSink()
  , mxSink
  , mxGetId
  , mxDeactivate
  , mxReady
  , mxSkip
  , mxReceive
  , mxReceiveChan
  , mxBroadcast
  , mxSetLocal
  , mxGetLocal
  , mxUpdateLocal
  , liftMX
  ) where

import Control.Applicative
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
  ( readTChan
  , writeTChan
  , TChan
  )
import Control.Distributed.Process.Internal.Primitives
  ( receiveWait
  , matchAny
  , matchSTM
  , unwrapMessage
  , register
  , whereis
  , die
  )
import Control.Distributed.Process.Internal.Types
  ( Process
  , ProcessId
  , Message
  , LocalProcess(..)
  , LocalNode(..)
  , MxEventBus(..)
  , unsafeCreateUnencodedMessage
  )
import Control.Distributed.Process.Management.Internal.Bus (publishEvent)
import Control.Distributed.Process.Management.Internal.Types
  ( MxAgentId(..)
  , MxAgent(..)
  , MxAction(..)
  , ChannelSelector(..)
  , MxAgentState(..)
  , MxSink
  , MxEvent(..)
  )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import Control.Monad.Catch (onException)
import qualified Control.Monad.State as ST
  ( get
  , modify
  , lift
  , runStateT
  )
import Prelude

-- | Publish an arbitrary @Serializable@ value on the management event bus of
-- the node that the calling process runs on.
--
-- /No attempt is made to force the argument/. Do not pass unevaluated thunks
-- that might throw when evaluated: once the agent controller has broadcast
-- the message, /all/ registered agents gain access to the same data
-- structure, and evaluating it could crash any of them.
mxNotify :: (Serializable a) => a -> Process ()
mxNotify msg = do
  self <- ask
  -- the bus hangs off the node that owns the calling process
  let bus = localEventBus (processNode self)
  liftIO (publishEvent bus (unsafeCreateUnencodedMessage msg))

--------------------------------------------------------------------------------
-- API for writing user defined management extensions (i.e., agents)          --
--------------------------------------------------------------------------------

-- | The 'MxAgentId' of the agent in which this action is executing, read
-- from the agent's state.
--
mxGetId :: MxAgent s MxAgentId
mxGetId = fmap mxAgentId ST.get

-- | Publish a message from inside an agent: the 'MxAgent' version of
-- 'mxNotify'. The message is written, in a single STM transaction, to the
-- channel stored in the 'mxBus' field of the agent's state. As with
-- 'mxNotify', the argument is not forced.
--
mxBroadcast :: (Serializable m) => m -> MxAgent s ()
mxBroadcast msg = do
  bus <- fmap mxBus ST.get
  liftMX . liftIO . atomically $
    writeTChan bus (unsafeCreateUnencodedMessage msg)

-- | Gracefully terminate an agent. The @String@ argument is carried in the
-- resulting 'MxAgentDeactivate' action.
--
mxDeactivate :: forall s. String -> MxAgent s MxAction
mxDeactivate txt = pure (MxAgentDeactivate txt)

-- | Continue executing (i.e., keep receiving and processing messages).
-- Evaluates to the 'MxAgentReady' action.
--
mxReady :: forall s. MxAgent s MxAction
mxReady = pure MxAgentReady

-- | Skip the currently executing /event sink/. The remaining declared event
-- sinks are then evaluated to find a matching handler, which is what allows
-- more than one event sink to process data of the same type.
--
mxSkip :: forall s. MxAgent s MxAction
mxSkip = pure MxAgentSkip

-- | Continue executing, prioritising inputs from the process' own
-- /mailbox/ ahead of data from the management event bus.
--
mxReceive :: forall s. MxAgent s MxAction
mxReceive = pure (MxAgentPrioritise Mailbox)

-- | Continue executing, prioritising inputs from the management event bus
-- over the process' own /mailbox/.
--
mxReceiveChan :: forall s. MxAgent s MxAction
mxReceiveChan = pure (MxAgentPrioritise InputChan)

-- | Lift a @Process@ action into the 'MxAgent' monad. The action runs in the
-- underlying @Process@ layer and leaves the agent's state untouched.
--
liftMX :: Process a -> MxAgent s a
liftMX = MxAgent . ST.lift

-- | Replace the agent's local state with the given value. Only the
-- 'mxLocalState' field of the agent state is changed.
--
mxSetLocal :: s -> MxAgent s ()
mxSetLocal s = ST.modify replaceLocal
  where
    replaceLocal agentState = agentState { mxLocalState = s }

-- | Update the agent's local state by applying the given function to its
-- current value.
--
mxUpdateLocal :: (s -> s) -> MxAgent s ()
mxUpdateLocal f = ST.modify applyUpdate
  where
    applyUpdate agentState =
      agentState { mxLocalState = f (mxLocalState agentState) }

-- | Fetch the agent's local state (the 'mxLocalState' field of the agent
-- state).
--
mxGetLocal :: MxAgent s s
mxGetLocal = fmap mxLocalState ST.get

-- | Create an 'MxSink' from an expression taking a @Serializable@ type @m@,
-- that yields an 'MxAction' in the 'MxAgent' monad.
--
-- The sink first tries to unwrap the incoming message as an @m@. It yields
-- @Nothing@ when the message cannot be unwrapped at that type, or when the
-- handler evaluates to 'MxAgentSkip'. Any other action is yielded wrapped in
-- @Just@.
--
mxSink :: forall s m . (Serializable m)
       => (m -> MxAgent s MxAction)
       -> MxSink s
mxSink act msg =
  liftMX (unwrapMessage msg :: Process (Maybe m))
    >>= maybe (return Nothing) dispatch
  where
    -- run the handler on a successfully decoded input; a skip is reported
    -- in the same way as a type mismatch
    dispatch input = do
      outcome <- act input
      case outcome of
        MxAgentSkip -> return Nothing
        _           -> return (Just outcome)

-- private ADT: a linked list of event sinks, strict in both the sink held
-- by a cell and the remainder of the list, and terminated by 'MxStop'
data MxPipeline s
  = MxPipeline
      { current :: !(MxSink s)      -- the sink held by this cell
      , next    :: !(MxPipeline s)  -- the sinks that follow it
      }
  | MxStop

-- | Activates a new agent, given its id, its initial local state and its
-- event sinks. Equivalent to 'mxAgentWithFinalize' with a finalizer that
-- does nothing.
--
mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent mxId st hs = mxAgentWithFinalize mxId st hs noFinalizer
  where
    noFinalizer = return ()

-- | Activates a new agent. This variant takes a /finalizer/ expression that
-- is run once the agent shuts down: after one of its event sinks evaluates
-- to 'mxDeactivate', or when an exception escapes while the agent is waiting
-- for, or handling, an input. The /finalizer/ expression runs in the mx
-- monad - @MxAgent s ()@ - such that the agent's internal state remains
-- accessible to the shutdown/cleanup code. After a failure, the state the
-- finalizer observes is the one in effect before the failing input was
-- taken.
--
-- The agent is registered on the local node under the name held in its
-- 'MxAgentId'. If that name is already registered, no agent is started and
-- the call fails (via 'die') with @"DuplicateAgentId"@.
--
mxAgentWithFinalize :: MxAgentId
        -> s
        -> [MxSink s]
        -> MxAgent s ()
        -> Process ProcessId
mxAgentWithFinalize mxId initState handlers dtor = do
    let name = agentId mxId
    existing <- whereis name
    case existing of
      Just _  -> die "DuplicateAgentId"  -- TODO: better error handling policy
      Nothing -> do
        node <- processNode <$> ask
        -- 'mxNew' is given the agent's entry point and yields the
        -- 'ProcessId' that is registered below
        pid <- liftIO $ mxNew (localEventBus node) start
        register name pid
        return pid
  where
    -- Entry point of the agent. The first channel is stored in the agent
    -- state (it is the one 'mxBroadcast' writes to); the second is the one
    -- the agent reads from. The sink pipeline is built once, here, rather
    -- than once per message.
    start :: (TChan Message, TChan Message) -> Process ()
    start (sendTChan, recvTChan) =
      runAgent dtor (pipeline handlers) InputChan recvTChan
               (MxAgentState mxId sendTChan initState)

    -- The agent's server loop: perform one step, then act on its outcome.
    --
    -- Only the step itself runs under an exception handler, and the loop
    -- recurses from /outside/ of that handler. Recursing from inside it
    -- would nest one handler per processed message, which grows without
    -- bound and runs the finalizer once per nested handler on failure.
    --
    -- NOTE: an asynchronous exception delivered in the short gap between
    -- two steps is not covered by a handler.
    runAgent :: MxAgent s ()
             -> MxPipeline s
             -> ChannelSelector
             -> TChan Message
             -> MxAgentState s
             -> Process ()
    runAgent eh sinks cs c s = do
      (action, state) <- runStep eh sinks cs c s
      case action of
        MxAgentReady               -> runAgent eh sinks InputChan c state
        MxAgentPrioritise priority -> runAgent eh sinks priority  c state
        MxAgentDeactivate _        -> runAgentFinalizer eh state
        -- sinks built with 'mxSink' never yield this, they turn a skip into
        -- Nothing; should it happen anyway, clean up before failing
        MxAgentSkip                ->
          runAgentFinalizer eh state >> error "IllegalState"
--      MxAgentBecome h'           -> runAgent h' c state

    -- A single step: wait for the next input and offer it to the event
    -- sinks. If an exception escapes, the finalizer is run against the
    -- state the step started with, and 'onException' then re-throws.
    runStep :: MxAgent s ()
            -> MxPipeline s
            -> ChannelSelector
            -> TChan Message
            -> MxAgentState s
            -> Process (MxAction, MxAgentState s)
    runStep eh sinks cs c s =
      (getNextInput cs c >>= \msg -> runPipeline msg s sinks)
        `onException` runAgentFinalizer eh s

    -- Block until a message is available from the mailbox or the given
    -- channel. The selector decides which of the two matches is listed
    -- first in the call to 'receiveWait'.
    getNextInput :: ChannelSelector -> TChan Message -> Process Message
    getNextInput sel chan =
      receiveWait $ case sel of
        Mailbox   -> [fromMailbox, fromChan]
        InputChan -> [fromChan, fromMailbox]
      where
        fromMailbox = matchAny return
        fromChan    = matchSTM (readTChan chan) return

    -- Run the finalizer against the given state; the state it produces is
    -- discarded.
    runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
    runAgentFinalizer f s = fst <$> ST.runStateT (unAgent f) s

    -- Convert the list of sinks into an 'MxPipeline', preserving order.
    pipeline :: forall s . [MxSink s] -> MxPipeline s
    pipeline = foldr MxPipeline MxStop

    -- Offer a message to each sink in turn until one of them yields an
    -- action. If none does, the message is dropped and the agent is ready
    -- for its next input.
    runPipeline :: forall s .
                   Message
                -> MxAgentState s
                -> MxPipeline s
                -> Process (MxAction, MxAgentState s)
    runPipeline _   state MxStop         = return (MxAgentReady, state)
    runPipeline msg state MxPipeline{..} = do
      (pass, state') <- ST.runStateT (unAgent (current msg)) state
      case pass of
        -- NOTE(review): the next sink is given the state from /before/
        -- this sink ran, so state changes made by a sink that then skips
        -- are discarded - confirm this is intended
        Nothing     -> runPipeline msg state next
        Just result -> return (result, state')