module Control.Distributed.Process.Execution.EventManager
( EventManager
, start
, startSupervised
, startSupervisedRef
, notify
, addHandler
, addMessageHandler
) where
import Control.Distributed.Process hiding (Message, link)
import qualified Control.Distributed.Process as P (Message)
import Control.Distributed.Process.Execution.Exchange
( Exchange
, Message(..)
, post
, broadcastExchange
, broadcastExchangeT
, broadcastClient
)
import qualified Control.Distributed.Process.Execution.Exchange as Exchange
( startSupervised
)
import Control.Distributed.Process.Extras.Internal.Primitives
import Control.Distributed.Process.Extras.Internal.Unsafe
( InputStream
, matchInputStream
)
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics
newtype EventManager = EventManager { ex :: Exchange }
deriving (Typeable, Generic)
instance Binary EventManager where
instance Resolvable EventManager where
resolve = resolve . ex
start :: Process EventManager
start = broadcastExchange >>= return . EventManager
startSupervised :: SupervisorPid -> Process EventManager
startSupervised sPid = do
ex <- broadcastExchangeT >>= \t -> Exchange.startSupervised t sPid
return $ EventManager ex
startSupervisedRef :: SupervisorPid -> Process (ProcessId, P.Message)
startSupervisedRef sPid = do
ex <- startSupervised sPid
Just pid <- resolve ex
return (pid, unsafeWrapMessage ex)
notify :: Serializable a => EventManager -> a -> Process ()
notify em msg = post (ex em) msg
addHandler :: forall s a. Serializable a
=> EventManager
-> (s -> a -> Process s)
-> Process s
-> Process ProcessId
addHandler m h s =
spawnLocal $ newHandler (ex m) (\s' m' -> handleMessage m' (h s')) s
addMessageHandler :: forall s.
EventManager
-> (s -> P.Message -> Process (Maybe s))
-> Process s
-> Process ProcessId
addMessageHandler m h s = spawnLocal $ newHandler (ex m) h s
newHandler :: forall s .
Exchange
-> (s -> P.Message -> Process (Maybe s))
-> Process s
-> Process ()
newHandler ex handler initState = do
linkTo ex
is <- broadcastClient ex
listen is handler =<< initState
listen :: forall s . InputStream Message
-> (s -> P.Message -> Process (Maybe s))
-> s
-> Process ()
listen inStream handler state = do
receiveWait [ matchInputStream inStream ] >>= handleEvent inStream handler state
where
handleEvent is h s p = do
r <- h s (payload p)
let s2 = case r of
Nothing -> s
Just s' -> s'
listen is h s2