{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}
module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
(DIO(..),
DIOParams(..),
DIOEnv(..),
DIOStrategy(..),
invokeDIO,
runDIO,
runDIOWithEnv,
defaultDIOParams,
defaultDIOEnv,
terminateDIO,
registerDIO,
unregisterDIO,
monitorProcessDIO,
dioParams,
messageChannel,
messageInboxId,
timeServerId,
sendMessageDIO,
sendMessagesDIO,
sendAcknowledgementMessageDIO,
sendAcknowledgementMessagesDIO,
sendLocalTimeDIO,
sendRequestGlobalTimeDIO,
logDIO,
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import Data.IORef
import Data.Time.Clock
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM
import System.Timeout
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
import Simulation.Aivika.Distributed.Optimistic.State
data DIOParams =
DIOParams { dioLoggingPriority :: Priority,
dioName :: String,
dioTimeHorizon :: Maybe Double,
dioUndoableLogSizeThreshold :: Int,
dioOutputMessageQueueSizeThreshold :: Int,
dioTransientMessageQueueSizeThreshold :: Int,
dioSyncTimeout :: Int,
dioAllowPrematureIO :: Bool,
dioAllowSkippingOutdatedMessage :: Bool,
dioProcessMonitoringEnabled :: Bool,
dioProcessMonitoringDelay :: Int,
dioProcessReconnectingEnabled :: Bool,
dioProcessReconnectingDelay :: Int,
dioKeepAliveInterval :: Int,
dioTimeServerAcknowledgementTimeout :: Int,
dioSimulationMonitoringInterval :: Int,
dioSimulationMonitoringTimeout :: Int,
dioStrategy :: DIOStrategy,
dioProcessDisconnectingEnabled :: Bool
} deriving (Eq, Ord, Show, Typeable, Generic)
instance Binary DIOParams
data DIOEnv =
DIOEnv { dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> DP.Process ())
}
data DIOStrategy = WaitIndefinitelyForTimeServer
| TerminateDueToTimeServerTimeout Int
deriving (Eq, Ord, Show, Typeable, Generic)
instance Binary DIOStrategy
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
}
data DIOContext =
DIOContext { dioChannel :: Channel LogicalProcessMessage,
dioInboxId :: DP.ProcessId,
dioTimeServerId :: DP.ProcessId,
dioParams0 :: DIOParams,
dioRegisteredInTimeServer :: TVar Bool,
dioUnregisteredFromTimeServer :: TVar Bool,
dioTimeServerTerminating :: TVar Bool
}
instance Monad DIO where
{-# INLINE return #-}
return = DIO . const . return
{-# INLINE (>>=) #-}
(DIO m) >>= k = DIO $ \ps ->
m ps >>= \a ->
let m' = unDIO (k a) in m' ps
instance Applicative DIO where
{-# INLINE pure #-}
pure = return
{-# INLINE (<*>) #-}
(<*>) = ap
instance Functor DIO where
{-# INLINE fmap #-}
fmap f (DIO m) = DIO $ fmap f . m
instance MonadException DIO where
catchComp (DIO m) h = DIO $ \ps ->
C.catch (m ps) (\e -> unDIO (h e) ps)
finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
C.finally (m1 ps) (m2 ps)
throwComp e = DIO $ \ps ->
throw e
invokeDIO :: DIOContext -> DIO a -> DP.Process a
{-# INLINE invokeDIO #-}
invokeDIO ps (DIO m) = m ps
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe = DIO . const
defaultDIOParams :: DIOParams
defaultDIOParams =
DIOParams { dioLoggingPriority = WARNING,
dioName = "LP",
dioTimeHorizon = Nothing,
dioUndoableLogSizeThreshold = 10000000,
dioOutputMessageQueueSizeThreshold = 10000,
dioTransientMessageQueueSizeThreshold = 5000,
dioSyncTimeout = 60000000,
dioAllowPrematureIO = False,
dioAllowSkippingOutdatedMessage = True,
dioProcessMonitoringEnabled = False,
dioProcessMonitoringDelay = 5000000,
dioProcessReconnectingEnabled = False,
dioProcessReconnectingDelay = 5000000,
dioKeepAliveInterval = 5000000,
dioTimeServerAcknowledgementTimeout = 5000000,
dioSimulationMonitoringInterval = 30000000,
dioSimulationMonitoringTimeout = 100000,
dioStrategy = TerminateDueToTimeServerTimeout 300000000,
dioProcessDisconnectingEnabled = False
}
defaultDIOEnv :: DIOEnv
defaultDIOEnv =
DIOEnv { dioSimulationMonitoringAction = Nothing }
dioContext :: DIO DIOContext
dioContext = DIO return
dioParams :: DIO DIOParams
dioParams = DIO $ return . dioParams0
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel = DIO $ return . dioChannel
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ return . dioInboxId
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ return . dioTimeServerId
terminateDIO :: DIO ()
terminateDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Terminating the simulation..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendTerminateTimeServerMessage receiver sender)
else DP.send receiver (TerminateTimeServerMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgementTimeout ps) $
atomically $
do f <- readTVar (dioTimeServerTerminating ctx)
unless f retry
DP.send inbox TerminateInboxProcessMessage
return ()
registerDIO :: DIO ()
registerDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Registering the simulation process..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendRegisterLogicalProcessMessage receiver sender)
else DP.send receiver (RegisterLogicalProcessMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgementTimeout ps) $
atomically $
do f <- readTVar (dioRegisteredInTimeServer ctx)
unless f retry
return ()
unregisterDIO :: DIO ()
unregisterDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Unregistering the simulation process..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendUnregisterLogicalProcessMessage receiver sender)
else DP.send receiver (UnregisterLogicalProcessMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgementTimeout ps) $
atomically $
do f <- readTVar (dioUnregisteredFromTimeServer ctx)
unless f retry
DP.send inbox TerminateInboxProcessMessage
return ()
data InternalLogicalProcessMessage = InternalLogicalProcessMessage LogicalProcessMessage
| InternalProcessMonitorNotification DP.ProcessMonitorNotification
| InternalInboxProcessMessage InboxProcessMessage
| InternalGeneralMessage GeneralMessage
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
do
logProcess ps ERROR $ "Exception occurred: " ++ show e
C.throwM e
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId = runDIOWithEnv m ps defaultDIOEnv serverId
runDIOWithEnv :: DIO a -> DIOParams -> DIOEnv -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIOWithEnv m ps env serverId =
do ch <- liftIO newChannel
let connParams =
ConnectionParams { connLoggingPriority = dioLoggingPriority ps,
connKeepAliveInterval = dioKeepAliveInterval ps,
connReconnectingDelay = dioProcessReconnectingDelay ps,
connMonitoringDelay = dioProcessMonitoringDelay ps }
connManager <- liftIO $ newConnectionManager connParams
terminated <- liftIO $ newIORef False
registeredInTimeServer <- liftIO $ newTVarIO False
unregisteredFromTimeServer <- liftIO $ newTVarIO False
timeServerTerminating <- liftIO $ newTVarIO False
timeServerTimestamp <- liftIO $ getCurrentTime >>= newIORef
let loop0 =
forever $
do let f1 :: LogicalProcessMessage -> DP.Process InternalLogicalProcessMessage
f1 x = return (InternalLogicalProcessMessage x)
f2 :: DP.ProcessMonitorNotification -> DP.Process InternalLogicalProcessMessage
f2 x = return (InternalProcessMonitorNotification x)
f3 :: InboxProcessMessage -> DP.Process InternalLogicalProcessMessage
f3 x = return (InternalInboxProcessMessage x)
f4 :: GeneralMessage -> DP.Process InternalLogicalProcessMessage
f4 x = return (InternalGeneralMessage x)
x <- fmap Just $ DP.receiveWait [DP.match f1, DP.match f2, DP.match f3, DP.match f4]
case x of
Nothing -> return ()
Just (InternalLogicalProcessMessage m) ->
do processTimeServerMessage m ps serverId timeServerTimestamp
liftIO $
writeChannel ch m
Just (InternalProcessMonitorNotification m@(DP.ProcessMonitorNotification _ _ _)) ->
handleProcessMonitorNotification m ps ch connManager serverId
Just (InternalInboxProcessMessage m) ->
case m of
SendQueueMessage pid m ->
DP.usend pid (QueueMessage m)
SendQueueMessageBulk pid ms ->
forM_ ms $ \m ->
DP.usend pid (QueueMessage m)
SendAcknowledgementQueueMessage pid m ->
DP.usend pid (AcknowledgementQueueMessage m)
SendAcknowledgementQueueMessageBulk pid ms ->
forM_ ms $ \m ->
DP.usend pid (AcknowledgementQueueMessage m)
SendLocalTimeMessage receiver sender t ->
DP.usend receiver (LocalTimeMessage sender t)
SendRequestGlobalTimeMessage receiver sender ->
DP.usend receiver (RequestGlobalTimeMessage sender)
SendRegisterLogicalProcessMessage receiver sender ->
DP.usend receiver (RegisterLogicalProcessMessage sender)
SendUnregisterLogicalProcessMessage receiver sender ->
DP.usend receiver (UnregisterLogicalProcessMessage sender)
SendTerminateTimeServerMessage receiver sender ->
DP.usend receiver (TerminateTimeServerMessage sender)
MonitorProcessMessage pid ->
tryAddMessageReceiver connManager pid >> return ()
TrySendProcessKeepAliveMessage ->
trySendKeepAlive connManager
RegisterLogicalProcessAcknowledgementMessage pid ->
do
logProcess ps INFO "Registered the logical process in the time server"
liftIO $
atomically $
writeTVar registeredInTimeServer True
UnregisterLogicalProcessAcknowledgementMessage pid ->
do
logProcess ps INFO "Unregistered the logical process from the time server"
liftIO $
atomically $
writeTVar unregisteredFromTimeServer True
TerminateTimeServerAcknowledgementMessage pid ->
do
logProcess ps INFO "Started terminating the time server"
liftIO $
atomically $
writeTVar timeServerTerminating True
TerminateInboxProcessMessage ->
do
logProcess ps INFO "Terminating the inbox and keep-alive processes..."
liftIO $
do atomicWriteIORef terminated True
writeChannel ch AbortSimulationMessage
DP.terminate
Just (InternalGeneralMessage m) ->
handleGeneralMessage m ps ch connManager
loop =
C.finally loop0
(do liftIO $
do atomicWriteIORef terminated True
writeChannel ch AbortSimulationMessage
clearMessageReceivers connManager)
inboxId <-
DP.spawnLocal $
C.catch loop (handleException ps)
DP.spawnLocal $
let loop =
do f <- liftIO $ readIORef terminated
unless f $
do liftIO $
threadDelay (dioKeepAliveInterval ps)
DP.send inboxId TrySendProcessKeepAliveMessage
loop
in C.catch loop (handleException ps)
DP.spawnLocal $
let stop =
do f <- liftIO $ readIORef terminated
return (f || dioStrategy ps == WaitIndefinitelyForTimeServer)
loop =
do f <- stop
unless f $
do liftIO $
threadDelay (dioSyncTimeout ps)
f <- stop
unless f $
do validateTimeServer ps inboxId timeServerTimestamp
loop
in C.catch loop (handleException ps)
case dioSimulationMonitoringAction env of
Nothing -> return ()
Just act ->
do monitorId <-
DP.spawnLocal $
let loop =
do f <- liftIO $ readIORef terminated
unless f $
do x <- DP.expectTimeout (dioSimulationMonitoringTimeout ps)
case x of
Nothing -> return ()
Just st -> act st
loop
in C.catch loop (handleException ps)
DP.spawnLocal $
let loop =
do f <- liftIO $ readIORef terminated
unless f $
do liftIO $
do threadDelay (dioSimulationMonitoringInterval ps)
writeChannel ch (ProvideLogicalProcessStateMessage monitorId)
loop
in C.catch loop (handleException ps)
return ()
let simulation =
unDIO m DIOContext { dioChannel = ch,
dioInboxId = inboxId,
dioTimeServerId = serverId,
dioParams0 = ps,
dioRegisteredInTimeServer = registeredInTimeServer,
dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
dioTimeServerTerminating = timeServerTerminating }
return (inboxId, simulation)
handleProcessMonitorNotification :: DP.ProcessMonitorNotification
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> DP.ProcessId
-> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) ps ch connManager serverId =
do let recv m@(DP.ProcessMonitorNotification _ _ _) =
do
logProcess ps WARNING $ "Received a process monitor notification " ++ show m
liftIO $
writeChannel ch (ProcessMonitorNotificationMessage m)
return m
recv m
when (pid0 == serverId) $
case reason of
DP.DiedNormal -> processTimeServerTerminated ps serverId
DP.DiedException _ -> processTimeServerTerminated ps serverId
DP.DiedNodeDown -> processTimeServerTerminated ps serverId
_ -> return ()
when (dioProcessReconnectingEnabled ps && (reason == DP.DiedDisconnect)) $
do liftIO $
threadDelay (dioProcessReconnectingDelay ps)
let pred m@(DP.ProcessMonitorNotification _ _ reason) = reason == DP.DiedDisconnect
loop :: [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessMonitorNotification]
loop acc =
do y <- DP.receiveTimeout 0 [DP.matchIf pred recv]
case y of
Nothing -> return $ reverse acc
Just m@(DP.ProcessMonitorNotification _ _ _) -> loop (m : acc)
ms <- loop [m]
pids <- filterMessageReceivers connManager ms
reconnectMessageReceivers connManager pids
forM_ pids $ \pid ->
do
logProcess ps NOTICE $
"Writing to the channel about reconnecting to " ++ show pid
liftIO $
writeChannel ch (ReconnectProcessMessage pid)
when (dioProcessDisconnectingEnabled ps && not (dioProcessReconnectingEnabled ps)) $
liftIO $
writeChannel ch (DisconnectProcessMessage pid0)
handleGeneralMessage :: GeneralMessage
-> DIOParams
-> Channel LogicalProcessMessage
-> ConnectionManager
-> DP.Process ()
handleGeneralMessage m@KeepAliveMessage ps ch connManager =
do
logProcess ps DEBUG $
"Received " ++ show m
return ()
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox $
MonitorProcessMessage pid
else liftDistributedUnsafe $
logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters"
processTimeServerMessage :: LogicalProcessMessage -> DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
processTimeServerMessage ComputeLocalTimeMessage ps serverId r =
do liftIO $
getCurrentTime >>= writeIORef r
inboxId <- DP.getSelfPid
if dioProcessMonitoringEnabled ps
then DP.usend serverId (ComputeLocalTimeAcknowledgementMessage inboxId)
else DP.send serverId (ComputeLocalTimeAcknowledgementMessage inboxId)
processTimeServerMessage (GlobalTimeMessage _) ps serverId r =
liftIO $
getCurrentTime >>= writeIORef r
processTimeServerMessage _ ps serverId r =
return ()
validateTimeServer :: DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
validateTimeServer ps inboxId r =
do
logProcess ps NOTICE "Validating the time server"
case dioStrategy ps of
WaitIndefinitelyForTimeServer ->
return ()
TerminateDueToTimeServerTimeout timeout ->
do utc0 <- liftIO $ readIORef r
utc <- liftIO getCurrentTime
let dt = fromRational $ toRational (diffUTCTime utc utc0)
when (secondsToMicroseconds dt > timeout) $
do
logProcess ps WARNING "Terminating due to the exceeded time server timeout"
DP.send inboxId TerminateInboxProcessMessage
processTimeServerTerminated :: DIOParams -> DP.ProcessId -> DP.Process ()
processTimeServerTerminated ps inboxId =
do
logProcess ps NOTICE "Terminating due to sudden termination of the time server"
DP.send inboxId TerminateInboxProcessMessage
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger $ toInteger $ round (1000000 * x)
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
{-# INLINABLE sendMessageDIO #-}
sendMessageDIO pid m =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendQueueMessage pid m)
else liftDistributedUnsafe $
DP.send pid (QueueMessage m)
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
{-# INLINABLE sendMessagesDIO #-}
sendMessagesDIO pid ms =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendQueueMessageBulk pid ms)
else do forM_ ms $ \m ->
liftDistributedUnsafe $
DP.send pid (QueueMessage m)
sendAcknowledgementMessageDIO :: DP.ProcessId -> AcknowledgementMessage -> DIO ()
{-# INLINABLE sendAcknowledgementMessageDIO #-}
sendAcknowledgementMessageDIO pid m =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendAcknowledgementQueueMessage pid m)
else liftDistributedUnsafe $
DP.send pid (AcknowledgementQueueMessage m)
sendAcknowledgementMessagesDIO :: DP.ProcessId -> [AcknowledgementMessage] -> DIO ()
{-# INLINABLE sendAcknowledgementMessagesDIO #-}
sendAcknowledgementMessagesDIO pid ms =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendAcknowledgementQueueMessageBulk pid ms)
else do forM_ ms $ \m ->
liftDistributedUnsafe $
DP.send pid (AcknowledgementQueueMessage m)
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
{-# INLINABLE sendLocalTimeDIO #-}
sendLocalTimeDIO receiver sender t =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendLocalTimeMessage receiver sender t)
else liftDistributedUnsafe $
DP.send receiver (LocalTimeMessage sender t)
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
{-# INLINABLE sendRequestGlobalTimeDIO #-}
sendRequestGlobalTimeDIO receiver sender =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendRequestGlobalTimeMessage receiver sender)
else liftDistributedUnsafe $
DP.send receiver (RequestGlobalTimeMessage sender)
logDIO :: Priority -> String -> DIO ()
{-# INLINE logDIO #-}
logDIO p message =
do ps <- dioParams
when (dioLoggingPriority ps <= p) $
liftDistributedUnsafe $
DP.say $
embracePriority p ++ " " ++ message
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
{-# INLINE logProcess #-}
logProcess ps p message =
when (dioLoggingPriority ps <= p) $
DP.say $
embracePriority p ++ " " ++ message