{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
module Control.Distributed.Process.Management
(
MxEvent(..)
, mxNotify
, MxAction()
, MxAgentId(..)
, MxAgent()
, mxAgent
, mxAgentWithFinalize
, MxSink()
, mxSink
, mxGetId
, mxDeactivate
, mxReady
, mxSkip
, mxReceive
, mxReceiveChan
, mxBroadcast
, mxSetLocal
, mxGetLocal
, mxUpdateLocal
, liftMX
) where
import Control.Applicative
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
( readTChan
, writeTChan
, TChan
)
import Control.Distributed.Process.Internal.Primitives
( receiveWait
, matchAny
, matchSTM
, unwrapMessage
, register
, whereis
, die
)
import Control.Distributed.Process.Internal.Types
( Process
, ProcessId
, Message
, LocalProcess(..)
, LocalNode(..)
, MxEventBus(..)
, unsafeCreateUnencodedMessage
)
import Control.Distributed.Process.Management.Internal.Bus (publishEvent)
import Control.Distributed.Process.Management.Internal.Types
( MxAgentId(..)
, MxAgent(..)
, MxAction(..)
, ChannelSelector(..)
, MxAgentState(..)
, MxSink
, MxEvent(..)
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import Control.Monad.Catch (onException)
import qualified Control.Monad.State as ST
( get
, modify
, lift
, runStateT
)
import Prelude
-- | Publish a 'Serializable' value on the management event bus of the node
-- that the calling process runs on.
--
-- The value is wrapped with 'unsafeCreateUnencodedMessage' and handed to
-- 'publishEvent'.
mxNotify :: (Serializable a) => a -> Process ()
mxNotify msg = do
  node <- processNode <$> ask
  liftIO (publishEvent (localEventBus node) (unsafeCreateUnencodedMessage msg))
-- | Return the 'MxAgentId' stored in the current agent's state.
mxGetId :: MxAgent s MxAgentId
mxGetId = mxAgentId <$> ST.get
-- | Write a 'Serializable' value to the bus channel ('mxBus') held in the
-- current agent's state.
--
-- The value is wrapped with 'unsafeCreateUnencodedMessage' and written to the
-- channel in a single STM transaction.
mxBroadcast :: (Serializable m) => m -> MxAgent s ()
mxBroadcast msg = do
  bus <- mxBus <$> ST.get
  liftMX . liftIO . atomically $
    writeTChan bus (unsafeCreateUnencodedMessage msg)
-- | Yield the 'MxAgentDeactivate' action carrying the given reason string.
--
-- When a sink returns this action, the agent loop in 'mxAgentWithFinalize'
-- runs the agent's finalizer and stops.
mxDeactivate :: forall s. String -> MxAgent s MxAction
mxDeactivate reason = return (MxAgentDeactivate reason)
-- | Yield the 'MxAgentReady' action.
--
-- When a sink returns this action, the agent loop in 'mxAgentWithFinalize'
-- waits for its next input using the 'InputChan' selector.
mxReady :: forall s. MxAgent s MxAction
mxReady = pure MxAgentReady
-- | Yield the 'MxAgentSkip' action.
--
-- A sink built with 'mxSink' turns this action into 'Nothing', which makes the
-- agent's pipeline offer the message to the next sink.
mxSkip :: forall s. MxAgent s MxAction
mxSkip = pure MxAgentSkip
-- | Yield @'MxAgentPrioritise' 'Mailbox'@.
--
-- When a sink returns this action, the agent loop in 'mxAgentWithFinalize'
-- waits for its next input using the 'Mailbox' selector.
mxReceive :: forall s. MxAgent s MxAction
mxReceive = pure (MxAgentPrioritise Mailbox)
-- | Yield @'MxAgentPrioritise' 'InputChan'@.
--
-- When a sink returns this action, the agent loop in 'mxAgentWithFinalize'
-- waits for its next input using the 'InputChan' selector.
mxReceiveChan :: forall s. MxAgent s MxAction
mxReceiveChan = pure (MxAgentPrioritise InputChan)
-- | Run a 'Process' action inside the 'MxAgent' monad.
--
-- The action is lifted into the underlying state transformer and wrapped in
-- the 'MxAgent' constructor; the agent state is not touched.
liftMX :: Process a -> MxAgent s a
liftMX = MxAgent . ST.lift
-- | Replace the agent's local state ('mxLocalState') with the given value.
mxSetLocal :: s -> MxAgent s ()
mxSetLocal newLocal =
  ST.modify (\agentState -> agentState { mxLocalState = newLocal })
-- | Apply a function to the agent's local state ('mxLocalState') and store
-- the result back in the agent state.
mxUpdateLocal :: (s -> s) -> MxAgent s ()
mxUpdateLocal f = ST.modify applyToLocal
  where
    applyToLocal agentState =
      agentState { mxLocalState = f (mxLocalState agentState) }
-- | Return the agent's local state ('mxLocalState').
mxGetLocal :: MxAgent s s
mxGetLocal = mxLocalState <$> ST.get
-- | Build an 'MxSink' from a handler for one specific message type.
--
-- The incoming 'Message' is decoded with 'unwrapMessage' at the handler's
-- argument type @m@:
--
-- * if decoding yields 'Nothing', the sink yields 'Nothing' and the handler
--   is not run;
--
-- * otherwise the handler runs; a result of 'MxAgentSkip' is turned into
--   'Nothing', and any other action is returned wrapped in 'Just'.
mxSink :: forall s m . (Serializable m)
       => (m -> MxAgent s MxAction)
       -> MxSink s
mxSink act msg = do
  decoded <- liftMX (unwrapMessage msg :: Process (Maybe m))
  case decoded of
    Nothing      -> return Nothing
    Just payload -> dropSkip <$> act payload
  where
    -- A skip is reported to the pipeline as "no result from this sink".
    dropSkip :: MxAction -> Maybe MxAction
    dropSkip MxAgentSkip = Nothing
    dropSkip other       = Just other
-- | A chain of sinks, terminated by 'MxStop'.
data MxPipeline s
  = MxPipeline
      { current :: !(MxSink s)      -- ^ the sink at this position in the chain
      , next    :: !(MxPipeline s)  -- ^ the remainder of the chain
      }
  | MxStop
-- | Start a management agent with the given id, initial local state and
-- sinks.
--
-- This is 'mxAgentWithFinalize' with a finalizer that does nothing.
mxAgent :: MxAgentId -> s -> [MxSink s] -> Process ProcessId
mxAgent mxId st hs = mxAgentWithFinalize mxId st hs noFinalizer
  where
    noFinalizer = return ()
-- | Start a management agent with the given id, initial local state, sinks
-- and finalizer.
--
-- The agent's name is taken from its 'MxAgentId'. If 'whereis' reports that
-- the name is already registered, the call fails with
-- @die \"DuplicateAgentId\"@. Otherwise the agent is created with 'mxNew' on
-- the local node's event bus, registered under its name, and its 'ProcessId'
-- is returned. The lookup and the registration are separate steps.
--
-- The agent repeatedly waits for an input, offers it to the sinks in list
-- order, and acts on the resulting 'MxAction'. The finalizer runs exactly
-- once:
--
-- * with the latest state, after a sink returns 'MxAgentDeactivate'; or
--
-- * with the state as it was before the current input, when an exception
--   escapes while the agent is waiting for or processing that input (the
--   exception is then re-thrown).
mxAgentWithFinalize :: MxAgentId
                    -> s
                    -> [MxSink s]
                    -> MxAgent s ()
                    -> Process ProcessId
mxAgentWithFinalize mxId initState handlers dtor = do
    let name = agentId mxId
    existing <- whereis name
    case existing of
      Just _  -> die "DuplicateAgentId"
      Nothing -> do
        node <- processNode <$> ask
        pid <- liftIO $ mxNew (localEventBus node) start
        register name pid
        return pid
  where
    -- Entry point of the agent process: builds the initial agent state from
    -- the first channel of the pair and reads inputs from the second.
    start (sendTChan, recvTChan) = do
      let nState = MxAgentState mxId sendTChan initState
      runAgent dtor handlers InputChan recvTChan nState

    -- Main loop. Only a single receive/dispatch step is guarded by
    -- 'onException', and the recursion happens outside that guard, so
    -- exception handlers do not pile up as messages are processed and the
    -- finalizer cannot be run more than once.
    runAgent :: MxAgent s ()
             -> [MxSink s]
             -> ChannelSelector
             -> TChan Message
             -> MxAgentState s
             -> Process ()
    runAgent eh hs cs c s = do
      outcome <- runAgentStep hs cs c s `onException` runAgentFinalizer eh s
      case outcome of
        Right (cs', s') -> runAgent eh hs cs' c s'
        Left final      -> runAgentFinalizer eh final

    -- Wait for one input, run it through the sinks and decide what the loop
    -- does next: @Right (selector, state)@ to keep going, @Left state@ to
    -- deactivate.
    runAgentStep :: [MxSink s]
                 -> ChannelSelector
                 -> TChan Message
                 -> MxAgentState s
                 -> Process (Either (MxAgentState s)
                                    (ChannelSelector, MxAgentState s))
    runAgentStep hs cs c s = do
      msg <- getNextInput cs c
      (action, state) <- runPipeline msg s (pipeline hs)
      case action of
        MxAgentReady               -> return $ Right (InputChan, state)
        MxAgentPrioritise priority -> return $ Right (priority, state)
        MxAgentDeactivate _        -> return $ Left state
        -- 'mxSink' never lets a skip through; a sink that does is rejected.
        MxAgentSkip                -> error "IllegalState"

    -- Block until an input is available. Both sources are always offered to
    -- 'receiveWait'; the selector only decides which one is listed first.
    getNextInput :: ChannelSelector -> TChan Message -> Process Message
    getNextInput sel chan = receiveWait $
      case sel of
        Mailbox   -> [fromMailbox, fromChannel]
        InputChan -> [fromChannel, fromMailbox]
      where
        fromMailbox = matchAny return
        fromChannel = matchSTM (readTChan chan) return

    -- Run the finalizer against the given state, discarding the state it
    -- produces.
    runAgentFinalizer :: MxAgent s () -> MxAgentState s -> Process ()
    runAgentFinalizer f s = fst <$> ST.runStateT (unAgent f) s

    -- Turn the list of sinks into a chain, preserving list order.
    pipeline :: forall s . [MxSink s] -> MxPipeline s
    pipeline = foldr MxPipeline MxStop

    -- Offer the message to each sink in turn until one yields an action.
    -- When the chain is exhausted the result is 'MxAgentReady'.
    runPipeline :: forall s .
                   Message
                -> MxAgentState s
                -> MxPipeline s
                -> Process (MxAction, MxAgentState s)
    runPipeline _   state MxStop = return (MxAgentReady, state)
    runPipeline msg state MxPipeline{ current = sink, next = rest } = do
      (pass, state') <- ST.runStateT (unAgent (sink msg)) state
      case pass of
        -- NOTE(review): a sink that yields 'Nothing' has its state changes
        -- dropped (the next sink sees @state@, not @state'@). Kept as is;
        -- confirm whether this is intended.
        Nothing     -> runPipeline msg state rest
        Just result -> return (result, state')