module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
(DIO(..),
DIOParams(..),
invokeDIO,
runDIO,
defaultDIOParams,
terminateDIO,
registerDIO,
unregisterDIO,
monitorProcessDIO,
dioParams,
messageChannel,
messageInboxId,
timeServerId,
sendMessageDIO,
sendMessagesDIO,
sendAcknowledgmentMessageDIO,
sendAcknowledgmentMessagesDIO,
sendLocalTimeDIO,
sendRequestGlobalTimeDIO,
logDIO,
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import Data.IORef
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM
import System.Timeout
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.KeepAliveManager
data DIOParams =
DIOParams { dioLoggingPriority :: Priority,
dioUndoableLogSizeThreshold :: Int,
dioOutputMessageQueueSizeThreshold :: Int,
dioTransientMessageQueueSizeThreshold :: Int,
dioSyncTimeout :: Int,
dioAllowPrematureIO :: Bool,
dioAllowSkippingOutdatedMessage :: Bool,
dioProcessMonitoringEnabled :: Bool,
dioProcessMonitoringDelay :: Int,
dioProcessReconnectingEnabled :: Bool,
dioProcessReconnectingDelay :: Int,
dioKeepAliveInterval :: Int,
dioTimeServerAcknowledgmentTimeout :: Int
} deriving (Eq, Ord, Show, Typeable, Generic)
instance Binary DIOParams
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
}
data DIOContext =
DIOContext { dioChannel :: Channel LocalProcessMessage,
dioInboxId :: DP.ProcessId,
dioTimeServerId :: DP.ProcessId,
dioParams0 :: DIOParams,
dioRegisteredInTimeServer :: TVar Bool,
dioUnregisteredFromTimeServer :: TVar Bool,
dioTimeServerTerminating :: TVar Bool
}
instance Monad DIO where
return = DIO . const . return
(DIO m) >>= k = DIO $ \ps ->
m ps >>= \a ->
let m' = unDIO (k a) in m' ps
instance Applicative DIO where
pure = return
(<*>) = ap
instance Functor DIO where
fmap f (DIO m) = DIO $ fmap f . m
instance MonadException DIO where
catchComp (DIO m) h = DIO $ \ps ->
C.catch (m ps) (\e -> unDIO (h e) ps)
finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
C.finally (m1 ps) (m2 ps)
throwComp e = DIO $ \ps ->
throw e
invokeDIO :: DIOContext -> DIO a -> DP.Process a
invokeDIO ps (DIO m) = m ps
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe = DIO . const
defaultDIOParams :: DIOParams
defaultDIOParams =
DIOParams { dioLoggingPriority = WARNING,
dioUndoableLogSizeThreshold = 1000000,
dioOutputMessageQueueSizeThreshold = 10000,
dioTransientMessageQueueSizeThreshold = 10000,
dioSyncTimeout = 60000000,
dioAllowPrematureIO = False,
dioAllowSkippingOutdatedMessage = True,
dioProcessMonitoringEnabled = False,
dioProcessMonitoringDelay = 5000000,
dioProcessReconnectingEnabled = False,
dioProcessReconnectingDelay = 5000000,
dioKeepAliveInterval = 5000000,
dioTimeServerAcknowledgmentTimeout = 5000000
}
dioContext :: DIO DIOContext
dioContext = DIO return
dioParams :: DIO DIOParams
dioParams = DIO $ return . dioParams0
messageChannel :: DIO (Channel LocalProcessMessage)
messageChannel = DIO $ return . dioChannel
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ return . dioInboxId
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ return . dioTimeServerId
terminateDIO :: DIO ()
terminateDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Terminating the simulation..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendTerminateTimeServerMessage receiver sender)
else DP.send receiver (TerminateTimeServerMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgmentTimeout ps) $
atomically $
do f <- readTVar (dioTimeServerTerminating ctx)
unless f retry
DP.send inbox TerminateInboxProcessMessage
return ()
registerDIO :: DIO ()
registerDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Registering the simulation process..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendRegisterLocalProcessMessage receiver sender)
else DP.send receiver (RegisterLocalProcessMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgmentTimeout ps) $
atomically $
do f <- readTVar (dioRegisteredInTimeServer ctx)
unless f retry
return ()
unregisterDIO :: DIO ()
unregisterDIO =
DIO $ \ctx ->
do let ps = dioParams0 ctx
logProcess ps INFO "Unregistering the simulation process..."
sender <- invokeDIO ctx messageInboxId
receiver <- invokeDIO ctx timeServerId
let inbox = sender
if dioProcessMonitoringEnabled ps
then DP.send inbox (SendUnregisterLocalProcessMessage receiver sender)
else DP.send receiver (UnregisterLocalProcessMessage sender)
liftIO $
timeout (dioTimeServerAcknowledgmentTimeout ps) $
atomically $
do f <- readTVar (dioUnregisteredFromTimeServer ctx)
unless f retry
DP.send inbox TerminateInboxProcessMessage
return ()
data InternalLocalProcessMessage = InternalLocalProcessMessage LocalProcessMessage
| InternalProcessMonitorNotification DP.ProcessMonitorNotification
| InternalInboxProcessMessage InboxProcessMessage
| InternalKeepAliveMessage KeepAliveMessage
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
do
logProcess ps ERROR $ "Exception occured: " ++ show e
C.throwM e
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId =
do ch <- liftIO newChannel
let keepAliveParams =
KeepAliveParams { keepAliveLoggingPriority = dioLoggingPriority ps,
keepAliveInterval = dioKeepAliveInterval ps }
keepAliveManager <- liftIO $ newKeepAliveManager keepAliveParams
terminated <- liftIO $ newIORef False
registeredInTimeServer <- liftIO $ newTVarIO False
unregisteredFromTimeServer <- liftIO $ newTVarIO False
timeServerTerminating <- liftIO $ newTVarIO False
let loop =
forever $
do let f1 :: LocalProcessMessage -> DP.Process InternalLocalProcessMessage
f1 x = return (InternalLocalProcessMessage x)
f2 :: DP.ProcessMonitorNotification -> DP.Process InternalLocalProcessMessage
f2 x = return (InternalProcessMonitorNotification x)
f3 :: InboxProcessMessage -> DP.Process InternalLocalProcessMessage
f3 x = return (InternalInboxProcessMessage x)
f4 :: KeepAliveMessage -> DP.Process InternalLocalProcessMessage
f4 x = return (InternalKeepAliveMessage x)
x <- fmap Just $ DP.receiveWait [DP.match f1, DP.match f2, DP.match f3, DP.match f4]
case x of
Nothing -> return ()
Just (InternalLocalProcessMessage m) ->
liftIO $
writeChannel ch m
Just (InternalProcessMonitorNotification m@(DP.ProcessMonitorNotification _ _ _)) ->
handleProcessMonitorNotification m ps ch
Just (InternalInboxProcessMessage m) ->
case m of
SendQueueMessage pid m ->
DP.send pid (QueueMessage m)
SendQueueMessageBulk pid ms ->
forM_ ms $ \m ->
DP.send pid (QueueMessage m)
SendAcknowledgmentQueueMessage pid m ->
DP.send pid (AcknowledgmentQueueMessage m)
SendAcknowledgmentQueueMessageBulk pid ms ->
forM_ ms $ \m ->
DP.send pid (AcknowledgmentQueueMessage m)
SendLocalTimeMessage receiver sender t ->
DP.send receiver (LocalTimeMessage sender t)
SendRequestGlobalTimeMessage receiver sender ->
DP.send receiver (RequestGlobalTimeMessage sender)
SendRegisterLocalProcessMessage receiver sender ->
DP.send receiver (RegisterLocalProcessMessage sender)
SendUnregisterLocalProcessMessage receiver sender ->
DP.send receiver (UnregisterLocalProcessMessage sender)
SendTerminateTimeServerMessage receiver sender ->
DP.send receiver (TerminateTimeServerMessage sender)
MonitorProcessMessage pid ->
do
logProcess ps INFO $ "Monitoring process " ++ show pid
DP.monitor pid
liftIO $
addKeepAliveReceiver keepAliveManager pid
return ()
ReMonitorProcessMessage pids ->
handleReMonitorProcessMessage pids ps ch
TrySendKeepAliveMessage ->
trySendKeepAlive keepAliveManager
RegisterLocalProcessAcknowledgmentMessage pid ->
do
logProcess ps INFO "Registered the local process in the time server"
liftIO $
atomically $
writeTVar registeredInTimeServer True
UnregisterLocalProcessAcknowledgmentMessage pid ->
do
logProcess ps INFO "Unregistered the local process from the time server"
liftIO $
atomically $
writeTVar unregisteredFromTimeServer True
TerminateTimeServerAcknowledgmentMessage pid ->
do
logProcess ps INFO "Started terminating the time server"
liftIO $
atomically $
writeTVar timeServerTerminating True
TerminateInboxProcessMessage ->
do
logProcess ps INFO "Terminating the inbox and keep-alive processes..."
liftIO $
atomicWriteIORef terminated True
DP.terminate
Just (InternalKeepAliveMessage m) ->
do
logProcess ps DEBUG $ "Received " ++ show m
liftIO $
writeChannel ch (KeepAliveLocalProcessMessage m)
inboxId <-
DP.spawnLocal $
C.catch loop (handleException ps)
DP.spawnLocal $
let loop =
do f <- liftIO $ readIORef terminated
unless f $
do liftIO $
threadDelay (dioKeepAliveInterval ps)
DP.send inboxId TrySendKeepAliveMessage
loop
in C.catch loop (handleException ps)
let simulation =
unDIO m DIOContext { dioChannel = ch,
dioInboxId = inboxId,
dioTimeServerId = serverId,
dioParams0 = ps,
dioRegisteredInTimeServer = registeredInTimeServer,
dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
dioTimeServerTerminating = timeServerTerminating }
return (inboxId, simulation)
handleProcessMonitorNotification :: DP.ProcessMonitorNotification -> DIOParams -> Channel LocalProcessMessage -> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) ps ch =
do let recv m@(DP.ProcessMonitorNotification _ _ _) =
do
logProcess ps WARNING $ "Received a process monitor notification " ++ show m
liftIO $
writeChannel ch (ProcessMonitorNotificationMessage m)
return m
recv m
when (dioProcessReconnectingEnabled ps && (reason == DP.DiedDisconnect)) $
do liftIO $
threadDelay (dioProcessReconnectingDelay ps)
let pred m@(DP.ProcessMonitorNotification _ _ reason) = reason == DP.DiedDisconnect
loop :: [DP.ProcessId] -> DP.Process [DP.ProcessId]
loop acc =
do y <- DP.receiveTimeout 0 [DP.matchIf pred recv]
case y of
Nothing -> return $ reverse acc
Just m@(DP.ProcessMonitorNotification _ pid _) -> loop (pid : acc)
pids <- loop [pid0]
logProcess ps NOTICE "Begin reconnecting..."
forM_ pids $ \pid ->
do
logProcess ps NOTICE $ "Direct reconnecting to " ++ show pid
DP.reconnect pid
inboxId <- DP.getSelfPid
DP.spawnLocal $
let action =
do liftIO $
threadDelay (dioProcessMonitoringDelay ps)
logProcess ps NOTICE $ "Proceed to the re-monitoring"
DP.send inboxId (ReMonitorProcessMessage pids)
in C.catch action (handleException ps)
return ()
handleReMonitorProcessMessage :: [DP.ProcessId] -> DIOParams -> Channel LocalProcessMessage -> DP.Process ()
handleReMonitorProcessMessage pids ps ch =
forM_ pids $ \pid ->
do
logProcess ps NOTICE $ "Re-monitoring " ++ show pid
DP.monitor pid
logProcess ps NOTICE $ "Writing to the channel about reconnecting to " ++ show pid
liftIO $
writeChannel ch (ReconnectProcessMessage pid)
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox $
MonitorProcessMessage pid
else liftDistributedUnsafe $
logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters"
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
sendMessageDIO pid m =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendQueueMessage pid m)
else liftDistributedUnsafe $
DP.send pid (QueueMessage m)
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
sendMessagesDIO pid ms =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendQueueMessageBulk pid ms)
else do forM_ ms $ \m ->
liftDistributedUnsafe $
DP.send pid (QueueMessage m)
sendAcknowledgmentMessageDIO :: DP.ProcessId -> AcknowledgmentMessage -> DIO ()
sendAcknowledgmentMessageDIO pid m =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendAcknowledgmentQueueMessage pid m)
else liftDistributedUnsafe $
DP.send pid (AcknowledgmentQueueMessage m)
sendAcknowledgmentMessagesDIO :: DP.ProcessId -> [AcknowledgmentMessage] -> DIO ()
sendAcknowledgmentMessagesDIO pid ms =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendAcknowledgmentQueueMessageBulk pid ms)
else do forM_ ms $ \m ->
liftDistributedUnsafe $
DP.send pid (AcknowledgmentQueueMessage m)
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
sendLocalTimeDIO receiver sender t =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendLocalTimeMessage receiver sender t)
else liftDistributedUnsafe $
DP.send receiver (LocalTimeMessage sender t)
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
sendRequestGlobalTimeDIO receiver sender =
do ps <- dioParams
if dioProcessMonitoringEnabled ps
then do inbox <- messageInboxId
liftDistributedUnsafe $
DP.send inbox (SendRequestGlobalTimeMessage receiver sender)
else liftDistributedUnsafe $
DP.send receiver (RequestGlobalTimeMessage sender)
logDIO :: Priority -> String -> DIO ()
logDIO p message =
do ps <- dioParams
when (dioLoggingPriority ps <= p) $
liftDistributedUnsafe $
DP.say $
embracePriority p ++ " " ++ message
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
logProcess ps p message =
when (dioLoggingPriority ps <= p) $
DP.say $
embracePriority p ++ " " ++ message