{-# LANGUAGE ScopedTypeVariables  #-}
{-# LANGUAGE RecordWildCards #-}

module Control.Distributed.Process.Management.Internal.Agent where

import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
  ( TChan
  , newBroadcastTChanIO
  , readTChan
  , writeTChan
  , dupTChan
  )
import Control.Distributed.Process.Internal.Primitives
  ( receiveWait
  , matchAny
  , die
  , catches
  , Handler(..)
  )
import Control.Distributed.Process.Internal.CQueue
  ( enqueueSTM
  , CQueue
  )
import Control.Distributed.Process.Management.Internal.Types
  ( Fork
  )
import Control.Distributed.Process.Management.Internal.Trace.Tracer
  ( traceController
  )
import Control.Distributed.Process.Internal.Types
  ( Process
  , Message
  , Tracer(..)
  , LocalProcess(..)
  , ProcessId
  , forever'
  )
import Control.Exception (AsyncException(ThreadKilled), SomeException)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import GHC.Weak (Weak, deRefWeak)
import Prelude

--------------------------------------------------------------------------------
-- Agent Controller Implementation                                            --
--------------------------------------------------------------------------------

-- | A triple containing a configured tracer, weak pointer to the
-- agent controller's mailbox (CQueue) and an expression used to
-- instantiate new agents on the current node.
type AgentConfig =
  (Tracer, Weak (CQueue Message),
   (((TChan Message, TChan Message) -> Process ()) -> IO ProcessId))

-- | Starts a management agent for the current node. The agent process
-- must not crash or be killed, so we generally avoid publishing its
-- @ProcessId@ where possible.
--
-- Our process is also responsible for forwarding messages to the trace
-- controller, since having two /special processes/ handled via the
-- @LocalNode@ would be inelegant. We forward messages directly to the
-- trace controller's message queue, just as the @MxEventBus@ that's
-- set up on the @LocalNode@ forwards messages directly to us. This
-- optimises the code path for tracing and avoids overloading the node
-- node controller's internal control plane with additional routing, at the
-- cost of a little more complexity and two cases where we break
-- encapsulation.
--
mxAgentController :: Fork
                  -> MVar AgentConfig
                  -> Process ()
mxAgentController :: Fork -> MVar AgentConfig -> Process ()
mxAgentController Fork
forkProcess MVar AgentConfig
mv = do
    Tracer
trc <- IO Tracer -> Process Tracer
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Tracer -> Process Tracer) -> IO Tracer -> Process Tracer
forall a b. (a -> b) -> a -> b
$ Fork -> IO Tracer
startTracing Fork
forkProcess
    TChan Message
sigbus <- IO (TChan Message) -> Process (TChan Message)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TChan Message) -> Process (TChan Message))
-> IO (TChan Message) -> Process (TChan Message)
forall a b. (a -> b) -> a -> b
$ IO (TChan Message)
forall a. IO (TChan a)
newBroadcastTChanIO
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ TChan Message -> IO ()
startDeadLetterQueue TChan Message
sigbus
    Weak (CQueue Message)
weakQueue <- LocalProcess -> Weak (CQueue Message)
processWeakQ (LocalProcess -> Weak (CQueue Message))
-> Process LocalProcess -> Process (Weak (CQueue Message))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
    IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MVar AgentConfig -> AgentConfig -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar AgentConfig
mv (Tracer
trc, Weak (CQueue Message)
weakQueue, Fork
-> TChan Message
-> ((TChan Message, TChan Message) -> Process ())
-> IO ProcessId
mxStartAgent Fork
forkProcess TChan Message
sigbus)
    TChan Message -> Tracer -> Process ()
forall {b}. TChan Message -> Tracer -> Process b
go TChan Message
sigbus Tracer
trc
  where
    go :: TChan Message -> Tracer -> Process b
go TChan Message
bus Tracer
tracer = Process () -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b
forever' (Process () -> Process b) -> Process () -> Process b
forall a b. (a -> b) -> a -> b
$ do
      Process () -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
          -- This is exactly what it appears to be: a "catch all" handler.
          -- Since mxNotify can potentially pass an unevaluated thunk to
          -- our mailbox, the dequeue (i.e., matchMessage) can fail and
          -- crash this process, which we DO NOT want. Alternatively,
          -- we handle IO exceptions here explicitly, since we don't want
          -- this process to ever crash, and the assumption we therefore
          -- make is thus:
          --
          -- 1. only ThreadKilled can tell this process to terminate
          -- 2. all other exceptions are invalid and should be ignored
          --
          -- The outcome of course, is that /bad/ calls to mxNotify
          -- (e.g., passing unevaluated thunks that will crash when
          -- they're eventually forced) are thus silently ignored.
          --
          (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ())
-> (Message -> IO ()) -> Message -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan Message -> Tracer -> Message -> IO ()
broadcast TChan Message
bus Tracer
tracer)
        ] Process () -> [Handler ()] -> Process ()
forall a. Process a -> [Handler a] -> Process a
`catches` [(AsyncException -> Process ()) -> Handler ()
forall a e. Exception e => (e -> Process a) -> Handler a
Handler (\AsyncException
ThreadKilled -> String -> Process ()
forall a b. Serializable a => a -> Process b
die String
"Killed"),
                     (SomeException -> Process ()) -> Handler ()
forall a e. Exception e => (e -> Process a) -> Handler a
Handler (\(SomeException
_ :: SomeException) -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())]

    broadcast :: TChan Message -> Tracer -> Message -> IO ()
    broadcast :: TChan Message -> Tracer -> Message -> IO ()
broadcast TChan Message
ch Tracer
tr Message
msg = do
      Maybe (CQueue Message)
tmQueue <- Tracer -> IO (Maybe (CQueue Message))
tracerQueue Tracer
tr
      TChan Message -> Maybe (CQueue Message) -> Message -> IO ()
atomicBroadcast TChan Message
ch Maybe (CQueue Message)
tmQueue Message
msg

    tracerQueue :: Tracer -> IO (Maybe (CQueue Message))
    tracerQueue :: Tracer -> IO (Maybe (CQueue Message))
tracerQueue (Tracer ProcessId
_ Weak (CQueue Message)
wQ) = Weak (CQueue Message) -> IO (Maybe (CQueue Message))
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak (CQueue Message)
wQ

    atomicBroadcast :: TChan Message
                    -> Maybe (CQueue Message)
                    -> Message -> IO ()
    atomicBroadcast :: TChan Message -> Maybe (CQueue Message) -> Message -> IO ()
atomicBroadcast TChan Message
ch Maybe (CQueue Message)
Nothing  Message
msg = IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan Message -> Message -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Message
ch Message
msg
    atomicBroadcast TChan Message
ch (Just CQueue Message
q) Message
msg = do
      -- liftIO $ putStrLn $ "broadcasting " ++ (show msg)
      IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ CQueue Message -> Message -> STM ()
forall a. CQueue a -> a -> STM ()
enqueueSTM CQueue Message
q Message
msg STM () -> STM () -> STM ()
forall a b. STM a -> STM b -> STM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TChan Message -> Message -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Message
ch Message
msg

-- | Forks a new process in which an mxAgent is run.
--
mxStartAgent :: Fork
             -> TChan Message
             -> ((TChan Message, TChan Message) -> Process ())
             -> IO ProcessId
mxStartAgent :: Fork
-> TChan Message
-> ((TChan Message, TChan Message) -> Process ())
-> IO ProcessId
mxStartAgent Fork
fork TChan Message
chan (TChan Message, TChan Message) -> Process ()
handler = do
  TChan Message
chan' <- STM (TChan Message) -> IO (TChan Message)
forall a. STM a -> IO a
atomically (TChan Message -> STM (TChan Message)
forall a. TChan a -> STM (TChan a)
dupTChan TChan Message
chan)
  let proc :: Process ()
proc = (TChan Message, TChan Message) -> Process ()
handler (TChan Message
chan, TChan Message
chan')
  Fork
fork Process ()
proc

-- | Start the tracer controller.
--
startTracing :: Fork -> IO Tracer
startTracing :: Fork -> IO Tracer
startTracing Fork
forkProcess = do
  MVar (Weak (CQueue Message))
mv  <- IO (MVar (Weak (CQueue Message)))
forall a. IO (MVar a)
newEmptyMVar
  ProcessId
pid <- Fork
forkProcess Fork -> Fork
forall a b. (a -> b) -> a -> b
$ MVar (Weak (CQueue Message)) -> Process ()
traceController MVar (Weak (CQueue Message))
mv
  Weak (CQueue Message)
wQ  <- IO (Weak (CQueue Message)) -> IO (Weak (CQueue Message))
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (CQueue Message)) -> IO (Weak (CQueue Message)))
-> IO (Weak (CQueue Message)) -> IO (Weak (CQueue Message))
forall a b. (a -> b) -> a -> b
$ MVar (Weak (CQueue Message)) -> IO (Weak (CQueue Message))
forall a. MVar a -> IO a
takeMVar MVar (Weak (CQueue Message))
mv
  Tracer -> IO Tracer
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Tracer -> IO Tracer) -> Tracer -> IO Tracer
forall a b. (a -> b) -> a -> b
$ ProcessId -> Weak (CQueue Message) -> Tracer
Tracer ProcessId
pid Weak (CQueue Message)
wQ

-- | Start a dead letter (agent) queue.
--
-- If no agents are registered on the system, the management
-- event bus will fill up and its data won't be GC'ed until someone
-- comes along and reads from the broadcast channel (via dupTChan
-- of course). This is effectively a leak, so to mitigate it, we
-- start a /dead letter queue/ that drains the event bus continuously,
-- thus ensuring if there are no other consumers that we won't use
-- up heap space unnecessarily.
--
startDeadLetterQueue :: TChan Message
                     -> IO ()
startDeadLetterQueue :: TChan Message -> IO ()
startDeadLetterQueue TChan Message
sigbus = do
  TChan Message
chan' <- STM (TChan Message) -> IO (TChan Message)
forall a. STM a -> IO a
atomically (TChan Message -> STM (TChan Message)
forall a. TChan a -> STM (TChan a)
dupTChan TChan Message
sigbus)
  IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b
forever' (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    IO Message -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Message -> IO ()) -> IO Message -> IO ()
forall a b. (a -> b) -> a -> b
$ STM Message -> IO Message
forall a. STM a -> IO a
atomically (STM Message -> IO Message) -> STM Message -> IO Message
forall a b. (a -> b) -> a -> b
$ TChan Message -> STM Message
forall a. TChan a -> STM a
readTChan TChan Message
chan'