{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}

-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.DIO
-- Copyright  : Copyright (c) 2015-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines a distributed computation based on 'IO'.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
       (DIO(..),
        DIOParams(..),
        DIOEnv(..),
        DIOStrategy(..),
        invokeDIO,
        runDIO,
        runDIOWithEnv,
        defaultDIOParams,
        defaultDIOEnv,
        terminateDIO,
        registerDIO,
        unregisterDIO,
        monitorProcessDIO,
        dioParams,
        messageChannel,
        messageInboxId,
        timeServerId,
        sendMessageDIO,
        sendMessagesDIO,
        sendAcknowledgementMessageDIO,
        sendAcknowledgementMessagesDIO,
        sendLocalTimeDIO,
        sendRequestGlobalTimeDIO,
        logDIO,
        liftDistributedUnsafe) where

import Data.Typeable
import Data.Binary
import Data.IORef
import Data.Time.Clock

import GHC.Generics

import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM

import System.Timeout

import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types

import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
import Simulation.Aivika.Distributed.Optimistic.State

-- | The parameters for the 'DIO' computation.
--
-- All timeouts, delays and intervals are expressed in microseconds.
data DIOParams =
  DIOParams { dioLoggingPriority :: Priority,
              -- ^ The logging priority
              dioName :: String,
              -- ^ The name of the logical process.
              dioTimeHorizon :: Maybe Double,
              -- ^ The time horizon in modeling time units.
              dioUndoableLogSizeThreshold :: Int,
              -- ^ The undoable log size threshold used for detecting an overflow
              dioOutputMessageQueueSizeThreshold :: Int,
              -- ^ The output message queue size threshold used for detecting an overflow
              dioTransientMessageQueueSizeThreshold :: Int,
              -- ^ The transient message queue size threshold used for detecting an overflow
              dioSyncTimeout :: Int,
              -- ^ The timeout in microseconds used for synchronising the operations
              dioAllowPrematureIO :: Bool,
              -- ^ Whether to allow performing the premature IO action; otherwise, raise an error
              dioAllowSkippingOutdatedMessage :: Bool,
              -- ^ Whether to allow skipping an outdated message with the receive time less than the global time,
              -- which is possible after reconnection
              dioProcessMonitoringEnabled :: Bool,
              -- ^ Whether the process monitoring is enabled
              dioProcessMonitoringDelay :: Int,
              -- ^ The delay in microseconds which must be applied for monitoring every remote process.
              dioProcessReconnectingEnabled :: Bool,
              -- ^ Whether the automatic reconnecting to processes is enabled when the monitoring is enabled
              dioProcessReconnectingDelay :: Int,
              -- ^ The delay in microseconds before reconnecting to the remote process
              dioKeepAliveInterval :: Int,
              -- ^ The interval in microseconds for sending keep-alive messages
              dioTimeServerAcknowledgementTimeout :: Int,
              -- ^ The timeout in microseconds for receiving an acknowledgement message from the time server
              dioSimulationMonitoringInterval :: Int,
              -- ^ The interval in microseconds between sending the simulation monitoring messages
              dioSimulationMonitoringTimeout :: Int,
              -- ^ The timeout in microseconds when processing the monitoring messages
              dioStrategy :: DIOStrategy,
              -- ^ The logical process strategy
              dioProcessDisconnectingEnabled :: Bool
              -- ^ Whether the process disconnecting is enabled when the monitoring is enabled but the reconnecting is disabled
            } deriving (Eq, Ord, Show, Typeable, Generic)

-- The instance body is empty, so the class defaults are used
-- (they rely on the derived 'Generic' instance).
instance Binary DIOParams

-- | Those 'DIO' environment parameters that cannot be serialized and passed to another process via the net.
data DIOEnv =
  DIOEnv { dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> DP.Process ())
           -- ^ The simulation monitoring action; 'Nothing' means that no action is specified
         }

-- | The logical process strategy.
data DIOStrategy = WaitIndefinitelyForTimeServer
                   -- ^ Wait for the time server forever
                 | TerminateDueToTimeServerTimeout Int
                   -- ^ Terminate due to the exceeded time server timeout in microseconds, but not less than 'dioSyncTimeout'
                 deriving (Eq, Ord, Show, Typeable, Generic)

-- The instance body is empty, so the class defaults are used
-- (they rely on the derived 'Generic' instance).
instance Binary DIOStrategy

-- | The distributed computation based on 'IO'.
--
-- It is a function from the computation context to a distributed 'DP.Process'.
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
                      -- ^ Unwrap the computation.
                    }

-- | The context of the 'DIO' computation.
data DIOContext =
  DIOContext { dioChannel :: Channel LogicalProcessMessage,
               -- ^ The channel of messages.
               dioInboxId :: DP.ProcessId,
               -- ^ The inbox process identifier.
               dioTimeServerId :: DP.ProcessId,
               -- ^ The time server process
               dioParams0 :: DIOParams,
               -- ^ The parameters of the computation.
               dioRegisteredInTimeServer :: TVar Bool,
               -- ^ Whether the computation is registered in the time server.
               dioUnregisteredFromTimeServer :: TVar Bool,
               -- ^ Whether the computation is unregistered from the time server.
               dioTimeServerTerminating :: TVar Bool
               -- ^ Whether the computation asked to terminate the time server.
             }

-- Sequencing threads the same context through both computations:
-- the first one is run in the underlying 'DP.Process' monad and its
-- result selects the continuation, which is run with the same context.
instance Monad DIO where

  {-# INLINE (>>=) #-}
  (DIO m) >>= k = DIO $ \ps ->
    m ps >>= \a ->
    let m' = unDIO (k a) in m' ps

instance Applicative DIO where

  -- Lift a pure value: the context is ignored.
  {-# INLINE pure #-}
  pure = DIO . const . pure

  -- Defined through the 'Monad' instance.
  {-# INLINE (<*>) #-}
  (<*>) = ap

instance Functor DIO where

  -- Map over the result of the underlying 'DP.Process' computation.
  {-# INLINE fmap #-}
  fmap f (DIO m) = DIO $ fmap f . m

-- Exception handling is delegated to the underlying 'DP.Process' monad;
-- handlers and finalizers are run with the same context as the guarded
-- computation.
instance MonadException DIO where

  catchComp (DIO m) h = DIO $ \ps ->
    C.catch (m ps) (\e -> unDIO (h e) ps)

  finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
    C.finally (m1 ps) (m2 ps)

  -- Raise the exception as a monadic action ('C.throwM') rather than as
  -- an imprecise pure bottom, so that it is thrown exactly when the
  -- computation is executed.
  throwComp e = DIO $ \_ ->
    C.throwM e

-- | Invoke the 'DIO' computation: unwrap it and apply it to the specified context.
invokeDIO :: DIOContext -> DIO a -> DP.Process a
{-# INLINE invokeDIO #-}
invokeDIO ps (DIO m) = m ps

-- | Lift the distributed 'Process' computation.
--
-- The lifted computation ignores the 'DIO' context.
-- NOTE(review): presumably called unsafe because the lifted action is not
-- tracked by the simulation (e.g. not undone on rollback) -- confirm.
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe = DIO . const

-- | The default parameters for the 'DIO' computation
--
-- Monitoring, reconnecting and disconnecting of processes are disabled,
-- premature IO is not allowed, and the strategy is to terminate after
-- the time server timeout is exceeded.
defaultDIOParams :: DIOParams
defaultDIOParams =
  DIOParams { dioLoggingPriority = WARNING,
              dioName = "LP",
              dioTimeHorizon = Nothing,
              dioUndoableLogSizeThreshold = 10000000,
              dioOutputMessageQueueSizeThreshold = 10000,
              dioTransientMessageQueueSizeThreshold = 5000,
              dioSyncTimeout = 60000000,                       -- 60 s
              dioAllowPrematureIO = False,
              dioAllowSkippingOutdatedMessage = True,
              dioProcessMonitoringEnabled = False,
              dioProcessMonitoringDelay = 5000000,             -- 5 s
              dioProcessReconnectingEnabled = False,
              dioProcessReconnectingDelay = 5000000,           -- 5 s
              dioKeepAliveInterval = 5000000,                  -- 5 s
              dioTimeServerAcknowledgementTimeout = 5000000,   -- 5 s
              dioSimulationMonitoringInterval = 30000000,      -- 30 s
              dioSimulationMonitoringTimeout = 100000,         -- 0.1 s
              dioStrategy = TerminateDueToTimeServerTimeout 300000000,  -- 300 s
              dioProcessDisconnectingEnabled = False
            }

-- | The default environment parameters for the 'DIO' computation:
-- no simulation monitoring action is specified.
defaultDIOEnv :: DIOEnv
defaultDIOEnv =
  DIOEnv { dioSimulationMonitoringAction = Nothing }

-- | Return the computation context.
dioContext :: DIO DIOContext
dioContext = DIO return

-- | Return the parameters of the current computation.
dioParams :: DIO DIOParams
dioParams = DIO $ return . dioParams0

-- | Return the channel of messages.
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel = DIO $ return . dioChannel

-- | Return the process identifier of the inbox that receives messages.
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ return . dioInboxId

-- | Return the time server process identifier.
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ return . dioTimeServerId

-- | Terminate the simulation including the processes in
-- all nodes connected to the time server.
--
-- When the process monitoring is enabled, a 'SendTerminateTimeServerMessage'
-- is sent to the inbox process; otherwise, a 'TerminateTimeServerMessage' is
-- sent directly to the time server. Then the computation waits, for at most
-- 'dioTimeServerAcknowledgementTimeout' microseconds, until the
-- 'dioTimeServerTerminating' flag becomes 'True', and finally sends
-- 'TerminateInboxProcessMessage' to the inbox process.
terminateDIO :: DIO ()
terminateDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Terminating the simulation..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendTerminateTimeServerMessage receiver sender)
       else DP.send receiver (TerminateTimeServerMessage sender)
     -- the result of 'timeout' is discarded: the inbox process is asked
     -- to terminate whether or not the flag was set in time
     _ <- liftIO $
          timeout (dioTimeServerAcknowledgementTimeout ps) $
          atomically $
          do f <- readTVar (dioTimeServerTerminating ctx)
             unless f retry
     DP.send inbox TerminateInboxProcessMessage

-- | Register the simulation process in the time server, which
-- requires some initial quorum to start synchronizing the global time.
--
-- When the process monitoring is enabled, a 'SendRegisterLogicalProcessMessage'
-- is sent to the inbox process; otherwise, a 'RegisterLogicalProcessMessage' is
-- sent directly to the time server. Then the computation waits, for at most
-- 'dioTimeServerAcknowledgementTimeout' microseconds, until the
-- 'dioRegisteredInTimeServer' flag becomes 'True'.
registerDIO :: DIO ()
registerDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Registering the simulation process..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendRegisterLogicalProcessMessage receiver sender)
       else DP.send receiver (RegisterLogicalProcessMessage sender)
     -- the result of 'timeout' is discarded: the computation returns
     -- whether or not the flag was set in time
     _ <- liftIO $
          timeout (dioTimeServerAcknowledgementTimeout ps) $
          atomically $
          do f <- readTVar (dioRegisteredInTimeServer ctx)
             unless f retry
     return ()

-- | Unregister the simulation process from the time server
-- without affecting the processes in other nodes connected to
-- the corresponding time server.
--
-- When the process monitoring is enabled, a 'SendUnregisterLogicalProcessMessage'
-- is sent to the inbox process; otherwise, an 'UnregisterLogicalProcessMessage' is
-- sent directly to the time server. Then the computation waits, for at most
-- 'dioTimeServerAcknowledgementTimeout' microseconds, until the
-- 'dioUnregisteredFromTimeServer' flag becomes 'True', and finally sends
-- 'TerminateInboxProcessMessage' to the inbox process.
unregisterDIO :: DIO ()
unregisterDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Unregistering the simulation process..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendUnregisterLogicalProcessMessage receiver sender)
       else DP.send receiver (UnregisterLogicalProcessMessage sender)
     -- the result of 'timeout' is discarded: the inbox process is asked
     -- to terminate whether or not the flag was set in time
     _ <- liftIO $
          timeout (dioTimeServerAcknowledgementTimeout ps) $
          atomically $
          do f <- readTVar (dioUnregisteredFromTimeServer ctx)
             unless f retry
     DP.send inbox TerminateInboxProcessMessage

-- | The internal logical process message: a sum type with one constructor
-- per kind of message that it can wrap.
data InternalLogicalProcessMessage
  = InternalLogicalProcessMessage LogicalProcessMessage
    -- ^ wraps the logical process message
  | InternalProcessMonitorNotification DP.ProcessMonitorNotification
    -- ^ wraps the process monitor notification
  | InternalInboxProcessMessage InboxProcessMessage
    -- ^ wraps the inbox process message
  | InternalGeneralMessage GeneralMessage
    -- ^ wraps the general message

-- | Handle the specified exception: log it with the 'ERROR' priority
-- and then rethrow it.
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
  do ---
     logProcess ps ERROR $ "Exception occurred: " ++ show e
     ---
     C.throwM e

-- | Run the computation using the specified parameters along with time server process
-- identifier and return the inbox process identifier and a new simulation process.
--
-- This is 'runDIOWithEnv' applied to the default environment 'defaultDIOEnv'.
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId = runDIOWithEnv m ps defaultDIOEnv serverId

-- | A full version of 'runDIO' that also allows specifying the environment parameters.
--
-- The function creates the message channel and the connection manager and then
-- spawns the following local processes, each of them guarded by 'handleException':
--
--   * the inbox process, which receives the incoming messages, forwards
--     the logical process messages to the channel and performs the requested
--     send operations;
--
--   * the keep-alive process, which asks the inbox process to send a keep-alive
--     message every 'dioKeepAliveInterval';
--
--   * the process that calls 'validateTimeServer' every 'dioSyncTimeout', unless
--     the strategy is 'WaitIndefinitelyForTimeServer';
--
--   * if 'dioSimulationMonitoringAction' is defined, one process that passes the received
--     logical process states to that action and another one that requests for
--     the state every 'dioSimulationMonitoringInterval'.
--
-- The result is the inbox process identifier together with the simulation
-- computation; the latter is only returned here, it is not started.
runDIOWithEnv :: DIO a -> DIOParams -> DIOEnv -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIOWithEnv m ps env serverId =
  do ch <- liftIO newChannel
     let connParams =
           ConnectionParams { connLoggingPriority = dioLoggingPriority ps,
                              connKeepAliveInterval = dioKeepAliveInterval ps,
                              connReconnectingDelay = dioProcessReconnectingDelay ps,
                              connMonitoringDelay = dioProcessMonitoringDelay ps }
     connManager <- liftIO $ newConnectionManager connParams
     terminated <- liftIO $ newIORef False
     registeredInTimeServer <- liftIO $ newTVarIO False
     unregisteredFromTimeServer <- liftIO $ newTVarIO False
     timeServerTerminating <- liftIO $ newTVarIO False
     timeServerTimestamp <- liftIO $ getCurrentTime >>= newIORef
     let isTerminated :: DP.Process Bool
         -- read the flag that is set when the inbox process is stopping
         isTerminated = liftIO $ readIORef terminated

         markTerminated :: IO ()
         -- raise the termination flag and tell the simulation to abort
         markTerminated =
           do atomicWriteIORef terminated True
              writeChannel ch AbortSimulationMessage

         spawnGuarded :: DP.Process () -> DP.Process DP.ProcessId
         -- spawn a local process whose exceptions are logged and rethrown
         spawnGuarded act =
           DP.spawnLocal $
           C.catch act (handleException ps)

         acknowledged :: String -> TVar Bool -> DP.Process ()
         -- log the acknowledgement and raise the corresponding flag
         acknowledged msg flag =
           do ---
              logProcess ps INFO msg
              ---
              liftIO $
                atomically $
                writeTVar flag True

         handleInbox :: InboxProcessMessage -> DP.Process ()
         -- perform the action requested through the inbox process
         handleInbox msg =
           case msg of
             SendQueueMessage pid x ->
               DP.usend pid (QueueMessage x)
             SendQueueMessageBulk pid xs ->
               forM_ xs $ \x ->
               DP.usend pid (QueueMessage x)
             SendAcknowledgementQueueMessage pid x ->
               DP.usend pid (AcknowledgementQueueMessage x)
             SendAcknowledgementQueueMessageBulk pid xs ->
               forM_ xs $ \x ->
               DP.usend pid (AcknowledgementQueueMessage x)
             SendLocalTimeMessage receiver sender t ->
               DP.usend receiver (LocalTimeMessage sender t)
             SendRequestGlobalTimeMessage receiver sender ->
               DP.usend receiver (RequestGlobalTimeMessage sender)
             SendRegisterLogicalProcessMessage receiver sender ->
               DP.usend receiver (RegisterLogicalProcessMessage sender)
             SendUnregisterLogicalProcessMessage receiver sender ->
               DP.usend receiver (UnregisterLogicalProcessMessage sender)
             SendTerminateTimeServerMessage receiver sender ->
               DP.usend receiver (TerminateTimeServerMessage sender)
             MonitorProcessMessage pid ->
               tryAddMessageReceiver connManager pid >> return ()
             TrySendProcessKeepAliveMessage ->
               trySendKeepAlive connManager
             RegisterLogicalProcessAcknowledgementMessage _ ->
               acknowledged
               "Registered the logical process in the time server"
               registeredInTimeServer
             UnregisterLogicalProcessAcknowledgementMessage _ ->
               acknowledged
               "Unregistered the logical process from the time server"
               unregisteredFromTimeServer
             TerminateTimeServerAcknowledgementMessage _ ->
               acknowledged
               "Started terminating the time server"
               timeServerTerminating
             TerminateInboxProcessMessage ->
               do ---
                  logProcess ps INFO "Terminating the inbox and keep-alive processes..."
                  ---
                  -- the finalizer of the inbox loop repeats this step on exit
                  liftIO markTerminated
                  DP.terminate

         receiveLoop :: DP.Process ()
         -- wait for the next message of any supported kind and dispatch it;
         -- the matchers are tried in the order they are listed
         receiveLoop =
           forever $
           do received <-
                DP.receiveWait
                [DP.match (return . InternalLogicalProcessMessage),
                 DP.match (return . InternalProcessMonitorNotification),
                 DP.match (return . InternalInboxProcessMessage),
                 DP.match (return . InternalGeneralMessage)]
              case received of
                InternalLogicalProcessMessage x ->
                  do processTimeServerMessage x ps serverId timeServerTimestamp
                     liftIO $
                       writeChannel ch x
                InternalProcessMonitorNotification x ->
                  handleProcessMonitorNotification x ps ch connManager serverId
                InternalInboxProcessMessage x ->
                  handleInbox x
                InternalGeneralMessage x ->
                  handleGeneralMessage x ps ch connManager

         inboxLoop :: DP.Process ()
         -- whatever stops the receiving loop, mark the termination,
         -- abort the simulation and clear the message receivers
         inboxLoop =
           C.finally receiveLoop
           (do liftIO markTerminated
               clearMessageReceivers connManager)
     inboxId <- spawnGuarded inboxLoop
     let keepAliveLoop :: DP.Process ()
         -- periodically ask the inbox process to send a keep-alive message
         keepAliveLoop =
           do done <- isTerminated
              unless done $
                do liftIO $
                     threadDelay (dioKeepAliveInterval ps)
                   DP.send inboxId TrySendProcessKeepAliveMessage
                   keepAliveLoop

         shouldStopValidating :: DP.Process Bool
         -- no validation after termination or when waiting indefinitely
         shouldStopValidating =
           do done <- isTerminated
              return (done || dioStrategy ps == WaitIndefinitelyForTimeServer)

         validationLoop :: DP.Process ()
         -- periodically validate the time server; the stop condition is
         -- checked again after the delay as it may change in the meantime
         validationLoop =
           do stopBefore <- shouldStopValidating
              unless stopBefore $
                do liftIO $
                     threadDelay (dioSyncTimeout ps)
                   stopAfter <- shouldStopValidating
                   unless stopAfter $
                     do validateTimeServer ps inboxId timeServerTimestamp
                        validationLoop
     _ <- spawnGuarded keepAliveLoop
     _ <- spawnGuarded validationLoop
     case dioSimulationMonitoringAction env of
       Nothing  -> return ()
       Just act ->
         do let receiveStates :: DP.Process ()
                -- pass every received logical process state to the action
                receiveStates =
                  do done <- isTerminated
                     unless done $
                       do st <- DP.expectTimeout (dioSimulationMonitoringTimeout ps)
                          maybe (return ()) act st
                          receiveStates
            monitorId <- spawnGuarded receiveStates
            let requestStates :: DP.Process ()
                -- periodically request for the logical process state
                requestStates =
                  do done <- isTerminated
                     unless done $
                       do liftIO $
                            do threadDelay (dioSimulationMonitoringInterval ps)
                               writeChannel ch (ProvideLogicalProcessStateMessage monitorId)
                          requestStates
            _ <- spawnGuarded requestStates
            return ()
     let simulation =
           unDIO m DIOContext { dioChannel = ch,
                                dioInboxId = inboxId,
                                dioTimeServerId = serverId,
                                dioParams0 = ps,
                                dioRegisteredInTimeServer = registeredInTimeServer,
                                dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
                                dioTimeServerTerminating = timeServerTerminating }
     return (inboxId, simulation)

-- | Handle the process monitor notification.
handleProcessMonitorNotification :: DP.ProcessMonitorNotification
                                    -> DIOParams
                                    -> Channel LogicalProcessMessage
                                    -> ConnectionManager
                                    -> DP.ProcessId
                                    -> DP.Process ()
handleProcessMonitorNotification :: ProcessMonitorNotification
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> ProcessId
-> Process ()
handleProcessMonitorNotification m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
pid0 DiedReason
reason) DIOParams
ps Channel LogicalProcessMessage
ch ConnectionManager
connManager ProcessId
serverId =
  do let recv :: ProcessMonitorNotification -> Process ProcessMonitorNotification
recv m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) = 
           do ---
              DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Received a process monitor notification " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessMonitorNotification -> String
forall a. Show a => a -> String
show ProcessMonitorNotification
m
              ---
              IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
                Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessMonitorNotification -> LogicalProcessMessage
ProcessMonitorNotificationMessage ProcessMonitorNotification
m)
              ProcessMonitorNotification -> Process ProcessMonitorNotification
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessMonitorNotification
m
     ProcessMonitorNotification -> Process ProcessMonitorNotification
recv ProcessMonitorNotification
m
     Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ProcessId
pid0 ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
serverId) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
       case DiedReason
reason of
         DiedReason
DP.DiedNormal      -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId 
         DP.DiedException String
_ -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId
         DiedReason
DP.DiedNodeDown    -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId
         DiedReason
_                  -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
     Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Bool
dioProcessReconnectingEnabled DIOParams
ps Bool -> Bool -> Bool
&& (DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect)) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
       do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
            Int -> IO ()
threadDelay (DIOParams -> Int
dioProcessReconnectingDelay DIOParams
ps)
          let pred :: ProcessMonitorNotification -> Bool
pred m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
reason) = DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect
              loop :: [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessMonitorNotification]
              loop :: [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification]
acc =
                do Maybe ProcessMonitorNotification
y <- Int
-> [Match ProcessMonitorNotification]
-> Process (Maybe ProcessMonitorNotification)
forall b. Int -> [Match b] -> Process (Maybe b)
DP.receiveTimeout Int
0 [(ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification
    -> Process ProcessMonitorNotification)
-> Match ProcessMonitorNotification
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
DP.matchIf ProcessMonitorNotification -> Bool
pred ProcessMonitorNotification -> Process ProcessMonitorNotification
recv]
                   case Maybe ProcessMonitorNotification
y of
                     Maybe ProcessMonitorNotification
Nothing -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ProcessMonitorNotification]
 -> Process [ProcessMonitorNotification])
-> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a b. (a -> b) -> a -> b
$ [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. [a] -> [a]
reverse [ProcessMonitorNotification]
acc
                     Just m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop (ProcessMonitorNotification
m ProcessMonitorNotification
-> [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. a -> [a] -> [a]
: [ProcessMonitorNotification]
acc)
          [ProcessMonitorNotification]
ms <- [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification
m]
          [ProcessId]
pids <- ConnectionManager
-> [ProcessMonitorNotification] -> Process [ProcessId]
filterMessageReceivers ConnectionManager
connManager [ProcessMonitorNotification]
ms
          ConnectionManager -> [ProcessId] -> Process ()
reconnectMessageReceivers ConnectionManager
connManager [ProcessId]
pids
          [ProcessId] -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ProcessId]
pids ((ProcessId -> Process ()) -> Process ())
-> (ProcessId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
pid ->
            do ---
               DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
NOTICE (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
                 String
"Writing to the channel about reconnecting to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
               ---
               IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
                 Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessId -> LogicalProcessMessage
ReconnectProcessMessage ProcessId
pid)
     Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Bool
dioProcessDisconnectingEnabled DIOParams
ps Bool -> Bool -> Bool
&& Bool -> Bool
not (DIOParams -> Bool
dioProcessReconnectingEnabled DIOParams
ps)) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
       IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
       Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessId -> LogicalProcessMessage
DisconnectProcessMessage ProcessId
pid0)

-- | Handle the general message.
--
-- The only equation matches 'KeepAliveMessage': the message is logged with
-- the 'DEBUG' priority and nothing else is done.  The channel and the
-- connection manager are accepted but not used by this equation.
handleGeneralMessage :: GeneralMessage
                        -> DIOParams
                        -> Channel LogicalProcessMessage
                        -> ConnectionManager
                        -> DP.Process ()
handleGeneralMessage m@KeepAliveMessage ps ch connManager =
  logProcess ps DEBUG $
  "Received " ++ show m

-- | Monitor the specified process.
--
-- If 'dioProcessMonitoringEnabled' is set in the current parameters, a
-- 'MonitorProcessMessage' carrying the given process identifier is sent to
-- the inbox process returned by 'messageInboxId'.  Otherwise nothing is
-- monitored and a 'WARNING' is logged instead.
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox $
                 MonitorProcessMessage pid
       else liftDistributedUnsafe $
            logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters"

-- | Process the time server message in a stream of messages destined for the logical process.
--
-- * 'ComputeLocalTimeMessage': stores the current wall-clock time in the
--   reference and replies to the time server with a
--   'ComputeLocalTimeAcknowledgementMessage' carrying the identifier of the
--   calling process.  The reply goes through 'DP.usend' when
--   'dioProcessMonitoringEnabled' is set and through 'DP.send' otherwise.
--
-- * 'GlobalTimeMessage': only stores the current wall-clock time in the
--   reference.
--
-- * Any other message: does nothing.
processTimeServerMessage :: LogicalProcessMessage -> DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
processTimeServerMessage ComputeLocalTimeMessage ps serverId r =
  do -- remember when the time server was last heard from
     liftIO $
       getCurrentTime >>= writeIORef r
     inboxId <- DP.getSelfPid
     if dioProcessMonitoringEnabled ps
       then DP.usend serverId (ComputeLocalTimeAcknowledgementMessage inboxId)
       else DP.send serverId (ComputeLocalTimeAcknowledgementMessage inboxId)
processTimeServerMessage (GlobalTimeMessage _) ps serverId r =
  liftIO $
  getCurrentTime >>= writeIORef r
processTimeServerMessage _ ps serverId r =
  return ()

-- | Validate the time server by the specified inbox and recent timestamp.
--
-- The reference holds the wall-clock time that is compared against the
-- current time.  What happens next depends on 'dioStrategy':
--
-- * 'WaitIndefinitelyForTimeServer': nothing is checked.
--
-- * 'TerminateDueToTimeServerTimeout': if the elapsed time, converted by
--   'secondsToMicroseconds', exceeds the timeout, a 'WARNING' is logged and
--   'TerminateInboxProcessMessage' is sent to the given inbox process.
validateTimeServer :: DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
validateTimeServer ps inboxId r =
  do ---
     logProcess ps NOTICE "Validating the time server"
     ---
     case dioStrategy ps of
       WaitIndefinitelyForTimeServer ->
         return ()
       TerminateDueToTimeServerTimeout timeout ->
         do utc0 <- liftIO $ readIORef r
            utc  <- liftIO getCurrentTime
            -- elapsed seconds since the stored timestamp
            let dt = fromRational $ toRational (diffUTCTime utc utc0)
            when (secondsToMicroseconds dt > timeout) $
              do ---
                 logProcess ps WARNING "Terminating due to the exceeded time server timeout"
                 ---
                 DP.send inboxId TerminateInboxProcessMessage

-- | Process the situation when the time server has suddenly been terminated.
--
-- Logs a 'NOTICE' and sends 'TerminateInboxProcessMessage' to the given
-- inbox process.
processTimeServerTerminated :: DIOParams -> DP.ProcessId -> DP.Process ()
processTimeServerTerminated ps inboxId =
  do ---
     logProcess ps NOTICE "Terminating due to sudden termination of the time server"
     ---
     DP.send inboxId TerminateInboxProcessMessage

-- | Convert seconds to microseconds.
--
-- The argument is multiplied by one million and rounded with 'round'
-- (which rounds halves to the nearest even integer).  The rounding is done
-- at type 'Integer' and only then narrowed to 'Int'.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger micros
  where
    micros :: Integer
    micros = round (1000000 * x)

-- | Send the message.
--
-- If 'dioProcessMonitoringEnabled' is set, the message and its destination
-- are wrapped in 'SendQueueMessage' and sent to the inbox process returned
-- by 'messageInboxId'.  Otherwise the message is wrapped in 'QueueMessage'
-- and sent directly to the destination process.
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
{-# INLINABLE sendMessageDIO #-}
sendMessageDIO pid m =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendQueueMessage pid m)
       else liftDistributedUnsafe $
            DP.send pid (QueueMessage m)

-- | Send the bulk of messages.
--
-- If 'dioProcessMonitoringEnabled' is set, the whole list and its
-- destination are wrapped in a single 'SendQueueMessageBulk' and sent to the
-- inbox process returned by 'messageInboxId'.  Otherwise every message is
-- wrapped in its own 'QueueMessage' and sent directly to the destination
-- process, in list order.
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
{-# INLINABLE sendMessagesDIO #-}
sendMessagesDIO pid ms =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendQueueMessageBulk pid ms)
       else do forM_ ms $ \m ->
                 liftDistributedUnsafe $
                 DP.send pid (QueueMessage m)

-- | Send the acknowledgement message.
--
-- If 'dioProcessMonitoringEnabled' is set, the acknowledgement and its
-- destination are wrapped in 'SendAcknowledgementQueueMessage' and sent to
-- the inbox process returned by 'messageInboxId'.  Otherwise the
-- acknowledgement is wrapped in 'AcknowledgementQueueMessage' and sent
-- directly to the destination process.
sendAcknowledgementMessageDIO :: DP.ProcessId -> AcknowledgementMessage -> DIO ()
{-# INLINABLE sendAcknowledgementMessageDIO #-}
sendAcknowledgementMessageDIO pid m =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendAcknowledgementQueueMessage pid m)
       else liftDistributedUnsafe $
            DP.send pid (AcknowledgementQueueMessage m)

-- | Send the bulk of acknowledgement messages.
--
-- If 'dioProcessMonitoringEnabled' is set, the whole list and its
-- destination are wrapped in a single 'SendAcknowledgementQueueMessageBulk'
-- and sent to the inbox process returned by 'messageInboxId'.  Otherwise
-- every acknowledgement is wrapped in its own 'AcknowledgementQueueMessage'
-- and sent directly to the destination process, in list order.
sendAcknowledgementMessagesDIO :: DP.ProcessId -> [AcknowledgementMessage] -> DIO ()
{-# INLINABLE sendAcknowledgementMessagesDIO #-}
sendAcknowledgementMessagesDIO pid ms =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendAcknowledgementQueueMessageBulk pid ms)
       else do forM_ ms $ \m ->
                 liftDistributedUnsafe $
                 DP.send pid (AcknowledgementQueueMessage m)

-- | Send the local time to the time server.
--
-- The arguments are the receiver (the process the time is destined for),
-- the sender identifier that is embedded in the message, and the time value.
--
-- If 'dioProcessMonitoringEnabled' is set, all three are wrapped in
-- 'SendLocalTimeMessage' and sent to the inbox process returned by
-- 'messageInboxId'.  Otherwise a 'LocalTimeMessage' with the sender and the
-- time is sent directly to the receiver.
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
{-# INLINABLE sendLocalTimeDIO #-}
sendLocalTimeDIO receiver sender t =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendLocalTimeMessage receiver sender t)
       else liftDistributedUnsafe $
            DP.send receiver (LocalTimeMessage sender t)

-- | Send the request for the global virtual time to the time server.
--
-- The arguments are the receiver (the process the request is destined for)
-- and the sender identifier that is embedded in the message.
--
-- If 'dioProcessMonitoringEnabled' is set, both are wrapped in
-- 'SendRequestGlobalTimeMessage' and sent to the inbox process returned by
-- 'messageInboxId'.  Otherwise a 'RequestGlobalTimeMessage' with the sender
-- is sent directly to the receiver.
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
{-# INLINABLE sendRequestGlobalTimeDIO #-}
sendRequestGlobalTimeDIO receiver sender =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps
       then do inbox <- messageInboxId
               liftDistributedUnsafe $
                 DP.send inbox (SendRequestGlobalTimeMessage receiver sender)
       else liftDistributedUnsafe $
            DP.send receiver (RequestGlobalTimeMessage sender)

-- | Log the message with the specified priority.
--
-- The message is emitted through 'DP.say', prefixed with the result of
-- 'embracePriority' and a space, but only when the configured
-- 'dioLoggingPriority' is less than or equal to the given priority.
logDIO :: Priority -> String -> DIO ()
{-# INLINE logDIO #-}
logDIO p message =
  do ps <- dioParams
     when (dioLoggingPriority ps <= p) $
       liftDistributedUnsafe $
       DP.say $
       embracePriority p ++ " " ++ message

-- | Log the message with the specified priority.
--
-- This is the 'DP.Process' counterpart of 'logDIO' that takes the
-- parameters explicitly.  The message is emitted through 'DP.say', prefixed
-- with the result of 'embracePriority' and a space, but only when the
-- configured 'dioLoggingPriority' is less than or equal to the given
-- priority.
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
{-# INLINE logProcess #-}
logProcess ps p message =
  when (dioLoggingPriority ps <= p) $
  DP.say $
  embracePriority p ++ " " ++ message