module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
(DIO(..),
DIOParams(..),
DIOEnv(..),
DIOStrategy(..),
invokeDIO,
runDIO,
runDIOWithEnv,
defaultDIOParams,
defaultDIOEnv,
terminateDIO,
registerDIO,
unregisterDIO,
monitorProcessDIO,
dioParams,
messageChannel,
messageInboxId,
timeServerId,
sendMessageDIO,
sendMessagesDIO,
sendAcknowledgementMessageDIO,
sendAcknowledgementMessagesDIO,
sendLocalTimeDIO,
sendRequestGlobalTimeDIO,
logDIO,
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import Data.IORef
import Data.Time.Clock
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM
import System.Timeout
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.KeepAliveManager
import Simulation.Aivika.Distributed.Optimistic.State
-- | The parameters of a 'DIO' computation.
--
-- The durations that this module passes to 'threadDelay', 'timeout' and
-- 'DP.expectTimeout' are expressed in microseconds.
data DIOParams =
  DIOParams { dioLoggingPriority :: Priority,
              -- ^ The minimum priority of the messages logged by 'logDIO'
              -- and 'logProcess'; also given to the keep-alive manager.
              dioName :: String,
              -- ^ A name; not used in this module (presumably the name of
              -- the logical process — confirm).
              dioUndoableLogSizeThreshold :: Int,
              -- ^ Not used in this module (presumably a size threshold for
              -- the undoable log — confirm).
              dioOutputMessageQueueSizeThreshold :: Int,
              -- ^ Not used in this module (presumably a size threshold for
              -- the output message queue — confirm).
              dioTransientMessageQueueSizeThreshold :: Int,
              -- ^ Not used in this module (presumably a size threshold for
              -- the transient message queue — confirm).
              dioSyncTimeout :: Int,
              -- ^ The delay between two successive time server validations
              -- performed by the process spawned in 'runDIOWithEnv'.
              dioAllowPrematureIO :: Bool,
              -- ^ Not used in this module.
              dioAllowSkippingOutdatedMessage :: Bool,
              -- ^ Not used in this module.
              dioProcessMonitoringEnabled :: Bool,
              -- ^ When set, outgoing messages are relayed through the inbox
              -- process and 'monitorProcessDIO' really monitors processes;
              -- otherwise messages are sent directly and 'monitorProcessDIO'
              -- only logs a warning.
              dioProcessMonitoringDelay :: Int,
              -- ^ The delay, after reconnecting, before the reconnected
              -- processes are monitored again.
              dioProcessReconnectingEnabled :: Bool,
              -- ^ Whether to reconnect to processes whose monitor
              -- notification reports 'DP.DiedDisconnect'.
              dioProcessReconnectingDelay :: Int,
              -- ^ The delay before reconnecting.
              dioKeepAliveInterval :: Int,
              -- ^ The interval between the keep-alive ticks sent to the
              -- inbox process; also given to the keep-alive manager.
              dioTimeServerAcknowledgementTimeout :: Int,
              -- ^ How long 'registerDIO', 'unregisterDIO' and 'terminateDIO'
              -- wait for the acknowledgement of the time server.
              dioSimulationMonitoringInterval :: Int,
              -- ^ The interval between requests for the logical process
              -- state when a monitoring action is set in 'DIOEnv'.
              dioSimulationMonitoringTimeout :: Int,
              -- ^ The timeout used by the monitoring process while waiting
              -- for a logical process state.
              dioStrategy :: DIOStrategy
              -- ^ How to react when the time server stays silent.
            } deriving (Eq, Ord, Show, Typeable, Generic)

-- | Uses the default, 'Generic'-based 'Binary' methods.
instance Binary DIOParams
-- | The environment of a 'DIO' computation, supplied to 'runDIOWithEnv'.
data DIOEnv =
  DIOEnv { dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> DP.Process ())
           -- ^ When set, 'runDIOWithEnv' spawns processes that periodically
           -- request the logical process state and pass every state
           -- received to this action; 'Nothing' disables the monitoring.
         }
-- | What to do when the time server stays silent.
data DIOStrategy = WaitIndefinitelyForTimeServer
                   -- ^ Never give up on the time server: no validation is
                   -- performed.
                 | TerminateDueToTimeServerTimeout Int
                   -- ^ Terminate the inbox process when neither a
                   -- 'ComputeLocalTimeMessage' nor a 'GlobalTimeMessage' has
                   -- arrived for longer than the given number of
                   -- microseconds (checked every 'dioSyncTimeout').
                 deriving (Eq, Ord, Show, Typeable, Generic)

-- | Uses the default, 'Generic'-based 'Binary' methods.
instance Binary DIOStrategy
-- | A computation that reads a 'DIOContext' and runs in 'DP.Process'.
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
                      -- ^ Run the computation in the given context.
                    }
-- | The run-time context of a 'DIO' computation, built by 'runDIOWithEnv'.
data DIOContext =
  DIOContext { dioChannel :: Channel LogicalProcessMessage,
               -- ^ The channel into which the inbox process and its helpers
               -- write messages for the simulation.
               dioInboxId :: DP.ProcessId,
               -- ^ The inbox process.
               dioTimeServerId :: DP.ProcessId,
               -- ^ The time server process.
               dioParams0 :: DIOParams,
               -- ^ The parameters, exposed through 'dioParams'.
               dioRegisteredInTimeServer :: TVar Bool,
               -- ^ Set by the inbox when the time server acknowledges the
               -- registration; awaited by 'registerDIO'.
               dioUnregisteredFromTimeServer :: TVar Bool,
               -- ^ Set by the inbox when the time server acknowledges the
               -- unregistration; awaited by 'unregisterDIO'.
               dioTimeServerTerminating :: TVar Bool
               -- ^ Set by the inbox when the time server acknowledges the
               -- termination request; awaited by 'terminateDIO'.
             }
-- | Sequencing threads the same 'DIOContext' through both computations.
instance Monad DIO where

  return a = DIO $ \_ -> return a

  DIO m >>= k = DIO $ \ctx ->
    do a <- m ctx
       unDIO (k a) ctx
-- | The applicative structure of 'DIO'.
--
-- 'pure' is defined directly (the canonical form) rather than via 'return',
-- which GHC reports as a non-canonical definition.
instance Applicative DIO where

  pure a = DIO $ \_ -> return a

  (<*>) = ap
-- | Mapping applies the function to the result of the underlying process.
instance Functor DIO where
  fmap f (DIO m) = DIO $ \ctx -> f <$> m ctx
-- | Exception handling for 'DIO', delegated to the 'DP.Process' instances
-- of "Control.Monad.Catch".
--
-- 'throwComp' uses 'C.throwM' so that the exception is raised when the
-- action is executed. The pure 'throw' used previously made the action
-- itself a bottom value, with imprecise exception ordering.
instance MonadException DIO where

  catchComp (DIO m) h = DIO $ \ctx ->
    C.catch (m ctx) (\e -> unDIO (h e) ctx)

  finallyComp (DIO m1) (DIO m2) = DIO $ \ctx ->
    C.finally (m1 ctx) (m2 ctx)

  throwComp e = DIO $ \_ ->
    C.throwM e
-- | Run a 'DIO' computation in the given context.
invokeDIO :: DIOContext -> DIO a -> DP.Process a
invokeDIO ctx comp = unDIO comp ctx
-- | Lift a 'DP.Process' action into 'DIO'; the action ignores the context.
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe action = DIO (\_ -> action)
-- | The default parameters of a 'DIO' computation.
--
-- Time values that this module uses with 'threadDelay', 'timeout' or
-- 'DP.expectTimeout' are written in microseconds.
defaultDIOParams :: DIOParams
defaultDIOParams =
  DIOParams
    { dioLoggingPriority                    = WARNING
    , dioName                               = "LP"
    , dioUndoableLogSizeThreshold           = 10000000
    , dioOutputMessageQueueSizeThreshold    = 10000
    , dioTransientMessageQueueSizeThreshold = 5000
    , dioSyncTimeout                        = 60 * microsecondsPerSecond
    , dioAllowPrematureIO                   = False
    , dioAllowSkippingOutdatedMessage       = True
    , dioProcessMonitoringEnabled           = False
    , dioProcessMonitoringDelay             = 5 * microsecondsPerSecond
    , dioProcessReconnectingEnabled         = False
    , dioProcessReconnectingDelay           = 5 * microsecondsPerSecond
    , dioKeepAliveInterval                  = 5 * microsecondsPerSecond
    , dioTimeServerAcknowledgementTimeout   = 5 * microsecondsPerSecond
    , dioSimulationMonitoringInterval       = 30 * microsecondsPerSecond
    , dioSimulationMonitoringTimeout        = 100000 -- a tenth of a second
    , dioStrategy                           = TerminateDueToTimeServerTimeout (300 * microsecondsPerSecond)
    }
  where
    microsecondsPerSecond :: Int
    microsecondsPerSecond = 1000000
-- | The default environment: no simulation monitoring action.
defaultDIOEnv :: DIOEnv
defaultDIOEnv = DIOEnv Nothing
-- | Return the context in which the current 'DIO' computation runs.
dioContext :: DIO DIOContext
dioContext = DIO $ \ctx -> return ctx
-- | Return the parameters of the current 'DIO' computation.
dioParams :: DIO DIOParams
dioParams = dioParams0 <$> dioContext
-- | Return the channel through which the simulation receives its messages.
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel = dioChannel <$> dioContext
-- | Return the identifier of the inbox process.
messageInboxId :: DIO DP.ProcessId
messageInboxId = dioInboxId <$> dioContext
-- | Return the identifier of the time server process.
timeServerId :: DIO DP.ProcessId
timeServerId = dioTimeServerId <$> dioContext
-- | Ask the time server to terminate, wait a bounded time for its
-- acknowledgement, and then stop the inbox process.
--
-- With process monitoring enabled the request is relayed through the inbox
-- process; otherwise it goes to the time server directly. The wait lasts at
-- most 'dioTimeServerAcknowledgementTimeout' microseconds.
terminateDIO :: DIO ()
terminateDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
         inbox = dioInboxId ctx
         server = dioTimeServerId ctx
         requestTermination
           | dioProcessMonitoringEnabled ps =
               DP.send inbox (SendTerminateTimeServerMessage server inbox)
           | otherwise =
               DP.send server (TerminateTimeServerMessage inbox)
     logProcess ps INFO "Terminating the simulation..."
     requestTermination
     -- the outcome of the bounded wait is ignored: the inbox is stopped anyway
     void $ liftIO $
       timeout (dioTimeServerAcknowledgementTimeout ps) $
       atomically $
       do terminating <- readTVar (dioTimeServerTerminating ctx)
          unless terminating retry
     DP.send inbox TerminateInboxProcessMessage
-- | Register the logical process in the time server and wait a bounded time
-- (at most 'dioTimeServerAcknowledgementTimeout' microseconds) for the
-- acknowledgement.
--
-- With process monitoring enabled the request is relayed through the inbox
-- process; otherwise it goes to the time server directly. A missing
-- acknowledgement is not reported.
registerDIO :: DIO ()
registerDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
         inbox = dioInboxId ctx
         server = dioTimeServerId ctx
     logProcess ps INFO "Registering the simulation process..."
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendRegisterLogicalProcessMessage server inbox)
       else DP.send server (RegisterLogicalProcessMessage inbox)
     void $ liftIO $
       timeout (dioTimeServerAcknowledgementTimeout ps) $
       atomically $
       do registered <- readTVar (dioRegisteredInTimeServer ctx)
          unless registered retry
-- | Unregister the logical process from the time server, wait a bounded time
-- (at most 'dioTimeServerAcknowledgementTimeout' microseconds) for the
-- acknowledgement, and then stop the inbox process.
--
-- With process monitoring enabled the request is relayed through the inbox
-- process; otherwise it goes to the time server directly.
unregisterDIO :: DIO ()
unregisterDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
         inbox = dioInboxId ctx
         server = dioTimeServerId ctx
         awaitAcknowledgement =
           atomically $
           readTVar (dioUnregisteredFromTimeServer ctx) >>= \done ->
           unless done retry
     logProcess ps INFO "Unregistering the simulation process..."
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendUnregisterLogicalProcessMessage server inbox)
       else DP.send server (UnregisterLogicalProcessMessage inbox)
     _ <- liftIO $ timeout (dioTimeServerAcknowledgementTimeout ps) awaitAcknowledgement
     DP.send inbox TerminateInboxProcessMessage
-- | A tagged union of the four message kinds that the inbox loop in
-- 'runDIOWithEnv' receives, so that a single 'DP.receiveWait' can match
-- them all.
data InternalLogicalProcessMessage = InternalLogicalProcessMessage LogicalProcessMessage
                                     -- ^ A message for the simulation.
                                   | InternalProcessMonitorNotification DP.ProcessMonitorNotification
                                     -- ^ A notification about a monitored process.
                                   | InternalInboxProcessMessage InboxProcessMessage
                                     -- ^ A request addressed to the inbox process itself.
                                   | InternalKeepAliveMessage KeepAliveMessage
                                     -- ^ A keep-alive message; only logged.
-- | Log an exception that escaped one of the processes spawned by this
-- module, then rethrow it.
--
-- The inbox process stops itself deliberately with 'DP.terminate', which
-- throws 'DP.ProcessTerminationException'. That case is part of a normal
-- shutdown, so it is logged at 'DEBUG' instead of 'ERROR'. Every exception
-- is still rethrown unchanged.
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
  do case fromException e of
       Just DP.ProcessTerminationException ->
         logProcess ps DEBUG $ "Exception occurred: " ++ show e
       Nothing ->
         logProcess ps ERROR $ "Exception occurred: " ++ show e
     C.throwM e
-- | Like 'runDIOWithEnv', but with 'defaultDIOEnv' (no simulation
-- monitoring).
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps = runDIOWithEnv m ps defaultDIOEnv
-- | Prepare a 'DIO' computation for running with the given parameters,
-- environment and time server process identifier.
--
-- Helper processes are spawned with 'DP.spawnLocal'. The function returns the
-- identifier of the inbox process together with the 'DP.Process' action that
-- runs the simulation. The simulation itself is not started here; the caller
-- runs the returned action.
--
-- The spawned processes are:
--
-- * The inbox process. It forwards incoming 'LogicalProcessMessage's to the
--   simulation channel and relays outgoing messages. It also handles process
--   monitoring and the time server acknowledgements, and stops on
--   'TerminateInboxProcessMessage'.
--
-- * A keep-alive ticker that sends 'TrySendKeepAliveMessage' to the inbox
--   every 'dioKeepAliveInterval' microseconds.
--
-- * A time server validator that calls 'validateTimeServer' every
--   'dioSyncTimeout' microseconds. It exits at once under
--   'WaitIndefinitelyForTimeServer'.
--
-- * When 'dioSimulationMonitoringAction' is set, two more processes:
--
--     * a monitor that passes every received 'LogicalProcessState' to the
--       action;
--
--     * a requester that periodically writes
--       'ProvideLogicalProcessStateMessage' to the simulation channel.
--
-- The helpers stop at their next check of the shared @terminated@ flag. The
-- flag is set whenever the inbox loop exits, for whatever reason.
runDIOWithEnv :: DIO a -> DIOParams -> DIOEnv -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIOWithEnv m ps env serverId =
  do ch <- liftIO newChannel
     let keepAliveParams =
           KeepAliveParams { keepAliveLoggingPriority = dioLoggingPriority ps,
                             keepAliveInterval = dioKeepAliveInterval ps }
     keepAliveManager <- liftIO $ newKeepAliveManager keepAliveParams
     -- set when the inbox loop stops; polled by every helper process below
     terminated <- liftIO $ newIORef False
     -- acknowledgement flags awaited by registerDIO, unregisterDIO and terminateDIO
     registeredInTimeServer <- liftIO $ newTVarIO False
     unregisteredFromTimeServer <- liftIO $ newTVarIO False
     timeServerTerminating <- liftIO $ newTVarIO False
     -- when the time server was last heard from; checked by validateTimeServer
     timeServerTimestamp <- liftIO $ getCurrentTime >>= newIORef
     let loop0 =
           forever $
           do let f1 :: LogicalProcessMessage -> DP.Process InternalLogicalProcessMessage
                  f1 x = return (InternalLogicalProcessMessage x)
                  f2 :: DP.ProcessMonitorNotification -> DP.Process InternalLogicalProcessMessage
                  f2 x = return (InternalProcessMonitorNotification x)
                  f3 :: InboxProcessMessage -> DP.Process InternalLogicalProcessMessage
                  f3 x = return (InternalInboxProcessMessage x)
                  f4 :: KeepAliveMessage -> DP.Process InternalLogicalProcessMessage
                  f4 x = return (InternalKeepAliveMessage x)
              -- NOTE(review): x is always Just, so the Nothing branch below is unreachable
              x <- fmap Just $ DP.receiveWait [DP.match f1, DP.match f2, DP.match f3, DP.match f4]
              case x of
                Nothing -> return ()
                Just (InternalLogicalProcessMessage m) ->
                  -- refresh the time server timestamp, then hand the message to the simulation
                  do processTimeServerMessage m serverId timeServerTimestamp
                     liftIO $
                       writeChannel ch m
                Just (InternalProcessMonitorNotification m@(DP.ProcessMonitorNotification _ _ _)) ->
                  handleProcessMonitorNotification m ps ch serverId
                Just (InternalInboxProcessMessage m) ->
                  case m of
                    -- relayed outgoing messages (sent this way when process monitoring is enabled)
                    SendQueueMessage pid m ->
                      DP.send pid (QueueMessage m)
                    SendQueueMessageBulk pid ms ->
                      forM_ ms $ \m ->
                        DP.send pid (QueueMessage m)
                    SendAcknowledgementQueueMessage pid m ->
                      DP.send pid (AcknowledgementQueueMessage m)
                    SendAcknowledgementQueueMessageBulk pid ms ->
                      forM_ ms $ \m ->
                        DP.send pid (AcknowledgementQueueMessage m)
                    SendLocalTimeMessage receiver sender t ->
                      DP.send receiver (LocalTimeMessage sender t)
                    SendRequestGlobalTimeMessage receiver sender ->
                      DP.send receiver (RequestGlobalTimeMessage sender)
                    SendRegisterLogicalProcessMessage receiver sender ->
                      DP.send receiver (RegisterLogicalProcessMessage sender)
                    SendUnregisterLogicalProcessMessage receiver sender ->
                      DP.send receiver (UnregisterLogicalProcessMessage sender)
                    SendTerminateTimeServerMessage receiver sender ->
                      DP.send receiver (TerminateTimeServerMessage sender)
                    MonitorProcessMessage pid ->
                      do logProcess ps INFO $ "Monitoring process " ++ show pid
                         -- monitor only processes not yet known to the keep-alive manager
                         f <- liftIO $
                              existsKeepAliveReceiver keepAliveManager pid
                         unless f $
                           do DP.monitor pid
                              liftIO $
                                addKeepAliveReceiver keepAliveManager pid
                    ReMonitorProcessMessage pids ->
                      handleReMonitorProcessMessage pids ps ch
                    TrySendKeepAliveMessage ->
                      trySendKeepAlive keepAliveManager
                    -- time server acknowledgements release the waits in the DIO functions
                    RegisterLogicalProcessAcknowledgementMessage pid ->
                      do logProcess ps INFO "Registered the logical process in the time server"
                         liftIO $
                           atomically $
                           writeTVar registeredInTimeServer True
                    UnregisterLogicalProcessAcknowledgementMessage pid ->
                      do logProcess ps INFO "Unregistered the logical process from the time server"
                         liftIO $
                           atomically $
                           writeTVar unregisteredFromTimeServer True
                    TerminateTimeServerAcknowledgementMessage pid ->
                      do logProcess ps INFO "Started terminating the time server"
                         liftIO $
                           atomically $
                           writeTVar timeServerTerminating True
                    TerminateInboxProcessMessage ->
                      -- DP.terminate throws, which leaves loop0 and runs the finalizer of loop
                      do logProcess ps INFO "Terminating the inbox and keep-alive processes..."
                         liftIO $
                           do atomicWriteIORef terminated True
                              writeChannel ch AbortSimulationMessage
                         DP.terminate
                Just (InternalKeepAliveMessage m) ->
                  do logProcess ps DEBUG $ "Received " ++ show m
                     return ()
         -- however the inbox loop ends, stop the helpers and abort the simulation
         -- (after TerminateInboxProcessMessage the abort message is thus written
         -- twice; presumably harmless — confirm)
         loop =
           C.finally loop0
           (liftIO $
            do atomicWriteIORef terminated True
               writeChannel ch AbortSimulationMessage)
     inboxId <-
       DP.spawnLocal $
       C.catch loop (handleException ps)
     -- keep-alive ticker
     DP.spawnLocal $
       let loop =
             do f <- liftIO $ readIORef terminated
                unless f $
                  do liftIO $
                       threadDelay (dioKeepAliveInterval ps)
                     DP.send inboxId TrySendKeepAliveMessage
                     loop
       in C.catch loop (handleException ps)
     -- time server validator
     DP.spawnLocal $
       let stop =
             do f <- liftIO $ readIORef terminated
                return (f || dioStrategy ps == WaitIndefinitelyForTimeServer)
           loop =
             do f <- stop
                unless f $
                  do liftIO $
                       threadDelay (dioSyncTimeout ps)
                     f <- stop
                     unless f $
                       do validateTimeServer ps inboxId timeServerTimestamp
                          loop
       in C.catch loop (handleException ps)
     -- optional simulation monitoring
     case dioSimulationMonitoringAction env of
       Nothing -> return ()
       Just act ->
         do monitorId <-
              DP.spawnLocal $
              let loop =
                    do f <- liftIO $ readIORef terminated
                       unless f $
                         do x <- DP.expectTimeout (dioSimulationMonitoringTimeout ps)
                            case x of
                              Nothing -> return ()
                              Just st -> act st
                            loop
              in C.catch loop (handleException ps)
            DP.spawnLocal $
              let loop =
                    do f <- liftIO $ readIORef terminated
                       unless f $
                         do liftIO $
                              do threadDelay (dioSimulationMonitoringInterval ps)
                                 writeChannel ch (ProvideLogicalProcessStateMessage monitorId)
                            loop
              in C.catch loop (handleException ps)
            return ()
     let simulation =
           unDIO m DIOContext { dioChannel = ch,
                                dioInboxId = inboxId,
                                dioTimeServerId = serverId,
                                dioParams0 = ps,
                                dioRegisteredInTimeServer = registeredInTimeServer,
                                dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
                                dioTimeServerTerminating = timeServerTerminating }
     return (inboxId, simulation)
-- | Handle a process monitor notification received by the inbox process.
--
-- The notification is logged and forwarded to the simulation channel as a
-- 'ProcessMonitorNotificationMessage'. Then:
--
-- * If the monitored process is the time server and it died normally, with an
--   exception or because its node went down, the inbox process is asked to
--   terminate.
--
-- * If reconnecting is enabled and the reason is 'DP.DiedDisconnect':
--
--     * wait 'dioProcessReconnectingDelay' microseconds;
--
--     * drain the further disconnect notifications already in the mailbox,
--       forwarding each of them too;
--
--     * reconnect to every affected process;
--
--     * spawn a helper that, after 'dioProcessMonitoringDelay' microseconds,
--       sends 'ReMonitorProcessMessage' back to the inbox.
--
-- This must run inside the inbox process, which is addressed via
-- 'DP.getSelfPid'.
handleProcessMonitorNotification :: DP.ProcessMonitorNotification -> DIOParams -> Channel LogicalProcessMessage -> DP.ProcessId -> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) ps ch serverId =
  do let recv n@(DP.ProcessMonitorNotification _ _ _) =
           do logProcess ps WARNING $ "Received a process monitor notification " ++ show n
              liftIO $
                writeChannel ch (ProcessMonitorNotificationMessage n)
              return n
     _ <- recv m
     -- We are inside the inbox loop, so our own pid is the inbox. The
     -- termination request must go there: the time server is already dead.
     -- The earlier code sent it to serverId, so the inbox never stopped.
     inboxId <- DP.getSelfPid
     when (pid0 == serverId) $
       case reason of
         DP.DiedNormal      -> processTimeServerTerminated ps inboxId
         DP.DiedException _ -> processTimeServerTerminated ps inboxId
         DP.DiedNodeDown    -> processTimeServerTerminated ps inboxId
         _                  -> return ()
     when (dioProcessReconnectingEnabled ps && (reason == DP.DiedDisconnect)) $
       do liftIO $
            threadDelay (dioProcessReconnectingDelay ps)
          let isDisconnect (DP.ProcessMonitorNotification _ _ r) = r == DP.DiedDisconnect
              -- collect the pids of the disconnect notifications already queued
              loop :: [DP.ProcessId] -> DP.Process [DP.ProcessId]
              loop acc =
                do y <- DP.receiveTimeout 0 [DP.matchIf isDisconnect recv]
                   case y of
                     Nothing -> return $ reverse acc
                     Just (DP.ProcessMonitorNotification _ pid _) -> loop (pid : acc)
          pids <- loop [pid0]
          logProcess ps NOTICE "Begin reconnecting..."
          forM_ pids $ \pid ->
            do logProcess ps NOTICE $ "Direct reconnecting to " ++ show pid
               DP.reconnect pid
          _ <- DP.spawnLocal $
               let action =
                     do liftIO $
                          threadDelay (dioProcessMonitoringDelay ps)
                        logProcess ps NOTICE $ "Proceed to the re-monitoring"
                        DP.send inboxId (ReMonitorProcessMessage pids)
               in C.catch action (handleException ps)
          return ()
-- | Monitor each of the given processes again and tell the simulation,
-- through the channel, that it has been reconnected to it
-- ('ReconnectProcessMessage').
handleReMonitorProcessMessage :: [DP.ProcessId] -> DIOParams -> Channel LogicalProcessMessage -> DP.Process ()
handleReMonitorProcessMessage pids ps ch =
  mapM_ reMonitor pids
  where
    reMonitor pid =
      do logProcess ps NOTICE $ "Re-monitoring " ++ show pid
         _ <- DP.monitor pid
         logProcess ps NOTICE $ "Writing to the channel about reconnecting to " ++ show pid
         liftIO $ writeChannel ch (ReconnectProcessMessage pid)
-- | Ask the inbox process to monitor the given process.
--
-- When 'dioProcessMonitoringEnabled' is off, nothing is monitored and a
-- warning is logged instead.
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
  dioParams >>= \ps ->
  if not (dioProcessMonitoringEnabled ps)
    then liftDistributedUnsafe $
         logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters"
    else messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (MonitorProcessMessage pid))
-- | React to a message coming from the time server, as seen by the inbox
-- loop.
--
-- 'ComputeLocalTimeMessage' and 'GlobalTimeMessage' refresh the timestamp in
-- the given reference. The former is also acknowledged to the server with
-- the pid of the current (inbox) process. Other messages are ignored.
processTimeServerMessage :: LogicalProcessMessage -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
processTimeServerMessage msg serverId r =
  case msg of
    ComputeLocalTimeMessage ->
      do touch
         self <- DP.getSelfPid
         DP.send serverId (ComputeLocalTimeAcknowledgementMessage self)
    GlobalTimeMessage _ ->
      touch
    _ ->
      return ()
  where
    touch :: DP.Process ()
    touch = liftIO (getCurrentTime >>= writeIORef r)
-- | Check how long ago the time server was last heard from.
--
-- Under 'TerminateDueToTimeServerTimeout' with limit @limit@ (microseconds),
-- a 'TerminateInboxProcessMessage' is sent to the given inbox when more time
-- than that has elapsed since the timestamp stored in the reference. Under
-- 'WaitIndefinitelyForTimeServer' nothing is done beyond logging.
validateTimeServer :: DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
validateTimeServer ps inboxId r =
  do logProcess ps NOTICE "Validating the time server"
     case dioStrategy ps of
       WaitIndefinitelyForTimeServer -> return ()
       TerminateDueToTimeServerTimeout limit ->
         do lastHeard <- liftIO $ readIORef r
            now <- liftIO getCurrentTime
            let elapsed = fromRational (toRational (diffUTCTime now lastHeard)) :: Double
                expired = secondsToMicroseconds elapsed > limit
            when expired $
              do logProcess ps WARNING "Terminating due to the exceeded time server timeout"
                 DP.send inboxId TerminateInboxProcessMessage
-- | Log that the time server has gone away and ask the given inbox process
-- to terminate.
processTimeServerTerminated :: DIOParams -> DP.ProcessId -> DP.Process ()
processTimeServerTerminated ps inboxId =
  logProcess ps NOTICE "Terminating due to sudden termination of the time server" >>
  DP.send inboxId TerminateInboxProcessMessage
-- | Convert a duration in seconds to whole microseconds.
--
-- The value is rounded with 'round' (halves go to the even neighbour) via
-- 'Integer' before being narrowed to 'Int'.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds seconds =
  let micros = round (1000000 * seconds) :: Integer
  in fromInteger micros
-- | Send a message to the given process.
--
-- With process monitoring enabled it is relayed through the inbox process;
-- otherwise it is sent directly as a 'QueueMessage'.
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
sendMessageDIO pid m =
  dioParams >>= \ps ->
  if dioProcessMonitoringEnabled ps
    then messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (SendQueueMessage pid m))
    else liftDistributedUnsafe (DP.send pid (QueueMessage m))
-- | Send several messages to the given process, in order.
--
-- With process monitoring enabled they are relayed through the inbox process
-- as a single bulk request; otherwise each is sent directly as a
-- 'QueueMessage'.
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
sendMessagesDIO pid ms =
  do ps <- dioParams
     if not (dioProcessMonitoringEnabled ps)
       then liftDistributedUnsafe $ mapM_ (DP.send pid . QueueMessage) ms
       else do inbox <- messageInboxId
               liftDistributedUnsafe $ DP.send inbox (SendQueueMessageBulk pid ms)
-- | Send an acknowledgement message to the given process.
--
-- With process monitoring enabled it is relayed through the inbox process;
-- otherwise it is sent directly as an 'AcknowledgementQueueMessage'.
sendAcknowledgementMessageDIO :: DP.ProcessId -> AcknowledgementMessage -> DIO ()
sendAcknowledgementMessageDIO pid m =
  do ps <- dioParams
     let direct = DP.send pid (AcknowledgementQueueMessage m)
         viaInbox inbox = DP.send inbox (SendAcknowledgementQueueMessage pid m)
     if dioProcessMonitoringEnabled ps
       then messageInboxId >>= liftDistributedUnsafe . viaInbox
       else liftDistributedUnsafe direct
-- | Send several acknowledgement messages to the given process, in order.
--
-- With process monitoring enabled they are relayed through the inbox process
-- as a single bulk request; otherwise each is sent directly as an
-- 'AcknowledgementQueueMessage'.
sendAcknowledgementMessagesDIO :: DP.ProcessId -> [AcknowledgementMessage] -> DIO ()
sendAcknowledgementMessagesDIO pid ms =
  do ps <- dioParams
     if not (dioProcessMonitoringEnabled ps)
       then liftDistributedUnsafe $ mapM_ (DP.send pid . AcknowledgementQueueMessage) ms
       else do inbox <- messageInboxId
               liftDistributedUnsafe $ DP.send inbox (SendAcknowledgementQueueMessageBulk pid ms)
-- | Report the local time @t@ of @sender@ to @receiver@.
--
-- With process monitoring enabled the report is relayed through the inbox
-- process; otherwise it is sent directly as a 'LocalTimeMessage'.
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
sendLocalTimeDIO receiver sender t =
  do ps <- dioParams
     let direct = DP.send receiver (LocalTimeMessage sender t)
         viaInbox inbox = DP.send inbox (SendLocalTimeMessage receiver sender t)
     if dioProcessMonitoringEnabled ps
       then messageInboxId >>= liftDistributedUnsafe . viaInbox
       else liftDistributedUnsafe direct
-- | Ask @receiver@ for the global time on behalf of @sender@.
--
-- With process monitoring enabled the request is relayed through the inbox
-- process; otherwise it is sent directly as a 'RequestGlobalTimeMessage'.
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
sendRequestGlobalTimeDIO receiver sender =
  dioParams >>= \ps ->
  if dioProcessMonitoringEnabled ps
    then messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (SendRequestGlobalTimeMessage receiver sender))
    else liftDistributedUnsafe (DP.send receiver (RequestGlobalTimeMessage sender))
-- | Log a message with the given priority from within a 'DIO' computation.
--
-- The filtering by 'dioLoggingPriority' is delegated to 'logProcess'.
logDIO :: Priority -> String -> DIO ()
logDIO p message =
  do ps <- dioParams
     liftDistributedUnsafe $ logProcess ps p message
-- | Log a message with 'DP.say' when its priority is at least
-- 'dioLoggingPriority'; the message is prefixed with the rendered priority.
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
logProcess ps p message
  | dioLoggingPriority ps <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise                  = return ()