{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}
module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
(DIO(..),
DIOParams(..),
DIOEnv(..),
DIOStrategy(..),
invokeDIO,
runDIO,
runDIOWithEnv,
defaultDIOParams,
defaultDIOEnv,
terminateDIO,
registerDIO,
unregisterDIO,
monitorProcessDIO,
dioParams,
messageChannel,
messageInboxId,
timeServerId,
sendMessageDIO,
sendMessagesDIO,
sendAcknowledgementMessageDIO,
sendAcknowledgementMessagesDIO,
sendLocalTimeDIO,
sendRequestGlobalTimeDIO,
logDIO,
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import Data.IORef
import Data.Time.Clock
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM
import System.Timeout
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
import Simulation.Aivika.Distributed.Optimistic.State
data DIOParams =
DIOParams { DIOParams -> Priority
dioLoggingPriority :: Priority,
DIOParams -> String
dioName :: String,
DIOParams -> Maybe Double
dioTimeHorizon :: Maybe Double,
DIOParams -> Int
dioUndoableLogSizeThreshold :: Int,
DIOParams -> Int
dioOutputMessageQueueSizeThreshold :: Int,
DIOParams -> Int
dioTransientMessageQueueSizeThreshold :: Int,
DIOParams -> Int
dioSyncTimeout :: Int,
DIOParams -> Bool
dioAllowPrematureIO :: Bool,
DIOParams -> Bool
dioAllowSkippingOutdatedMessage :: Bool,
DIOParams -> Bool
dioProcessMonitoringEnabled :: Bool,
DIOParams -> Int
dioProcessMonitoringDelay :: Int,
DIOParams -> Bool
dioProcessReconnectingEnabled :: Bool,
DIOParams -> Int
dioProcessReconnectingDelay :: Int,
DIOParams -> Int
dioKeepAliveInterval :: Int,
DIOParams -> Int
dioTimeServerAcknowledgementTimeout :: Int,
DIOParams -> Int
dioSimulationMonitoringInterval :: Int,
DIOParams -> Int
dioSimulationMonitoringTimeout :: Int,
DIOParams -> DIOStrategy
dioStrategy :: DIOStrategy,
DIOParams -> Bool
dioProcessDisconnectingEnabled :: Bool
} deriving (DIOParams -> DIOParams -> Bool
(DIOParams -> DIOParams -> Bool)
-> (DIOParams -> DIOParams -> Bool) -> Eq DIOParams
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DIOParams -> DIOParams -> Bool
== :: DIOParams -> DIOParams -> Bool
$c/= :: DIOParams -> DIOParams -> Bool
/= :: DIOParams -> DIOParams -> Bool
Eq, Eq DIOParams
Eq DIOParams =>
(DIOParams -> DIOParams -> Ordering)
-> (DIOParams -> DIOParams -> Bool)
-> (DIOParams -> DIOParams -> Bool)
-> (DIOParams -> DIOParams -> Bool)
-> (DIOParams -> DIOParams -> Bool)
-> (DIOParams -> DIOParams -> DIOParams)
-> (DIOParams -> DIOParams -> DIOParams)
-> Ord DIOParams
DIOParams -> DIOParams -> Bool
DIOParams -> DIOParams -> Ordering
DIOParams -> DIOParams -> DIOParams
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: DIOParams -> DIOParams -> Ordering
compare :: DIOParams -> DIOParams -> Ordering
$c< :: DIOParams -> DIOParams -> Bool
< :: DIOParams -> DIOParams -> Bool
$c<= :: DIOParams -> DIOParams -> Bool
<= :: DIOParams -> DIOParams -> Bool
$c> :: DIOParams -> DIOParams -> Bool
> :: DIOParams -> DIOParams -> Bool
$c>= :: DIOParams -> DIOParams -> Bool
>= :: DIOParams -> DIOParams -> Bool
$cmax :: DIOParams -> DIOParams -> DIOParams
max :: DIOParams -> DIOParams -> DIOParams
$cmin :: DIOParams -> DIOParams -> DIOParams
min :: DIOParams -> DIOParams -> DIOParams
Ord, Int -> DIOParams -> ShowS
[DIOParams] -> ShowS
DIOParams -> String
(Int -> DIOParams -> ShowS)
-> (DIOParams -> String)
-> ([DIOParams] -> ShowS)
-> Show DIOParams
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DIOParams -> ShowS
showsPrec :: Int -> DIOParams -> ShowS
$cshow :: DIOParams -> String
show :: DIOParams -> String
$cshowList :: [DIOParams] -> ShowS
showList :: [DIOParams] -> ShowS
Show, Typeable, (forall x. DIOParams -> Rep DIOParams x)
-> (forall x. Rep DIOParams x -> DIOParams) -> Generic DIOParams
forall x. Rep DIOParams x -> DIOParams
forall x. DIOParams -> Rep DIOParams x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. DIOParams -> Rep DIOParams x
from :: forall x. DIOParams -> Rep DIOParams x
$cto :: forall x. Rep DIOParams x -> DIOParams
to :: forall x. Rep DIOParams x -> DIOParams
Generic)
instance Binary DIOParams
data DIOEnv =
DIOEnv { DIOEnv -> Maybe (LogicalProcessState -> Process ())
dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> DP.Process ())
}
data DIOStrategy = WaitIndefinitelyForTimeServer
| TerminateDueToTimeServerTimeout Int
deriving (DIOStrategy -> DIOStrategy -> Bool
(DIOStrategy -> DIOStrategy -> Bool)
-> (DIOStrategy -> DIOStrategy -> Bool) -> Eq DIOStrategy
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DIOStrategy -> DIOStrategy -> Bool
== :: DIOStrategy -> DIOStrategy -> Bool
$c/= :: DIOStrategy -> DIOStrategy -> Bool
/= :: DIOStrategy -> DIOStrategy -> Bool
Eq, Eq DIOStrategy
Eq DIOStrategy =>
(DIOStrategy -> DIOStrategy -> Ordering)
-> (DIOStrategy -> DIOStrategy -> Bool)
-> (DIOStrategy -> DIOStrategy -> Bool)
-> (DIOStrategy -> DIOStrategy -> Bool)
-> (DIOStrategy -> DIOStrategy -> Bool)
-> (DIOStrategy -> DIOStrategy -> DIOStrategy)
-> (DIOStrategy -> DIOStrategy -> DIOStrategy)
-> Ord DIOStrategy
DIOStrategy -> DIOStrategy -> Bool
DIOStrategy -> DIOStrategy -> Ordering
DIOStrategy -> DIOStrategy -> DIOStrategy
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: DIOStrategy -> DIOStrategy -> Ordering
compare :: DIOStrategy -> DIOStrategy -> Ordering
$c< :: DIOStrategy -> DIOStrategy -> Bool
< :: DIOStrategy -> DIOStrategy -> Bool
$c<= :: DIOStrategy -> DIOStrategy -> Bool
<= :: DIOStrategy -> DIOStrategy -> Bool
$c> :: DIOStrategy -> DIOStrategy -> Bool
> :: DIOStrategy -> DIOStrategy -> Bool
$c>= :: DIOStrategy -> DIOStrategy -> Bool
>= :: DIOStrategy -> DIOStrategy -> Bool
$cmax :: DIOStrategy -> DIOStrategy -> DIOStrategy
max :: DIOStrategy -> DIOStrategy -> DIOStrategy
$cmin :: DIOStrategy -> DIOStrategy -> DIOStrategy
min :: DIOStrategy -> DIOStrategy -> DIOStrategy
Ord, Int -> DIOStrategy -> ShowS
[DIOStrategy] -> ShowS
DIOStrategy -> String
(Int -> DIOStrategy -> ShowS)
-> (DIOStrategy -> String)
-> ([DIOStrategy] -> ShowS)
-> Show DIOStrategy
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DIOStrategy -> ShowS
showsPrec :: Int -> DIOStrategy -> ShowS
$cshow :: DIOStrategy -> String
show :: DIOStrategy -> String
$cshowList :: [DIOStrategy] -> ShowS
showList :: [DIOStrategy] -> ShowS
Show, Typeable, (forall x. DIOStrategy -> Rep DIOStrategy x)
-> (forall x. Rep DIOStrategy x -> DIOStrategy)
-> Generic DIOStrategy
forall x. Rep DIOStrategy x -> DIOStrategy
forall x. DIOStrategy -> Rep DIOStrategy x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. DIOStrategy -> Rep DIOStrategy x
from :: forall x. DIOStrategy -> Rep DIOStrategy x
$cto :: forall x. Rep DIOStrategy x -> DIOStrategy
to :: forall x. Rep DIOStrategy x -> DIOStrategy
Generic)
instance Binary DIOStrategy
newtype DIO a = DIO { forall a. DIO a -> DIOContext -> Process a
unDIO :: DIOContext -> DP.Process a
}
data DIOContext =
DIOContext { DIOContext -> Channel LogicalProcessMessage
dioChannel :: Channel LogicalProcessMessage,
DIOContext -> ProcessId
dioInboxId :: DP.ProcessId,
DIOContext -> ProcessId
dioTimeServerId :: DP.ProcessId,
DIOContext -> DIOParams
dioParams0 :: DIOParams,
DIOContext -> TVar Bool
dioRegisteredInTimeServer :: TVar Bool,
DIOContext -> TVar Bool
dioUnregisteredFromTimeServer :: TVar Bool,
DIOContext -> TVar Bool
dioTimeServerTerminating :: TVar Bool
}
instance Monad DIO where
{-# INLINE (>>=) #-}
(DIO DIOContext -> Process a
m) >>= :: forall a b. DIO a -> (a -> DIO b) -> DIO b
>>= a -> DIO b
k = (DIOContext -> Process b) -> DIO b
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process b) -> DIO b)
-> (DIOContext -> Process b) -> DIO b
forall a b. (a -> b) -> a -> b
$ \DIOContext
ps ->
DIOContext -> Process a
m DIOContext
ps Process a -> (a -> Process b) -> Process b
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
a ->
let m' :: DIOContext -> Process b
m' = DIO b -> DIOContext -> Process b
forall a. DIO a -> DIOContext -> Process a
unDIO (a -> DIO b
k a
a) in DIOContext -> Process b
m' DIOContext
ps
instance Applicative DIO where
{-# INLINE pure #-}
pure :: forall a. a -> DIO a
pure = (DIOContext -> Process a) -> DIO a
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process a) -> DIO a)
-> (a -> DIOContext -> Process a) -> a -> DIO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Process a -> DIOContext -> Process a
forall a b. a -> b -> a
const (Process a -> DIOContext -> Process a)
-> (a -> Process a) -> a -> DIOContext -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Process a
forall a. a -> Process a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (<*>) #-}
<*> :: forall a b. DIO (a -> b) -> DIO a -> DIO b
(<*>) = DIO (a -> b) -> DIO a -> DIO b
forall (m :: * -> *) a b. Monad m => m (a -> b) -> m a -> m b
ap
instance Functor DIO where
{-# INLINE fmap #-}
fmap :: forall a b. (a -> b) -> DIO a -> DIO b
fmap a -> b
f (DIO DIOContext -> Process a
m) = (DIOContext -> Process b) -> DIO b
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process b) -> DIO b)
-> (DIOContext -> Process b) -> DIO b
forall a b. (a -> b) -> a -> b
$ (a -> b) -> Process a -> Process b
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Process a -> Process b)
-> (DIOContext -> Process a) -> DIOContext -> Process b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DIOContext -> Process a
m
instance MonadException DIO where
catchComp :: forall e a. Exception e => DIO a -> (e -> DIO a) -> DIO a
catchComp (DIO DIOContext -> Process a
m) e -> DIO a
h = (DIOContext -> Process a) -> DIO a
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process a) -> DIO a)
-> (DIOContext -> Process a) -> DIO a
forall a b. (a -> b) -> a -> b
$ \DIOContext
ps ->
Process a -> (e -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch (DIOContext -> Process a
m DIOContext
ps) (\e
e -> DIO a -> DIOContext -> Process a
forall a. DIO a -> DIOContext -> Process a
unDIO (e -> DIO a
h e
e) DIOContext
ps)
finallyComp :: forall a b. DIO a -> DIO b -> DIO a
finallyComp (DIO DIOContext -> Process a
m1) (DIO DIOContext -> Process b
m2) = (DIOContext -> Process a) -> DIO a
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process a) -> DIO a)
-> (DIOContext -> Process a) -> DIO a
forall a b. (a -> b) -> a -> b
$ \DIOContext
ps ->
Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
C.finally (DIOContext -> Process a
m1 DIOContext
ps) (DIOContext -> Process b
m2 DIOContext
ps)
throwComp :: forall e a. Exception e => e -> DIO a
throwComp e
e = (DIOContext -> Process a) -> DIO a
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process a) -> DIO a)
-> (DIOContext -> Process a) -> DIO a
forall a b. (a -> b) -> a -> b
$ \DIOContext
ps ->
e -> Process a
forall a e. Exception e => e -> a
throw e
e
invokeDIO :: DIOContext -> DIO a -> DP.Process a
{-# INLINE invokeDIO #-}
invokeDIO :: forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ps (DIO DIOContext -> Process a
m) = DIOContext -> Process a
m DIOContext
ps
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe :: forall a. Process a -> DIO a
liftDistributedUnsafe = (DIOContext -> Process a) -> DIO a
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process a) -> DIO a)
-> (Process a -> DIOContext -> Process a) -> Process a -> DIO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Process a -> DIOContext -> Process a
forall a b. a -> b -> a
const
defaultDIOParams :: DIOParams
defaultDIOParams :: DIOParams
defaultDIOParams =
DIOParams { dioLoggingPriority :: Priority
dioLoggingPriority = Priority
WARNING,
dioName :: String
dioName = String
"LP",
dioTimeHorizon :: Maybe Double
dioTimeHorizon = Maybe Double
forall a. Maybe a
Nothing,
dioUndoableLogSizeThreshold :: Int
dioUndoableLogSizeThreshold = Int
10000000,
dioOutputMessageQueueSizeThreshold :: Int
dioOutputMessageQueueSizeThreshold = Int
10000,
dioTransientMessageQueueSizeThreshold :: Int
dioTransientMessageQueueSizeThreshold = Int
5000,
dioSyncTimeout :: Int
dioSyncTimeout = Int
60000000,
dioAllowPrematureIO :: Bool
dioAllowPrematureIO = Bool
False,
dioAllowSkippingOutdatedMessage :: Bool
dioAllowSkippingOutdatedMessage = Bool
True,
dioProcessMonitoringEnabled :: Bool
dioProcessMonitoringEnabled = Bool
False,
dioProcessMonitoringDelay :: Int
dioProcessMonitoringDelay = Int
5000000,
dioProcessReconnectingEnabled :: Bool
dioProcessReconnectingEnabled = Bool
False,
dioProcessReconnectingDelay :: Int
dioProcessReconnectingDelay = Int
5000000,
dioKeepAliveInterval :: Int
dioKeepAliveInterval = Int
5000000,
dioTimeServerAcknowledgementTimeout :: Int
dioTimeServerAcknowledgementTimeout = Int
5000000,
dioSimulationMonitoringInterval :: Int
dioSimulationMonitoringInterval = Int
30000000,
dioSimulationMonitoringTimeout :: Int
dioSimulationMonitoringTimeout = Int
100000,
dioStrategy :: DIOStrategy
dioStrategy = Int -> DIOStrategy
TerminateDueToTimeServerTimeout Int
300000000,
dioProcessDisconnectingEnabled :: Bool
dioProcessDisconnectingEnabled = Bool
False
}
defaultDIOEnv :: DIOEnv
defaultDIOEnv :: DIOEnv
defaultDIOEnv =
DIOEnv { dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> Process ())
dioSimulationMonitoringAction = Maybe (LogicalProcessState -> Process ())
forall a. Maybe a
Nothing }
dioContext :: DIO DIOContext
dioContext :: DIO DIOContext
dioContext = (DIOContext -> Process DIOContext) -> DIO DIOContext
forall a. (DIOContext -> Process a) -> DIO a
DIO DIOContext -> Process DIOContext
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return
dioParams :: DIO DIOParams
dioParams :: DIO DIOParams
dioParams = (DIOContext -> Process DIOParams) -> DIO DIOParams
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process DIOParams) -> DIO DIOParams)
-> (DIOContext -> Process DIOParams) -> DIO DIOParams
forall a b. (a -> b) -> a -> b
$ DIOParams -> Process DIOParams
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (DIOParams -> Process DIOParams)
-> (DIOContext -> DIOParams) -> DIOContext -> Process DIOParams
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DIOContext -> DIOParams
dioParams0
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel = (DIOContext -> Process (Channel LogicalProcessMessage))
-> DIO (Channel LogicalProcessMessage)
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process (Channel LogicalProcessMessage))
-> DIO (Channel LogicalProcessMessage))
-> (DIOContext -> Process (Channel LogicalProcessMessage))
-> DIO (Channel LogicalProcessMessage)
forall a b. (a -> b) -> a -> b
$ Channel LogicalProcessMessage
-> Process (Channel LogicalProcessMessage)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Channel LogicalProcessMessage
-> Process (Channel LogicalProcessMessage))
-> (DIOContext -> Channel LogicalProcessMessage)
-> DIOContext
-> Process (Channel LogicalProcessMessage)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DIOContext -> Channel LogicalProcessMessage
dioChannel
messageInboxId :: DIO DP.ProcessId
messageInboxId :: DIO ProcessId
messageInboxId = (DIOContext -> Process ProcessId) -> DIO ProcessId
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process ProcessId) -> DIO ProcessId)
-> (DIOContext -> Process ProcessId) -> DIO ProcessId
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process ProcessId
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId -> Process ProcessId)
-> (DIOContext -> ProcessId) -> DIOContext -> Process ProcessId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DIOContext -> ProcessId
dioInboxId
timeServerId :: DIO DP.ProcessId
timeServerId :: DIO ProcessId
timeServerId = (DIOContext -> Process ProcessId) -> DIO ProcessId
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process ProcessId) -> DIO ProcessId)
-> (DIOContext -> Process ProcessId) -> DIO ProcessId
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process ProcessId
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId -> Process ProcessId)
-> (DIOContext -> ProcessId) -> DIOContext -> Process ProcessId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DIOContext -> ProcessId
dioTimeServerId
terminateDIO :: DIO ()
terminateDIO :: DIO ()
terminateDIO =
(DIOContext -> Process ()) -> DIO ()
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process ()) -> DIO ())
-> (DIOContext -> Process ()) -> DIO ()
forall a b. (a -> b) -> a -> b
$ \DIOContext
ctx ->
do let ps :: DIOParams
ps = DIOContext -> DIOParams
dioParams0 DIOContext
ctx
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Terminating the simulation..."
ProcessId
sender <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
messageInboxId
ProcessId
receiver <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
timeServerId
let inbox :: ProcessId
inbox = ProcessId
sender
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> ProcessId -> InboxProcessMessage
SendTerminateTimeServerMessage ProcessId
receiver ProcessId
sender)
else ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
receiver (ProcessId -> TimeServerMessage
TerminateTimeServerMessage ProcessId
sender)
IO (Maybe ()) -> Process (Maybe ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe ()) -> Process (Maybe ()))
-> IO (Maybe ()) -> Process (Maybe ())
forall a b. (a -> b) -> a -> b
$
Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (DIOParams -> Int
dioTimeServerAcknowledgementTimeout DIOParams
ps) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
do Bool
f <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar (DIOContext -> TVar Bool
dioTimeServerTerminating DIOContext
ctx)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f STM ()
forall a. STM a
retry
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox InboxProcessMessage
TerminateInboxProcessMessage
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
registerDIO :: DIO ()
registerDIO :: DIO ()
registerDIO =
(DIOContext -> Process ()) -> DIO ()
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process ()) -> DIO ())
-> (DIOContext -> Process ()) -> DIO ()
forall a b. (a -> b) -> a -> b
$ \DIOContext
ctx ->
do let ps :: DIOParams
ps = DIOContext -> DIOParams
dioParams0 DIOContext
ctx
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Registering the simulation process..."
ProcessId
sender <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
messageInboxId
ProcessId
receiver <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
timeServerId
let inbox :: ProcessId
inbox = ProcessId
sender
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> ProcessId -> InboxProcessMessage
SendRegisterLogicalProcessMessage ProcessId
receiver ProcessId
sender)
else ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
receiver (ProcessId -> TimeServerMessage
RegisterLogicalProcessMessage ProcessId
sender)
IO (Maybe ()) -> Process (Maybe ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe ()) -> Process (Maybe ()))
-> IO (Maybe ()) -> Process (Maybe ())
forall a b. (a -> b) -> a -> b
$
Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (DIOParams -> Int
dioTimeServerAcknowledgementTimeout DIOParams
ps) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
do Bool
f <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar (DIOContext -> TVar Bool
dioRegisteredInTimeServer DIOContext
ctx)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f STM ()
forall a. STM a
retry
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
unregisterDIO :: DIO ()
unregisterDIO :: DIO ()
unregisterDIO =
(DIOContext -> Process ()) -> DIO ()
forall a. (DIOContext -> Process a) -> DIO a
DIO ((DIOContext -> Process ()) -> DIO ())
-> (DIOContext -> Process ()) -> DIO ()
forall a b. (a -> b) -> a -> b
$ \DIOContext
ctx ->
do let ps :: DIOParams
ps = DIOContext -> DIOParams
dioParams0 DIOContext
ctx
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Unregistering the simulation process..."
ProcessId
sender <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
messageInboxId
ProcessId
receiver <- DIOContext -> DIO ProcessId -> Process ProcessId
forall a. DIOContext -> DIO a -> Process a
invokeDIO DIOContext
ctx DIO ProcessId
timeServerId
let inbox :: ProcessId
inbox = ProcessId
sender
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> ProcessId -> InboxProcessMessage
SendUnregisterLogicalProcessMessage ProcessId
receiver ProcessId
sender)
else ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
receiver (ProcessId -> TimeServerMessage
UnregisterLogicalProcessMessage ProcessId
sender)
IO (Maybe ()) -> Process (Maybe ())
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe ()) -> Process (Maybe ()))
-> IO (Maybe ()) -> Process (Maybe ())
forall a b. (a -> b) -> a -> b
$
Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (DIOParams -> Int
dioTimeServerAcknowledgementTimeout DIOParams
ps) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
do Bool
f <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar (DIOContext -> TVar Bool
dioUnregisteredFromTimeServer DIOContext
ctx)
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f STM ()
forall a. STM a
retry
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox InboxProcessMessage
TerminateInboxProcessMessage
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
data InternalLogicalProcessMessage = InternalLogicalProcessMessage LogicalProcessMessage
| InternalProcessMonitorNotification DP.ProcessMonitorNotification
| InternalInboxProcessMessage InboxProcessMessage
| InternalGeneralMessage GeneralMessage
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException :: DIOParams -> SomeException -> Process ()
handleException DIOParams
ps SomeException
e =
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
ERROR (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Exception occurred: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> String
forall a. Show a => a -> String
show SomeException
e
SomeException -> Process ()
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
C.throwM SomeException
e
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO :: forall a.
DIO a -> DIOParams -> ProcessId -> Process (ProcessId, Process a)
runDIO DIO a
m DIOParams
ps ProcessId
serverId = DIO a
-> DIOParams
-> DIOEnv
-> ProcessId
-> Process (ProcessId, Process a)
forall a.
DIO a
-> DIOParams
-> DIOEnv
-> ProcessId
-> Process (ProcessId, Process a)
runDIOWithEnv DIO a
m DIOParams
ps DIOEnv
defaultDIOEnv ProcessId
serverId
runDIOWithEnv :: DIO a -> DIOParams -> DIOEnv -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIOWithEnv :: forall a.
DIO a
-> DIOParams
-> DIOEnv
-> ProcessId
-> Process (ProcessId, Process a)
runDIOWithEnv DIO a
m DIOParams
ps DIOEnv
env ProcessId
serverId =
do Channel LogicalProcessMessage
ch <- IO (Channel LogicalProcessMessage)
-> Process (Channel LogicalProcessMessage)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Channel LogicalProcessMessage)
forall a. IO (Channel a)
newChannel
let connParams :: ConnectionParams
connParams =
ConnectionParams { connLoggingPriority :: Priority
connLoggingPriority = DIOParams -> Priority
dioLoggingPriority DIOParams
ps,
connKeepAliveInterval :: Int
connKeepAliveInterval = DIOParams -> Int
dioKeepAliveInterval DIOParams
ps,
connReconnectingDelay :: Int
connReconnectingDelay = DIOParams -> Int
dioProcessReconnectingDelay DIOParams
ps,
connMonitoringDelay :: Int
connMonitoringDelay = DIOParams -> Int
dioProcessMonitoringDelay DIOParams
ps }
ConnectionManager
connManager <- IO ConnectionManager -> Process ConnectionManager
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ConnectionManager -> Process ConnectionManager)
-> IO ConnectionManager -> Process ConnectionManager
forall a b. (a -> b) -> a -> b
$ ConnectionParams -> IO ConnectionManager
newConnectionManager ConnectionParams
connParams
IORef Bool
terminated <- IO (IORef Bool) -> Process (IORef Bool)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef Bool) -> Process (IORef Bool))
-> IO (IORef Bool) -> Process (IORef Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
TVar Bool
registeredInTimeServer <- IO (TVar Bool) -> Process (TVar Bool)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar Bool) -> Process (TVar Bool))
-> IO (TVar Bool) -> Process (TVar Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
TVar Bool
unregisteredFromTimeServer <- IO (TVar Bool) -> Process (TVar Bool)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar Bool) -> Process (TVar Bool))
-> IO (TVar Bool) -> Process (TVar Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
TVar Bool
timeServerTerminating <- IO (TVar Bool) -> Process (TVar Bool)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar Bool) -> Process (TVar Bool))
-> IO (TVar Bool) -> Process (TVar Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
IORef UTCTime
timeServerTimestamp <- IO (IORef UTCTime) -> Process (IORef UTCTime)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef UTCTime) -> Process (IORef UTCTime))
-> IO (IORef UTCTime) -> Process (IORef UTCTime)
forall a b. (a -> b) -> a -> b
$ IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO (IORef UTCTime)) -> IO (IORef UTCTime)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= UTCTime -> IO (IORef UTCTime)
forall a. a -> IO (IORef a)
newIORef
let loop0 :: Process b
loop0 =
Process () -> Process b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Process () -> Process b) -> Process () -> Process b
forall a b. (a -> b) -> a -> b
$
do let f1 :: LogicalProcessMessage -> DP.Process InternalLogicalProcessMessage
f1 :: LogicalProcessMessage -> Process InternalLogicalProcessMessage
f1 LogicalProcessMessage
x = InternalLogicalProcessMessage
-> Process InternalLogicalProcessMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (LogicalProcessMessage -> InternalLogicalProcessMessage
InternalLogicalProcessMessage LogicalProcessMessage
x)
f2 :: DP.ProcessMonitorNotification -> DP.Process InternalLogicalProcessMessage
f2 :: ProcessMonitorNotification -> Process InternalLogicalProcessMessage
f2 ProcessMonitorNotification
x = InternalLogicalProcessMessage
-> Process InternalLogicalProcessMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessMonitorNotification -> InternalLogicalProcessMessage
InternalProcessMonitorNotification ProcessMonitorNotification
x)
f3 :: InboxProcessMessage -> DP.Process InternalLogicalProcessMessage
f3 :: InboxProcessMessage -> Process InternalLogicalProcessMessage
f3 InboxProcessMessage
x = InternalLogicalProcessMessage
-> Process InternalLogicalProcessMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (InboxProcessMessage -> InternalLogicalProcessMessage
InternalInboxProcessMessage InboxProcessMessage
x)
f4 :: GeneralMessage -> DP.Process InternalLogicalProcessMessage
f4 :: GeneralMessage -> Process InternalLogicalProcessMessage
f4 GeneralMessage
x = InternalLogicalProcessMessage
-> Process InternalLogicalProcessMessage
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (GeneralMessage -> InternalLogicalProcessMessage
InternalGeneralMessage GeneralMessage
x)
Maybe InternalLogicalProcessMessage
x <- (InternalLogicalProcessMessage
-> Maybe InternalLogicalProcessMessage)
-> Process InternalLogicalProcessMessage
-> Process (Maybe InternalLogicalProcessMessage)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap InternalLogicalProcessMessage
-> Maybe InternalLogicalProcessMessage
forall a. a -> Maybe a
Just (Process InternalLogicalProcessMessage
-> Process (Maybe InternalLogicalProcessMessage))
-> Process InternalLogicalProcessMessage
-> Process (Maybe InternalLogicalProcessMessage)
forall a b. (a -> b) -> a -> b
$ [Match InternalLogicalProcessMessage]
-> Process InternalLogicalProcessMessage
forall b. [Match b] -> Process b
DP.receiveWait [(LogicalProcessMessage -> Process InternalLogicalProcessMessage)
-> Match InternalLogicalProcessMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match LogicalProcessMessage -> Process InternalLogicalProcessMessage
f1, (ProcessMonitorNotification
-> Process InternalLogicalProcessMessage)
-> Match InternalLogicalProcessMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match ProcessMonitorNotification -> Process InternalLogicalProcessMessage
f2, (InboxProcessMessage -> Process InternalLogicalProcessMessage)
-> Match InternalLogicalProcessMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match InboxProcessMessage -> Process InternalLogicalProcessMessage
f3, (GeneralMessage -> Process InternalLogicalProcessMessage)
-> Match InternalLogicalProcessMessage
forall a b. Serializable a => (a -> Process b) -> Match b
DP.match GeneralMessage -> Process InternalLogicalProcessMessage
f4]
case Maybe InternalLogicalProcessMessage
x of
Maybe InternalLogicalProcessMessage
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (InternalLogicalProcessMessage LogicalProcessMessage
m) ->
do LogicalProcessMessage
-> DIOParams -> ProcessId -> IORef UTCTime -> Process ()
processTimeServerMessage LogicalProcessMessage
m DIOParams
ps ProcessId
serverId IORef UTCTime
timeServerTimestamp
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch LogicalProcessMessage
m
Just (InternalProcessMonitorNotification m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_)) ->
ProcessMonitorNotification
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> ProcessId
-> Process ()
handleProcessMonitorNotification ProcessMonitorNotification
m DIOParams
ps Channel LogicalProcessMessage
ch ConnectionManager
connManager ProcessId
serverId
Just (InternalInboxProcessMessage InboxProcessMessage
m) ->
case InboxProcessMessage
m of
SendQueueMessage ProcessId
pid Message
m ->
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (Message -> LogicalProcessMessage
QueueMessage Message
m)
SendQueueMessageBulk ProcessId
pid [Message]
ms ->
[Message] -> (Message -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Message]
ms ((Message -> Process ()) -> Process ())
-> (Message -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \Message
m ->
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (Message -> LogicalProcessMessage
QueueMessage Message
m)
SendAcknowledgementQueueMessage ProcessId
pid AcknowledgementMessage
m ->
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (AcknowledgementMessage -> LogicalProcessMessage
AcknowledgementQueueMessage AcknowledgementMessage
m)
SendAcknowledgementQueueMessageBulk ProcessId
pid [AcknowledgementMessage]
ms ->
[AcknowledgementMessage]
-> (AcknowledgementMessage -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [AcknowledgementMessage]
ms ((AcknowledgementMessage -> Process ()) -> Process ())
-> (AcknowledgementMessage -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \AcknowledgementMessage
m ->
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
pid (AcknowledgementMessage -> LogicalProcessMessage
AcknowledgementQueueMessage AcknowledgementMessage
m)
SendLocalTimeMessage ProcessId
receiver ProcessId
sender Double
t ->
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
receiver (ProcessId -> Double -> TimeServerMessage
LocalTimeMessage ProcessId
sender Double
t)
SendRequestGlobalTimeMessage ProcessId
receiver ProcessId
sender ->
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
receiver (ProcessId -> TimeServerMessage
RequestGlobalTimeMessage ProcessId
sender)
SendRegisterLogicalProcessMessage ProcessId
receiver ProcessId
sender ->
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
receiver (ProcessId -> TimeServerMessage
RegisterLogicalProcessMessage ProcessId
sender)
SendUnregisterLogicalProcessMessage ProcessId
receiver ProcessId
sender ->
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
receiver (ProcessId -> TimeServerMessage
UnregisterLogicalProcessMessage ProcessId
sender)
SendTerminateTimeServerMessage ProcessId
receiver ProcessId
sender ->
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
receiver (ProcessId -> TimeServerMessage
TerminateTimeServerMessage ProcessId
sender)
MonitorProcessMessage ProcessId
pid ->
ConnectionManager -> ProcessId -> Process Bool
tryAddMessageReceiver ConnectionManager
connManager ProcessId
pid Process Bool -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
InboxProcessMessage
TrySendProcessKeepAliveMessage ->
ConnectionManager -> Process ()
trySendKeepAlive ConnectionManager
connManager
RegisterLogicalProcessAcknowledgementMessage ProcessId
pid ->
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Registered the logical process in the time server"
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
registeredInTimeServer Bool
True
UnregisterLogicalProcessAcknowledgementMessage ProcessId
pid ->
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Unregistered the logical process from the time server"
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
unregisteredFromTimeServer Bool
True
TerminateTimeServerAcknowledgementMessage ProcessId
pid ->
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Started terminating the time server"
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
timeServerTerminating Bool
True
InboxProcessMessage
TerminateInboxProcessMessage ->
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
INFO String
"Terminating the inbox and keep-alive processes..."
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef Bool
terminated Bool
True
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch LogicalProcessMessage
AbortSimulationMessage
Process ()
forall a. Process a
DP.terminate
Just (InternalGeneralMessage GeneralMessage
m) ->
GeneralMessage
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> Process ()
handleGeneralMessage GeneralMessage
m DIOParams
ps Channel LogicalProcessMessage
ch ConnectionManager
connManager
loop :: Process a
loop =
Process a -> Process () -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
C.finally Process a
forall a. Process a
loop0
(do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef Bool
terminated Bool
True
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch LogicalProcessMessage
AbortSimulationMessage
ConnectionManager -> Process ()
clearMessageReceivers ConnectionManager
connManager)
ProcessId
inboxId <-
Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
forall a. Process a
loop (DIOParams -> SomeException -> Process ()
handleException DIOParams
ps)
Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
let loop :: Process ()
loop =
do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
terminated
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Int -> IO ()
threadDelay (DIOParams -> Int
dioKeepAliveInterval DIOParams
ps)
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inboxId InboxProcessMessage
TrySendProcessKeepAliveMessage
Process ()
loop
in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (DIOParams -> SomeException -> Process ()
handleException DIOParams
ps)
Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
let stop :: Process Bool
stop =
do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
terminated
Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
f Bool -> Bool -> Bool
|| DIOParams -> DIOStrategy
dioStrategy DIOParams
ps DIOStrategy -> DIOStrategy -> Bool
forall a. Eq a => a -> a -> Bool
== DIOStrategy
WaitIndefinitelyForTimeServer)
loop :: Process ()
loop =
do Bool
f <- Process Bool
stop
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Int -> IO ()
threadDelay (DIOParams -> Int
dioSyncTimeout DIOParams
ps)
Bool
f <- Process Bool
stop
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do DIOParams -> ProcessId -> IORef UTCTime -> Process ()
validateTimeServer DIOParams
ps ProcessId
inboxId IORef UTCTime
timeServerTimestamp
Process ()
loop
in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (DIOParams -> SomeException -> Process ()
handleException DIOParams
ps)
case DIOEnv -> Maybe (LogicalProcessState -> Process ())
dioSimulationMonitoringAction DIOEnv
env of
Maybe (LogicalProcessState -> Process ())
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just LogicalProcessState -> Process ()
act ->
do ProcessId
monitorId <-
Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
let loop :: Process ()
loop =
do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
terminated
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do Maybe LogicalProcessState
x <- Int -> Process (Maybe LogicalProcessState)
forall a. Serializable a => Int -> Process (Maybe a)
DP.expectTimeout (DIOParams -> Int
dioSimulationMonitoringTimeout DIOParams
ps)
case Maybe LogicalProcessState
x of
Maybe LogicalProcessState
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just LogicalProcessState
st -> LogicalProcessState -> Process ()
act LogicalProcessState
st
Process ()
loop
in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (DIOParams -> SomeException -> Process ()
handleException DIOParams
ps)
Process () -> Process ProcessId
DP.spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$
let loop :: Process ()
loop =
do Bool
f <- IO Bool -> Process Bool
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> Process Bool) -> IO Bool -> Process Bool
forall a b. (a -> b) -> a -> b
$ IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
terminated
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
f (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
do Int -> IO ()
threadDelay (DIOParams -> Int
dioSimulationMonitoringInterval DIOParams
ps)
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessId -> LogicalProcessMessage
ProvideLogicalProcessStateMessage ProcessId
monitorId)
Process ()
loop
in Process () -> (SomeException -> Process ()) -> Process ()
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
C.catch Process ()
loop (DIOParams -> SomeException -> Process ()
handleException DIOParams
ps)
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
let simulation :: Process a
simulation =
DIO a -> DIOContext -> Process a
forall a. DIO a -> DIOContext -> Process a
unDIO DIO a
m DIOContext { dioChannel :: Channel LogicalProcessMessage
dioChannel = Channel LogicalProcessMessage
ch,
dioInboxId :: ProcessId
dioInboxId = ProcessId
inboxId,
dioTimeServerId :: ProcessId
dioTimeServerId = ProcessId
serverId,
dioParams0 :: DIOParams
dioParams0 = DIOParams
ps,
dioRegisteredInTimeServer :: TVar Bool
dioRegisteredInTimeServer = TVar Bool
registeredInTimeServer,
dioUnregisteredFromTimeServer :: TVar Bool
dioUnregisteredFromTimeServer = TVar Bool
unregisteredFromTimeServer,
dioTimeServerTerminating :: TVar Bool
dioTimeServerTerminating = TVar Bool
timeServerTerminating }
(ProcessId, Process a) -> Process (ProcessId, Process a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId
inboxId, Process a
simulation)
handleProcessMonitorNotification :: DP.ProcessMonitorNotification
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> DP.ProcessId
-> DP.Process ()
handleProcessMonitorNotification :: ProcessMonitorNotification
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> ProcessId
-> Process ()
handleProcessMonitorNotification m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
pid0 DiedReason
reason) DIOParams
ps Channel LogicalProcessMessage
ch ConnectionManager
connManager ProcessId
serverId =
do let recv :: ProcessMonitorNotification -> Process ProcessMonitorNotification
recv m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) =
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
WARNING (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Received a process monitor notification " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessMonitorNotification -> String
forall a. Show a => a -> String
show ProcessMonitorNotification
m
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessMonitorNotification -> LogicalProcessMessage
ProcessMonitorNotificationMessage ProcessMonitorNotification
m)
ProcessMonitorNotification -> Process ProcessMonitorNotification
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessMonitorNotification
m
ProcessMonitorNotification -> Process ProcessMonitorNotification
recv ProcessMonitorNotification
m
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ProcessId
pid0 ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
serverId) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
case DiedReason
reason of
DiedReason
DP.DiedNormal -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId
DP.DiedException String
_ -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId
DiedReason
DP.DiedNodeDown -> DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
serverId
DiedReason
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Bool
dioProcessReconnectingEnabled DIOParams
ps Bool -> Bool -> Bool
&& (DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect)) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Int -> IO ()
threadDelay (DIOParams -> Int
dioProcessReconnectingDelay DIOParams
ps)
let pred :: ProcessMonitorNotification -> Bool
pred m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
reason) = DiedReason
reason DiedReason -> DiedReason -> Bool
forall a. Eq a => a -> a -> Bool
== DiedReason
DP.DiedDisconnect
loop :: [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessMonitorNotification]
loop :: [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification]
acc =
do Maybe ProcessMonitorNotification
y <- Int
-> [Match ProcessMonitorNotification]
-> Process (Maybe ProcessMonitorNotification)
forall b. Int -> [Match b] -> Process (Maybe b)
DP.receiveTimeout Int
0 [(ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification
-> Process ProcessMonitorNotification)
-> Match ProcessMonitorNotification
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
DP.matchIf ProcessMonitorNotification -> Bool
pred ProcessMonitorNotification -> Process ProcessMonitorNotification
recv]
case Maybe ProcessMonitorNotification
y of
Maybe ProcessMonitorNotification
Nothing -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ProcessMonitorNotification]
-> Process [ProcessMonitorNotification])
-> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
forall a b. (a -> b) -> a -> b
$ [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. [a] -> [a]
reverse [ProcessMonitorNotification]
acc
Just m :: ProcessMonitorNotification
m@(DP.ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
_) -> [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop (ProcessMonitorNotification
m ProcessMonitorNotification
-> [ProcessMonitorNotification] -> [ProcessMonitorNotification]
forall a. a -> [a] -> [a]
: [ProcessMonitorNotification]
acc)
[ProcessMonitorNotification]
ms <- [ProcessMonitorNotification]
-> Process [ProcessMonitorNotification]
loop [ProcessMonitorNotification
m]
[ProcessId]
pids <- ConnectionManager
-> [ProcessMonitorNotification] -> Process [ProcessId]
filterMessageReceivers ConnectionManager
connManager [ProcessMonitorNotification]
ms
ConnectionManager -> [ProcessId] -> Process ()
reconnectMessageReceivers ConnectionManager
connManager [ProcessId]
pids
[ProcessId] -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ProcessId]
pids ((ProcessId -> Process ()) -> Process ())
-> (ProcessId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
pid ->
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
NOTICE (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
String
"Writing to the channel about reconnecting to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessId -> LogicalProcessMessage
ReconnectProcessMessage ProcessId
pid)
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Bool
dioProcessDisconnectingEnabled DIOParams
ps Bool -> Bool -> Bool
&& Bool -> Bool
not (DIOParams -> Bool
dioProcessReconnectingEnabled DIOParams
ps)) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
Channel LogicalProcessMessage -> LogicalProcessMessage -> IO ()
forall a. Channel a -> a -> IO ()
writeChannel Channel LogicalProcessMessage
ch (ProcessId -> LogicalProcessMessage
DisconnectProcessMessage ProcessId
pid0)
handleGeneralMessage :: GeneralMessage
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> DP.Process ()
handleGeneralMessage :: GeneralMessage
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> Process ()
handleGeneralMessage m :: GeneralMessage
m@GeneralMessage
KeepAliveMessage DIOParams
ps Channel LogicalProcessMessage
ch ConnectionManager
connManager =
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
DEBUG (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
String
"Received " String -> ShowS
forall a. [a] -> [a] -> [a]
++ GeneralMessage -> String
forall a. Show a => a -> String
show GeneralMessage
m
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO :: ProcessId -> DIO ()
monitorProcessDIO ProcessId
pid =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (InboxProcessMessage -> Process ())
-> InboxProcessMessage -> Process ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage
MonitorProcessMessage ProcessId
pid
else Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
WARNING String
"Ignored the process monitoring as it was disabled in the DIO computation parameters"
processTimeServerMessage :: LogicalProcessMessage -> DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
processTimeServerMessage :: LogicalProcessMessage
-> DIOParams -> ProcessId -> IORef UTCTime -> Process ()
processTimeServerMessage LogicalProcessMessage
ComputeLocalTimeMessage DIOParams
ps ProcessId
serverId IORef UTCTime
r =
do IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IORef UTCTime -> UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef UTCTime
r
ProcessId
inboxId <- Process ProcessId
DP.getSelfPid
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.usend ProcessId
serverId (ProcessId -> TimeServerMessage
ComputeLocalTimeAcknowledgementMessage ProcessId
inboxId)
else ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
serverId (ProcessId -> TimeServerMessage
ComputeLocalTimeAcknowledgementMessage ProcessId
inboxId)
processTimeServerMessage (GlobalTimeMessage Double
_) DIOParams
ps ProcessId
serverId IORef UTCTime
r =
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$
IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IORef UTCTime -> UTCTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef UTCTime
r
processTimeServerMessage LogicalProcessMessage
_ DIOParams
ps ProcessId
serverId IORef UTCTime
r =
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
validateTimeServer :: DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
validateTimeServer :: DIOParams -> ProcessId -> IORef UTCTime -> Process ()
validateTimeServer DIOParams
ps ProcessId
inboxId IORef UTCTime
r =
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
NOTICE String
"Validating the time server"
case DIOParams -> DIOStrategy
dioStrategy DIOParams
ps of
DIOStrategy
WaitIndefinitelyForTimeServer ->
() -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
TerminateDueToTimeServerTimeout Int
timeout ->
do UTCTime
utc0 <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO UTCTime -> Process UTCTime) -> IO UTCTime -> Process UTCTime
forall a b. (a -> b) -> a -> b
$ IORef UTCTime -> IO UTCTime
forall a. IORef a -> IO a
readIORef IORef UTCTime
r
UTCTime
utc <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
let dt :: Double
dt = Rational -> Double
forall a. Fractional a => Rational -> a
fromRational (Rational -> Double) -> Rational -> Double
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Rational
forall a. Real a => a -> Rational
toRational (UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
utc UTCTime
utc0)
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Double -> Int
secondsToMicroseconds Double
dt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
timeout) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
WARNING String
"Terminating due to the exceeded time server timeout"
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inboxId InboxProcessMessage
TerminateInboxProcessMessage
processTimeServerTerminated :: DIOParams -> DP.ProcessId -> DP.Process ()
processTimeServerTerminated :: DIOParams -> ProcessId -> Process ()
processTimeServerTerminated DIOParams
ps ProcessId
inboxId =
do
DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
NOTICE String
"Terminating due to sudden termination of the time server"
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inboxId InboxProcessMessage
TerminateInboxProcessMessage
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds Double
x = Integer -> Int
forall a. Num a => Integer -> a
fromInteger (Integer -> Int) -> Integer -> Int
forall a b. (a -> b) -> a -> b
$ Integer -> Integer
forall a. Integral a => a -> Integer
toInteger (Integer -> Integer) -> Integer -> Integer
forall a b. (a -> b) -> a -> b
$ Double -> Integer
forall b. Integral b => Double -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
1000000 Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
x)
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
{-# INLINABLE sendMessageDIO #-}
sendMessageDIO :: ProcessId -> Message -> DIO ()
sendMessageDIO ProcessId
pid Message
m =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> Message -> InboxProcessMessage
SendQueueMessage ProcessId
pid Message
m)
else Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (Message -> LogicalProcessMessage
QueueMessage Message
m)
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
{-# INLINABLE sendMessagesDIO #-}
sendMessagesDIO :: ProcessId -> [Message] -> DIO ()
sendMessagesDIO ProcessId
pid [Message]
ms =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> [Message] -> InboxProcessMessage
SendQueueMessageBulk ProcessId
pid [Message]
ms)
else do [Message] -> (Message -> DIO ()) -> DIO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Message]
ms ((Message -> DIO ()) -> DIO ()) -> (Message -> DIO ()) -> DIO ()
forall a b. (a -> b) -> a -> b
$ \Message
m ->
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (Message -> LogicalProcessMessage
QueueMessage Message
m)
sendAcknowledgementMessageDIO :: DP.ProcessId -> AcknowledgementMessage -> DIO ()
{-# INLINABLE sendAcknowledgementMessageDIO #-}
sendAcknowledgementMessageDIO :: ProcessId -> AcknowledgementMessage -> DIO ()
sendAcknowledgementMessageDIO ProcessId
pid AcknowledgementMessage
m =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> AcknowledgementMessage -> InboxProcessMessage
SendAcknowledgementQueueMessage ProcessId
pid AcknowledgementMessage
m)
else Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (AcknowledgementMessage -> LogicalProcessMessage
AcknowledgementQueueMessage AcknowledgementMessage
m)
sendAcknowledgementMessagesDIO :: DP.ProcessId -> [AcknowledgementMessage] -> DIO ()
{-# INLINABLE sendAcknowledgementMessagesDIO #-}
sendAcknowledgementMessagesDIO :: ProcessId -> [AcknowledgementMessage] -> DIO ()
sendAcknowledgementMessagesDIO ProcessId
pid [AcknowledgementMessage]
ms =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> [AcknowledgementMessage] -> InboxProcessMessage
SendAcknowledgementQueueMessageBulk ProcessId
pid [AcknowledgementMessage]
ms)
else do [AcknowledgementMessage]
-> (AcknowledgementMessage -> DIO ()) -> DIO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [AcknowledgementMessage]
ms ((AcknowledgementMessage -> DIO ()) -> DIO ())
-> (AcknowledgementMessage -> DIO ()) -> DIO ()
forall a b. (a -> b) -> a -> b
$ \AcknowledgementMessage
m ->
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> LogicalProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
pid (AcknowledgementMessage -> LogicalProcessMessage
AcknowledgementQueueMessage AcknowledgementMessage
m)
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
{-# INLINABLE sendLocalTimeDIO #-}
sendLocalTimeDIO :: ProcessId -> ProcessId -> Double -> DIO ()
sendLocalTimeDIO ProcessId
receiver ProcessId
sender Double
t =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> ProcessId -> Double -> InboxProcessMessage
SendLocalTimeMessage ProcessId
receiver ProcessId
sender Double
t)
else Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
receiver (ProcessId -> Double -> TimeServerMessage
LocalTimeMessage ProcessId
sender Double
t)
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
{-# INLINABLE sendRequestGlobalTimeDIO #-}
sendRequestGlobalTimeDIO :: ProcessId -> ProcessId -> DIO ()
sendRequestGlobalTimeDIO ProcessId
receiver ProcessId
sender =
do DIOParams
ps <- DIO DIOParams
dioParams
if DIOParams -> Bool
dioProcessMonitoringEnabled DIOParams
ps
then do ProcessId
inbox <- DIO ProcessId
messageInboxId
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> InboxProcessMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
inbox (ProcessId -> ProcessId -> InboxProcessMessage
SendRequestGlobalTimeMessage ProcessId
receiver ProcessId
sender)
else Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> TimeServerMessage -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
DP.send ProcessId
receiver (ProcessId -> TimeServerMessage
RequestGlobalTimeMessage ProcessId
sender)
logDIO :: Priority -> String -> DIO ()
{-# INLINE logDIO #-}
logDIO :: Priority -> String -> DIO ()
logDIO Priority
p String
message =
do DIOParams
ps <- DIO DIOParams
dioParams
Bool -> DIO () -> DIO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Priority
dioLoggingPriority DIOParams
ps Priority -> Priority -> Bool
forall a. Ord a => a -> a -> Bool
<= Priority
p) (DIO () -> DIO ()) -> DIO () -> DIO ()
forall a b. (a -> b) -> a -> b
$
Process () -> DIO ()
forall a. Process a -> DIO a
liftDistributedUnsafe (Process () -> DIO ()) -> Process () -> DIO ()
forall a b. (a -> b) -> a -> b
$
String -> Process ()
DP.say (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
Priority -> String
embracePriority Priority
p String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
message
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
{-# INLINE logProcess #-}
logProcess :: DIOParams -> Priority -> String -> Process ()
logProcess DIOParams
ps Priority
p String
message =
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DIOParams -> Priority
dioLoggingPriority DIOParams
ps Priority -> Priority -> Bool
forall a. Ord a => a -> a -> Bool
<= Priority
p) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
String -> Process ()
DP.say (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$
Priority -> String
embracePriority Priority
p String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
message