{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} ----------------------------------------------------------------------------- -- | -- Module : Control.Distributed.Process.Extras.Monitoring -- Copyright : (c) Tim Watson 2013 - 2014 -- License : BSD3 (see the file LICENSE) -- -- Maintainer : Tim Watson <watson.timothy@gmail.com> -- Stability : experimental -- Portability : non-portable (requires concurrency) -- -- This module provides a primitive node monitoring capability, implemented as -- a /distributed-process Management Agent/. Once the 'nodeMonitor' agent is -- started, calling 'monitorNodes' will ensure that whenever the local node -- detects a new network-transport connection (from another cloud haskell node), -- the caller will receive a 'NodeUp' message in its mailbox. If a node -- disconnects, a corollary 'NodeDown' message will be delivered as well. -- ----------------------------------------------------------------------------- module Control.Distributed.Process.Extras.Monitoring ( NodeUp(..) , NodeDown(..) , nodeMonitorAgentId , nodeMonitor , monitorNodes , unmonitorNodes ) where import Control.DeepSeq (NFData) import Control.Distributed.Process -- NB: requires NodeId(..) to be exported! import Control.Distributed.Process.Management ( MxEvent(MxConnected, MxDisconnected) , MxAgentId(..) , mxAgent , mxSink , mxReady , liftMX , mxGetLocal , mxSetLocal , mxNotify ) import Control.Distributed.Process.Extras (deliver) import Data.Binary import qualified Data.Foldable as Foldable import Data.HashSet (HashSet) import qualified Data.HashSet as Set import Data.Typeable (Typeable) import GHC.Generics data Register = Register !ProcessId deriving (Typeable, Generic) instance Binary Register where instance NFData Register where data UnRegister = UnRegister !ProcessId deriving (Typeable, Generic) instance Binary UnRegister where instance NFData UnRegister where -- | Sent to subscribing processes when a connection -- (from a remote node) is detected. -- data NodeUp = NodeUp !NodeId deriving (Typeable, Generic, Show) instance Binary NodeUp where instance NFData NodeUp where -- | Sent to subscribing processes when a dis-connection -- (from a remote node) is detected. -- data NodeDown = NodeDown !NodeId deriving (Typeable, Generic, Show) instance Binary NodeDown where instance NFData NodeDown where -- | The @MxAgentId@ for the node monitoring agent. nodeMonitorAgentId :: MxAgentId nodeMonitorAgentId = MxAgentId "service.monitoring.nodes" -- | Start monitoring node connection/disconnection events. When a -- connection event occurs, the calling process will receive a message -- @NodeUp NodeId@ in its mailbox. When a disconnect occurs, the -- corollary @NodeDown NodeId@ message will be delivered instead. -- -- No guaranatee is made about the timeliness of the delivery, nor can -- the receiver expect that the node (for which it is being notified) -- is still up/connected or down/disconnected at the point when it receives -- a message from the node monitoring agent. -- monitorNodes :: Process () monitorNodes = do us <- getSelfPid mxNotify $ Register us -- | Stop monitoring node connection/disconnection events. This does not -- flush the caller's mailbox, nor does it guarantee that any/all node -- up/down notifications will have been delivered before it is evaluated. -- unmonitorNodes :: Process () unmonitorNodes = do us <- getSelfPid mxNotify $ UnRegister us -- | Starts the node monitoring agent. No call to @monitorNodes@ and -- @unmonitorNodes@ will have any effect unless the agent is already -- running. Note that we make /no guarantees what-so-ever/ about the -- timeliness or ordering semantics of node monitoring notifications. -- nodeMonitor :: Process ProcessId nodeMonitor = do mxAgent nodeMonitorAgentId initState [ (mxSink $ \(Register pid) -> do mxSetLocal . Set.insert pid =<< mxGetLocal mxReady) , (mxSink $ \(UnRegister pid) -> do mxSetLocal . Set.delete pid =<< mxGetLocal mxReady) , (mxSink $ \ev -> do let act = case ev of (MxConnected _ ep) -> notify $ nodeUp ep (MxDisconnected _ ep) -> notify $ nodeDown ep _ -> return () act >> mxReady) ] where initState :: HashSet ProcessId initState = Set.empty notify msg = Foldable.mapM_ (liftMX . deliver msg) =<< mxGetLocal nodeUp = NodeUp . NodeId nodeDown = NodeDown . NodeId