{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Distributed.Process.Management.Internal.Agent where
import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
( TChan
, newBroadcastTChanIO
, readTChan
, writeTChan
, dupTChan
)
import Control.Distributed.Process.Internal.Primitives
( receiveWait
, matchAny
, die
, catches
, Handler(..)
)
import Control.Distributed.Process.Internal.CQueue
( enqueueSTM
, CQueue
)
import Control.Distributed.Process.Management.Internal.Types
( Fork
)
import Control.Distributed.Process.Management.Internal.Trace.Tracer
( traceController
)
import Control.Distributed.Process.Internal.Types
( Process
, Message
, Tracer(..)
, LocalProcess(..)
, ProcessId
, forever'
)
import Control.Exception (AsyncException(ThreadKilled), SomeException)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import GHC.Weak (Weak, deRefWeak)
import Prelude
-- | What the agent controller publishes once it has finished starting up.
--
-- The triple holds, in order:
--
--   1. the 'Tracer' the controller forwards messages to,
--   2. a weak reference to a 'CQueue' of 'Message's (the controller fills
--      this from its own 'LocalProcess'), and
--   3. a function that starts a new agent: it takes the agent's body, which
--      receives a pair of 'TChan's, and returns the new 'ProcessId'.
type AgentConfig =
  ( Tracer
  , Weak (CQueue Message)
  , ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId
  )
-- | Body of the management agent controller process.
--
-- Start-up sequence:
--
--   1. start the trace controller ('startTracing'),
--   2. create the broadcast channel that serves as the event bus,
--   3. start the thread that drains a duplicate of that bus
--      ('startDeadLetterQueue'),
--   4. put an 'AgentConfig' into @mv@: the tracer, the weak reference to this
--      process' own queue, and 'mxStartAgent' partially applied to the fork
--      function and the bus.
--
-- After that it loops forever: every message received in this process'
-- mailbox is handed to @broadcast@.
--
-- Exception policy of the loop: 'ThreadKilled' terminates the process via
-- @die \"Killed\"@; every other exception is discarded and the loop
-- continues.
--
-- Parameters:
--
--   * @forkProcess@ - used to spawn the trace controller and, later, agents.
--   * @mv@ - filled exactly once with the resulting 'AgentConfig' before the
--     receive loop starts.
mxAgentController :: Fork
                  -> MVar AgentConfig
                  -> Process ()
mxAgentController forkProcess mv = do
    trc <- liftIO $ startTracing forkProcess
    sigbus <- liftIO newBroadcastTChanIO
    liftIO $ startDeadLetterQueue sigbus
    weakQueue <- processWeakQ <$> ask
    liftIO $ putMVar mv (trc, weakQueue, mxStartAgent forkProcess sigbus)
    go sigbus trc
  where
    -- Receive loop. 'matchAny' accepts every message, so nothing is left
    -- behind in the mailbox.
    go bus tracer = forever' $
      void $ receiveWait
        [ matchAny (liftIO . broadcast bus tracer)
        ] `catches`
        [ -- The handler is chosen by exception type, so it has to be total
          -- over 'AsyncException'. A lambda matching only 'ThreadKilled'
          -- would turn any other asynchronous exception into a
          -- 'PatternMatchFail' raised from inside the handler, which the
          -- catch-all below cannot see and which would end the loop.
          Handler $ \(e :: AsyncException) ->
            case e of
              ThreadKilled -> die "Killed"
              _            -> return ()
        , Handler $ \(_ :: SomeException) -> return ()
        ]

    -- Look up the tracer's queue (if it is still alive) and publish the
    -- message.
    broadcast :: TChan Message -> Tracer -> Message -> IO ()
    broadcast ch tr msg = do
      tmQueue <- tracerQueue tr
      atomicBroadcast ch tmQueue msg

    -- Dereference the weak pointer held by the tracer; 'Nothing' means the
    -- queue has already been collected.
    tracerQueue :: Tracer -> IO (Maybe (CQueue Message))
    tracerQueue (Tracer _ wQ) = deRefWeak wQ

    -- Write the message to the bus. When a tracer queue is available the
    -- message is enqueued there first, in the same STM transaction as the
    -- write to the bus.
    atomicBroadcast :: TChan Message
                    -> Maybe (CQueue Message)
                    -> Message -> IO ()
    atomicBroadcast ch Nothing  msg = atomically $ writeTChan ch msg
    atomicBroadcast ch (Just q) msg =
      atomically $ enqueueSTM q msg >> writeTChan ch msg
-- | Start a new agent process.
--
-- The shared channel is duplicated with 'dupTChan', and the agent body is
-- given the pair @(chan, duplicate)@: the original channel first, the
-- freshly duplicated one second. The resulting 'Process' is then launched
-- with the supplied fork function.
--
-- Parameters:
--
--   * @fork@ - how to spawn the agent process.
--   * @chan@ - the channel to share with the agent.
--   * @handler@ - the agent body, applied to the channel pair.
--
-- Returns the 'ProcessId' produced by @fork@.
mxStartAgent :: Fork
             -> TChan Message
             -> ((TChan Message, TChan Message) -> Process ())
             -> IO ProcessId
mxStartAgent fork chan handler =
  atomically (dupTChan chan) >>= \duplicate ->
    fork (handler (chan, duplicate))
-- | Spawn the trace controller and build a 'Tracer' for it.
--
-- An empty 'MVar' is passed to 'traceController'; this function blocks on
-- 'takeMVar' until a weak queue reference has been put into it, then pairs
-- that reference with the new process' 'ProcessId'.
startTracing :: Fork -> IO Tracer
startTracing forkProcess = do
  handoff  <- newEmptyMVar
  tracerId <- forkProcess (traceController handoff)
  Tracer tracerId <$> takeMVar handoff
-- | Fork a thread that reads from a duplicate of the given channel forever,
-- throwing away every message it reads.
--
-- NOTE(review): presumably this guarantees the bus always has at least one
-- consumer even when no agents are registered - confirm against the design.
startDeadLetterQueue :: TChan Message
                     -> IO ()
startDeadLetterQueue sigbus =
    atomically (dupTChan sigbus) >>= void . forkIO . drain
  where
    -- Read and discard, over and over.
    drain :: TChan Message -> IO ()
    drain ch = forever' (void (atomically (readTChan ch)))