{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Management
-- Copyright   :  (c) Well-Typed / Tim Watson
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- [Management Extensions API]
--
-- This module presents an API for creating /Management Agents/:
-- special processes that are capable of receiving and responding to
-- a node's internal system events. These /system events/ are delivered by
-- the management event bus: an internal subsystem maintained for each
-- running node, to which all agents are automatically subscribed.
--
-- /Agents/ are defined in terms of /event sinks/, taking a particular
-- @Serializable@ type and evaluating to an action in the 'MxAgent' monad in
-- response. Each 'MxSink' evaluates to an 'MxAction' that specifies whether
-- the agent should continue processing its inputs or stop. If the type of a
-- message cannot be matched to any of the agent's sinks, it will be discarded.
-- A sink can also deliberately skip processing a message, deferring to the
-- remaining handlers. This is the /only/ way that more than one event sink
-- can handle the same data type, since otherwise the first /type match/ will
-- /win/ every time a message arrives. See 'mxSkip' for details.
--
-- Various events are published to the management event bus automatically,
-- the full list of which can be found in the definition of the 'MxEvent' data
-- type. Additionally, clients of the /Management API/ can publish arbitrary
-- @Serializable@ data to the event bus using 'mxNotify'. All running agents
-- receive all events (from the primary event bus to which they're subscribed).
--
-- Agent processes are automatically registered on the local node, and can
-- receive messages via their mailbox just like ordinary processes. Unlike
-- ordinary @Process@ code however, it is unnecessary (though possible) for
-- agents to use the base @expect@ and @receiveX@ primitives to do this, since
-- the management infrastructure will continuously read from both the primary
-- event bus /and/ the process' own mailbox. Messages are transparently passed
-- to the agent's event sinks from both sources, so an agent need only concern
-- itself with how to respond to its inputs.
--
-- Some agents may wish to prioritise messages from their mailbox over traffic
-- on the management event bus, or vice versa. The 'mxReceive' and
-- 'mxReceiveChan' API calls do this for the mailbox and event bus,
-- respectively. The /prioritisation/ these APIs offer is simply that the
-- chosen data stream will be checked first. No blocking will occur if the
-- chosen (prioritised) source is devoid of input messages; instead, the agent
-- handling code will revert to switching between the alternatives in
-- /round-robin/ as usual. If messages exist in one or more channels, they
-- will be consumed as soon as they're available; priority is effectively a
-- hint about which channel to consume from, should messages be available in
-- both.
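--
-- As a sketch (the sink name and its @String@ input type here are purely
-- illustrative), an event sink opts into mailbox prioritisation simply by
-- evaluating 'mxReceive' instead of 'mxReady' once it has handled an input:
--
-- > prioritiseMailbox :: MxSink s
-- > prioritiseMailbox = mxSink $ \(_ :: String) -> mxReceive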
--
-- Prioritisation, then, is a /hint/ about the preference of data source from
-- which the next input should be chosen. No guarantee can be made that the
-- chosen source will in fact be selected at runtime.
--
-- [Management API Semantics]
--
-- The management API provides /no guarantees whatsoever/ about:
--
-- * The ordering of messages delivered to the event bus.
--
-- * The order in which agents will be executed.
--
-- * Whether messages will be taken from the mailbox first, or the event bus.
--
-- [Management Data API]
--
-- Both management agents and clients of the API have access to a variety of
-- data storage capabilities, to facilitate publishing and consuming useful
-- system information. Agents maintain their own internal state privately (via
-- a state transformer - see 'mxGetLocal' et al); however, it is possible for
-- agents to share additional data with each other (and the outside world)
-- using /data tables/.
--
-- Each agent is assigned its own data table, which acts as a shared map, where
-- the keys are @String@s and the values are @Serializable@ data of whatever
-- type the agent or its clients store.
--
-- Because an agent's /data table/ stores its values in raw 'Message' format,
-- it works effectively as an /un-typed dictionary/, into which data of varying
-- types can be fed and later retrieved. The upside of this is that different
-- keys can be mapped to various types without any additional work on the part
-- of the developer. The downside is that the code reading these values must
-- know in advance what type(s) to expect, and the API provides no additional
-- support for handling that.
--
-- Publishing is accomplished using the 'mxPublish' and 'mxSet' APIs, whilst
-- querying is handled by 'mxGet' and deletion by 'mxClear', 'mxPurgeTable'
-- and 'mxDropTable'.
--
-- When a management agent terminates, its tables are left in memory, such
-- that an agent may resume its role (by restarting), or have its 'MxAgentId'
-- taken over by a subsequent agent, leaving the originally captured data in
-- place.
--
-- [Defining Agents]
--
-- New agents are defined with 'mxAgent' and require a unique 'MxAgentId', an
-- initial state - 'MxAgent' runs in a state transformer - and a list of the
-- agent's event sinks. Each 'MxSink' is defined in terms of a specific
-- @Serializable@ type, via the 'mxSink' function, binding the event handler
-- expression to inputs of only that type.
--
-- Apart from modifying its own local state, an agent can execute arbitrary
-- @Process a@ code via lifting (see 'liftMX') and even publish its own
-- messages back to the primary event bus (see 'mxBroadcast').
--
-- Since messages are delivered to agents from both the management event bus
-- and the agent process' own mailbox, agents (i.e., event sinks) will
-- generally have no idea as to their origin. An agent can, however, choose to
-- prioritise the choice of input (source) each time one of its event sinks
-- runs. The /standard/ way for an event sink to indicate that the agent is
-- ready for its next input is to evaluate 'mxReady'. When this happens, the
-- management infrastructure will obtain data from the event bus and the
-- process' mailbox in a round-robin fashion, i.e., one after the other,
-- changing each time.
--
-- [Example Code]
--
-- What follows is a grossly over-simplified example of a management agent
-- that provides a basic name monitoring facility. Whenever a process name is
-- registered or unregistered, clients are informed of the fact.
--
-- > -- simple notification data type
-- >
-- > data Registration = Reg { added  :: Bool
-- >                         , procId :: ProcessId
-- >                         , name   :: String
-- >                         }
-- >
-- > -- start a /name monitoring agent/
-- > nameMonitorAgent = do
-- >   mxAgent (MxAgentId "name-monitor") Set.empty [
-- >         (mxSink $ \(pid :: ProcessId) -> do
-- >            mxUpdateLocal $ Set.insert pid
-- >            mxReady)
-- >       , (mxSink $ \(ev :: MxEvent) -> do
-- >            let act =
-- >                  case ev of
-- >                    (MxRegistered   p n) -> notify True  n p
-- >                    (MxUnRegistered p n) -> notify False n p
-- >                    _                    -> return ()
-- >            act >> mxReady)
-- >     ]
-- >   where
-- >     notify a n p = do
-- >       Foldable.mapM_ (liftMX . flip send (Reg a p n)) =<< mxGetLocal
--
-- The client interface (for sending a client's pid) can take one of two
-- forms:
--
-- > monitorNames = getSelfPid >>= nsend "name-monitor"
-- > monitorNames2 = getSelfPid >>= mxNotify
--
-- For some real-world examples, see the distributed-process-platform package.
--
-- [Performance, Stability and Scalability]
--
-- /Management Agents/ offer numerous advantages over regular processes:
-- broadcast communication with them can have lower latency, they offer
-- simplified message (i.e., input type) handling, and they have access to
-- internal system information that would otherwise be unobtainable.
--
-- Do not be tempted to implement everything (including the kitchen sink)
-- using the management API, though. There are overheads associated with
-- management agents, which is why they're presented as tools for consuming
-- low-level system information, instead of as /application level/
-- development tools.
--
-- Agents that rely heavily on a busy mailbox can cause the management event
-- bus to backlog un-GC'ed data, leading to increased heap usage. Producers
-- that do not take care to avoid passing unevaluated thunks to the API can
-- crash /all/ the agents in the system. Agents are not monitored or managed
-- in any way, and those that crash will not be restarted.
--
-- The management event bus can receive a great deal of traffic. Every time
-- a message is sent and/or received, an event is passed to the agent
-- controller and broadcast to all agents (plus the trace controller, if
-- tracing is enabled for the node). This is already a significant overhead -
-- though profiling and benchmarks have demonstrated that it does not
-- adversely affect performance if few agents are installed. Agents will
-- typically use more cycles than plain processes, since they perform
-- additional work: selecting input data from both the event bus /and/ their
-- own mailboxes, plus searching through the set of event sinks (for each
-- agent) to determine the right handler for the event.
--
-- Each management agent requires not only its own @Process@ (in which the
-- agent code is run), but also a peer process that provides its /data table/.
-- These data tables also have to be coordinated and managed on each agent's
-- behalf.
--
-- [Architecture Overview]
--
-- The architecture of the management event bus is internal and subject to
-- change without prior notice. The description that follows is provided for
-- informational purposes only.
--
-- When a node initially starts, two special, internal system processes are
-- started to support the management infrastructure. The first, known as the
-- /trace controller/, is responsible for consuming 'MxEvent's and forwarding
-- them to the configured tracer - see "Control.Distributed.Process.Debug" for
-- further details. The second is the /management agent controller/, and is
-- the primary worker process underpinning the management infrastructure. All
-- published management events are routed to this process, which places them
-- onto a system-wide /event bus/ and additionally passes them directly to the
-- /trace controller/.
--
-- There are several reasons for segregating the tracing and management
-- control planes in this fashion. Tracing can be enabled or disabled by
-- clients, whilst the management event bus cannot, since in addition to
-- providing runtime instrumentation, its intended use-cases include node
-- monitoring, peer discovery (via topology-providing backends) and other
-- essential system services that require knowledge of otherwise hidden
-- system internals. Tracing is also subject to /trace flags/ that limit the
-- specific 'MxEvent's delivered to trace clients - an overhead/complexity not
-- shared by management agents. Finally, tracing and management agents are
-- implemented using completely different signalling techniques - more on
-- this later - which would introduce considerable complexity if they shared
-- the same /event loop/.
--
-- The management control plane is driven by a shared broadcast channel, which
-- is written to by the agent controller and subscribed to by all agent
-- processes. Agents are spawned as regular processes, whose primary
-- implementation (i.e., /server loop/) is responsible for consuming
-- messages from both the broadcast channel and their own mailbox. Once
-- consumed, messages are applied to the agent's /event sinks/ until one
-- matches the input, at which point it is applied and the loop continues.
-- The implementation chooses from the event bus and the mailbox in a
-- round-robin fashion, until a message is received. This polling activity
-- would lead to management agents consuming considerable system resources if
-- left unchecked, so the implementation will only poll for a limited number
-- of retries, after which it performs a blocking read on the event bus.
--
-----------------------------------------------------------------------------
module Control.Distributed.Process.Management
  (
    MxEvent(..)
    -- * Firing Arbitrary /Mx Events/
  , mxNotify
    -- * Constructing Mx Agents
  , MxAction()
  , MxAgentId(..)
  , MxAgent()
  , mxAgent
  , mxAgentWithFinalize
  , MxSink()
  , mxSink
  , mxGetId
  , mxDeactivate
  , mxReady
  , mxSkip
  , mxReceive
  , mxReceiveChan
  , mxBroadcast
  , mxSetLocal
  , mxGetLocal
  , mxUpdateLocal
  , liftMX
    -- * Mx Data API
  , mxPublish
  , mxSet
  , mxGet
  , mxClear
  , mxPurgeTable
  , mxDropTable
  ) where

import Control.Applicative ((<$>))
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
  ( tryReadTChan
  , readTChan
  , writeTChan
  , TChan
  )
import Control.Distributed.Process.Internal.Primitives
  ( newChan
  , nsend
  , receiveWait
  , receiveTimeout
  , matchChan
  , matchAny
  , matchSTM
  , unwrapMessage
  , onException
  , register
  , whereis
  , die
  )
import Control.Distributed.Process.Internal.Types
  ( Process
  , ProcessId
  , Message
  , LocalProcess(..)
  , LocalNode(..)
  , MxEventBus(..)
  , unsafeCreateUnencodedMessage
  )
import Control.Distributed.Process.Management.Internal.Bus (publishEvent)
import qualified Control.Distributed.Process.Management.Internal.Table as Table
import Control.Distributed.Process.Management.Internal.Types
  ( MxAgentId(..)
  , MxAgent(..)
  , MxAction(..)
  , ChannelSelector(..)
  , MxAgentState(..)
  , MxAgentStart(..)
  , MxSink
  , MxEvent(..)
  )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import qualified Control.Monad.State as ST
  ( MonadState
  , StateT
  , get
  , modify
  , lift
  , runStateT
  )

-- | Publishes an arbitrary @Serializable@ message to the management event bus.
-- Note that /no attempt is made to force the argument/, therefore it is very
-- important that you do not pass unevaluated thunks that might crash the
-- receiving process via this API, since /all/ registered agents will gain
-- access to the data structure once it is broadcast by the agent controller.
mxNotify :: (Serializable a) => a -> Process ()
mxNotify msg = do
  bus <- localEventBus . processNode <$> ask
  liftIO $ publishEvent bus $ unsafeCreateUnencodedMessage msg

-- | Publish an arbitrary @Message@ as a property in the management database.
--
-- For publishing @Serializable@ data, use 'mxSet' instead.
--
mxPublish :: MxAgentId -> String -> Message -> Process ()
mxPublish a k v = Table.set k v (Table.MxForAgent a)

-- | Sets an arbitrary @Serializable@ datum against a key in the management
-- database. Note that /no attempt is made to force the argument/, therefore
-- it is very important that you do not pass unevaluated thunks that might
-- crash some other, arbitrary process (or management agent!) that obtains
-- and attempts to force the value later on.
--
mxSet :: Serializable a => MxAgentId -> String -> a -> Process ()
mxSet mxId key msg = do
  Table.set key (unsafeCreateUnencodedMessage msg) (Table.MxForAgent mxId)

-- | Fetches a property from the management database for the given key.
-- If the property is not set, or does not match the expected type when
-- typechecked (at runtime), returns @Nothing@.
mxGet :: Serializable a => MxAgentId -> String -> Process (Maybe a)
mxGet = Table.fetch . Table.MxForAgent

-- | Clears a property from the management database using the given key.
-- If the key does not exist in the database, this is a no-op.
mxClear :: MxAgentId -> String -> Process ()
mxClear mxId key = Table.clear key (Table.MxForAgent mxId)

-- | Purges a table in the management database of all its stored properties.
mxPurgeTable :: MxAgentId -> Process ()
mxPurgeTable = Table.purge . Table.MxForAgent

-- | Deletes a table from the management database.
mxDropTable :: MxAgentId -> Process ()
mxDropTable = Table.delete . Table.MxForAgent

--------------------------------------------------------------------------------
-- API for writing user defined management extensions (i.e., agents)          --
--------------------------------------------------------------------------------

-- | Return the 'MxAgentId' for the currently executing agent.
--
mxGetId :: MxAgent s MxAgentId
mxGetId = ST.get >>= return . mxAgentId

-- | The 'MxAgent' version of 'mxNotify'.
--
mxBroadcast :: (Serializable m) => m -> MxAgent s ()
mxBroadcast msg = do
  state <- ST.get
  liftMX $ liftIO $ atomically $ do
    writeTChan (mxBus state) (unsafeCreateUnencodedMessage msg)

-- | Gracefully terminate an agent.
--
mxDeactivate :: forall s. String -> MxAgent s MxAction
mxDeactivate = return . MxAgentDeactivate

-- | Continue executing (i.e., receiving and processing messages).
--
mxReady :: forall s. MxAgent s MxAction
mxReady = return MxAgentReady

-- | Causes the currently executing /event sink/ to be skipped.
-- The remaining declared event sinks will be evaluated to find
-- a matching handler. Can be used to allow multiple event sinks
-- to process data of the same type.
--
mxSkip :: forall s. MxAgent s MxAction
mxSkip = return MxAgentSkip
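
-- As an illustration (the sink names below are hypothetical, not part of this
-- API), two sinks can cooperate on the same event type by having the first
-- one skip anything it does not handle:
--
-- > spawnSink, restSink :: MxSink s
-- > spawnSink = mxSink $ \(ev :: MxEvent) ->
-- >   case ev of
-- >     MxSpawned _ -> mxReady -- handled by this sink
-- >     _           -> mxSkip  -- defer to the next sink
-- > restSink = mxSink $ \(_ :: MxEvent) -> mxReady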

-- | Continue executing, prioritising inputs from the process' own /mailbox/
-- ahead of data from the management event bus.
--
mxReceive :: forall s. MxAgent s MxAction
mxReceive = return $ MxAgentPrioritise Mailbox

-- | Continue executing, prioritising inputs from the management event bus
-- over the process' own /mailbox/.
--
mxReceiveChan :: forall s. MxAgent s MxAction
mxReceiveChan = return $ MxAgentPrioritise InputChan

-- | Lift a @Process@ action.
--
liftMX :: Process a -> MxAgent s a
liftMX p = MxAgent $ ST.lift p

-- | Set the agent's local state.
--
mxSetLocal :: s -> MxAgent s ()
mxSetLocal s = ST.modify $ \st -> st { mxLocalState = s }

-- | Update the agent's local state.
--
mxUpdateLocal :: (s -> s) -> MxAgent s ()
mxUpdateLocal f = ST.modify $ \st -> st { mxLocalState = (f $ mxLocalState st) }

-- | Fetch the agent's local state.
--
mxGetLocal :: MxAgent s s
mxGetLocal = ST.get >>= return . mxLocalState

-- | Create an 'MxSink' from an expression taking a @Serializable@ type @m@
-- that yields an 'MxAction' in the 'MxAgent' monad.
--
mxSink :: forall s m . (Serializable m)
       => (m -> MxAgent s MxAction)
       -> MxSink s
mxSink act msg = do
  msg' <- liftMX $ (unwrapMessage msg :: Process (Maybe m))
  case msg' of
    Nothing -> return Nothing
    Just m  -> do
      r <- act m
      case r of
        MxAgentSkip -> return Nothing
        _           -> return $ Just r

-- private ADT: a linked list of event sinks
data MxPipeline s =
    MxPipeline
    {
      current :: !(MxSink s)
    , next    :: !(MxPipeline s)
    }
  | MxStop

-- | Activates a new agent.
--
mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent mxId st hs = mxAgentWithFinalize mxId st hs $ return ()

-- | Activates a new agent. This variant takes a /finalizer/ expression,
-- which is run once the agent shuts down (even in case of failure/exceptions).
-- The /finalizer/ expression runs in the mx monad - @MxAgent s ()@ - such
-- that the agent's internal state remains accessible to the shutdown/cleanup
-- code.
--
mxAgentWithFinalize :: MxAgentId
                    -> s
                    -> [MxSink s]
                    -> MxAgent s ()
                    -> Process ProcessId
mxAgentWithFinalize mxId initState handlers dtor = do
    let name = agentId mxId
    existing <- whereis name
    case existing of
      Just _  -> die "DuplicateAgentId" -- TODO: better error handling policy
      Nothing -> do
        node <- processNode <$> ask
        pid <- liftIO $ mxNew (localEventBus node) $ start
        register name pid
        return pid
  where
    start (sendTChan, recvTChan) = do
      (sp, rp) <- newChan
      nsend Table.mxTableCoordinator (MxAgentStart sp mxId)
      tablePid <- receiveWait [ matchChan rp (\(p :: ProcessId) -> return p) ]
      let nState = MxAgentState mxId sendTChan tablePid initState
      runAgent dtor handlers InputChan recvTChan nState
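
-- A minimal usage sketch (the agent name, key and handler below are purely
-- illustrative): an agent that counts 'MxSpawned' events, publishes the
-- running total through the data API, and purges its table when it shuts
-- down. A client could then read the total with
-- @mxGet (MxAgentId "spawn-counter") "count" :: Process (Maybe Int)@.
--
-- > spawnCounter :: Process ProcessId
-- > spawnCounter =
-- >   let agent = MxAgentId "spawn-counter"
-- >   in mxAgentWithFinalize agent (0 :: Int)
-- >        [ mxSink $ \(ev :: MxEvent) ->
-- >            case ev of
-- >              MxSpawned _ -> do
-- >                mxUpdateLocal (+ 1)
-- >                total <- mxGetLocal
-- >                liftMX $ mxSet agent "count" total
-- >                mxReady
-- >              _ -> mxReady
-- >        ]
-- >        (liftMX $ mxPurgeTable agent)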

runAgent :: MxAgent s ()
         -> [MxSink s]
         -> ChannelSelector
         -> TChan Message
         -> MxAgentState s
         -> Process ()
runAgent eh hs cs c s =
  runAgentWithFinalizer eh hs cs c s `onException` runAgentFinalizer eh s

runAgentWithFinalizer :: MxAgent s ()
                      -> [MxSink s]
                      -> ChannelSelector
                      -> TChan Message
                      -> MxAgentState s
                      -> Process ()
runAgentWithFinalizer eh' hs' cs' c' s' = do
  msg <- getNextInput cs' c'
  (action, state) <- runPipeline msg s' $ pipeline hs'
  case action of
    MxAgentReady               -> runAgent eh' hs' InputChan c' state
    MxAgentPrioritise priority -> runAgent eh' hs' priority  c' state
    MxAgentDeactivate _        -> runAgentFinalizer eh' state
    MxAgentSkip                -> error "IllegalState"
    -- MxAgentBecome h'        -> runAgent h' c state

-- Wait for the next input, preferring the prioritised source when both have
-- data available.
getNextInput :: ChannelSelector -> TChan Message -> Process Message
getNextInput sel chan =
  let stmRead = atomically . readTChan
      matches = case sel of
                  Mailbox   -> [ matchAny return
                               , matchSTM (readTChan chan) return ]
                  InputChan -> [ matchSTM (readTChan chan) return
                               , matchAny return ]
  in receiveWait matches

runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
runAgentFinalizer f s = ST.runStateT (unAgent f) s >>= return . fst

pipeline :: forall s . [MxSink s] -> MxPipeline s
pipeline []           = MxStop
pipeline (sink:sinks) = MxPipeline sink (pipeline sinks)

runPipeline :: forall s .
               Message
            -> MxAgentState s
            -> MxPipeline s
            -> Process (MxAction, MxAgentState s)
runPipeline _   state MxStop         = return (MxAgentReady, state)
runPipeline msg state MxPipeline{..} = do
  let act = current msg
  (pass, state') <- ST.runStateT (unAgent act) state
  case pass of
    Nothing     -> runPipeline msg state next
    Just result -> return (result, state')