module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
liftDistributedUnsafe) where
import Data.Typeable
import Data.Binary
import Data.IORef
import GHC.Generics
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM
import System.Timeout
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.KeepAliveManager
-- | The parameters of the 'DIO' computation.
data DIOParams =
  DIOParams { dioLoggingPriority :: Priority,
              -- ^ The logging threshold: 'logProcess' and 'logDIO' emit a message
              -- only when this value is @<=@ the priority of the message. It is
              -- also handed to the keep-alive manager created by 'runDIO'.
              dioUndoableLogSizeThreshold :: Int,
              -- ^ NOTE(review): presumably a size limit for the undoable log;
              -- not used in this module - confirm against the consumer.
              dioOutputMessageQueueSizeThreshold :: Int,
              -- ^ NOTE(review): presumably a size limit for the output message
              -- queue; not used in this module - confirm against the consumer.
              dioTransientMessageQueueSizeThreshold :: Int,
              -- ^ NOTE(review): presumably a size limit for the transient message
              -- queue; not used in this module - confirm against the consumer.
              dioSyncTimeout :: Int,
              -- ^ NOTE(review): a synchronisation timeout; units are not
              -- established in this module (presumably microseconds) - confirm.
              dioAllowPrematureIO :: Bool,
              -- ^ NOTE(review): presumably whether premature IO actions are
              -- permitted; not used in this module - confirm.
              dioAllowSkippingOutdatedMessage :: Bool,
              -- ^ NOTE(review): presumably whether outdated messages may be
              -- skipped; not used in this module - confirm.
              dioProcessMonitoringEnabled :: Bool,
              -- ^ When 'True', outgoing messages are routed through the inbox
              -- process instead of being sent directly, and 'monitorProcessDIO'
              -- is honoured rather than ignored with a warning.
              dioProcessMonitoringDelay :: Int,
              -- ^ The delay (passed to 'threadDelay', i.e. microseconds) applied
              -- after reconnecting and before requesting the re-monitoring.
              dioProcessReconnectingEnabled :: Bool,
              -- ^ Whether a disconnect notification triggers reconnecting to the
              -- affected processes.
              dioProcessReconnectingDelay :: Int,
              -- ^ The delay (passed to 'threadDelay', i.e. microseconds) applied
              -- before reconnecting after a disconnect notification.
              dioKeepAliveInterval :: Int,
              -- ^ The pause (passed to 'threadDelay', i.e. microseconds) of the
              -- keep-alive trigger process; also handed to the keep-alive manager.
              dioTimeServerAcknowledgmentTimeout :: Int
              -- ^ The time (passed to 'timeout', i.e. microseconds) to wait for
              -- an acknowledgment from the time server.
            } deriving (Eq, Ord, Show, Typeable, Generic)

-- The instance declares no methods explicitly, so the class defaults are used
-- ('Generic' is derived above).
instance Binary DIOParams
-- | The distributed computation: a function from the 'DIOContext' to
-- a 'DP.Process' action.
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
                      -- ^ Unwrap the computation.
                    }
-- | The context every 'DIO' computation is run in. It is built by 'runDIO'.
data DIOContext =
  DIOContext { dioChannel :: Channel LocalProcessMessage,
               -- ^ The channel the inbox process writes local process messages to.
               dioInboxId :: DP.ProcessId,
               -- ^ The identifier of the inbox process spawned by 'runDIO'.
               dioTimeServerId :: DP.ProcessId,
               -- ^ The identifier of the time server process.
               dioParams0 :: DIOParams,
               -- ^ The parameters of the computation.
               dioRegisteredInTimeServer :: TVar Bool,
               -- ^ Set to 'True' by the inbox process when the registration
               -- acknowledgment arrives; awaited by 'registerDIO'.
               dioUnregisteredFromTimeServer :: TVar Bool,
               -- ^ Set to 'True' by the inbox process when the unregistration
               -- acknowledgment arrives; awaited by 'unregisterDIO'.
               dioTimeServerTerminating :: TVar Bool
               -- ^ Set to 'True' by the inbox process when the time server
               -- acknowledges its termination; awaited by 'terminateDIO'.
             }
-- | Sequencing threads the same 'DIOContext' through both computations.
instance Monad DIO where

  return a = DIO $ \_ -> return a

  m >>= k = DIO $ \ctx ->
    do a <- unDIO m ctx
       unDIO (k a) ctx
-- | 'pure' is defined directly (the canonical form) instead of through
-- 'return'; it ignores the context and yields the value in 'DP.Process'.
instance Applicative DIO where

  pure = DIO . const . pure

  (<*>) = ap
-- | Maps over the result of the underlying 'DP.Process' action.
instance Functor DIO where

  fmap f m = DIO $ \ctx -> f <$> unDIO m ctx
-- | Exception handling is delegated to the "Control.Monad.Catch" operations
-- of the underlying 'DP.Process' monad; handlers and finalizers run in the
-- same 'DIOContext' as the guarded computation.
instance MonadException DIO where

  catchComp (DIO m) h = DIO $ \ps ->
    C.catch (m ps) (\e -> unDIO (h e) ps)

  finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
    C.finally (m1 ps) (m2 ps)

  -- Raise the exception as a monadic action (as 'handleException' does), so
  -- that it is thrown when the action is executed rather than whenever the
  -- action value happens to be evaluated.
  throwComp e = DIO $ \_ ->
    C.throwM e
-- | Run the computation in the given context.
invokeDIO :: DIOContext -> DIO a -> DP.Process a
invokeDIO ctx m = unDIO m ctx
-- | Lift a 'DP.Process' action into 'DIO'; the context is ignored.
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe process = DIO $ \_ -> process
-- | The default parameters of the 'DIO' computation.
--
-- The delays, the keep-alive interval and the acknowledgment timeout are fed
-- to 'threadDelay' / 'timeout' elsewhere in this module, i.e. they are in
-- microseconds (5000000 is 5 seconds).
defaultDIOParams :: DIOParams
defaultDIOParams =
  DIOParams { dioLoggingPriority = WARNING,
              dioUndoableLogSizeThreshold = 1000000,
              dioOutputMessageQueueSizeThreshold = 10000,
              dioTransientMessageQueueSizeThreshold = 10000,
              dioSyncTimeout = 60000000,
              dioAllowPrematureIO = False,
              dioAllowSkippingOutdatedMessage = True,
              dioProcessMonitoringEnabled = False,
              dioProcessMonitoringDelay = 5000000,
              dioProcessReconnectingEnabled = False,
              dioProcessReconnectingDelay = 5000000,
              dioKeepAliveInterval = 5000000,
              dioTimeServerAcknowledgmentTimeout = 5000000
            }
-- | Return the context of the computation.
dioContext :: DIO DIOContext
dioContext = DIO $ \ctx -> return ctx
-- | Return the parameters of the current computation.
dioParams :: DIO DIOParams
dioParams = DIO $ \ctx -> return (dioParams0 ctx)
-- | Return the channel of local process messages stored in the context.
messageChannel :: DIO (Channel LocalProcessMessage)
messageChannel = DIO $ \ctx -> return (dioChannel ctx)
-- | Return the identifier of the inbox process stored in the context.
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ \ctx -> return (dioInboxId ctx)
-- | Return the identifier of the time server process stored in the context.
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ \ctx -> return (dioTimeServerId ctx)
-- | Ask the time server to terminate, wait for its acknowledgment and then
-- tell the inbox process to terminate.
--
-- The request goes through the inbox process when the process monitoring is
-- enabled and directly to the time server otherwise. The wait is bounded by
-- 'dioTimeServerAcknowledgmentTimeout'; if no acknowledgment arrives in time,
-- a warning is logged and the termination proceeds anyway.
terminateDIO :: DIO ()
terminateDIO =
  DIO $ \ctx ->
  do let ps     = dioParams0 ctx
         inbox  = dioInboxId ctx
         server = dioTimeServerId ctx
     logProcess ps INFO "Terminating the simulation..."
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendTerminateTimeServerMessage server inbox)
       else DP.send server (TerminateTimeServerMessage inbox)
     -- block until the inbox process flips the flag, but not forever
     acknowledged <-
       liftIO $
       timeout (dioTimeServerAcknowledgmentTimeout ps) $
       atomically $
       do f <- readTVar (dioTimeServerTerminating ctx)
          unless f retry
     case acknowledged of
       Nothing ->
         logProcess ps WARNING "Timed out waiting for the time server to acknowledge the termination"
       Just () ->
         return ()
     DP.send inbox TerminateInboxProcessMessage
     return ()
-- | Register the simulation process in the time server and wait for the
-- acknowledgment.
--
-- The request goes through the inbox process when the process monitoring is
-- enabled and directly to the time server otherwise. The wait is bounded by
-- 'dioTimeServerAcknowledgmentTimeout'; if no acknowledgment arrives in time,
-- a warning is logged and the computation continues.
registerDIO :: DIO ()
registerDIO =
  DIO $ \ctx ->
  do let ps     = dioParams0 ctx
         inbox  = dioInboxId ctx
         server = dioTimeServerId ctx
     logProcess ps INFO "Registering the simulation process..."
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendRegisterLocalProcessMessage server inbox)
       else DP.send server (RegisterLocalProcessMessage inbox)
     -- block until the inbox process flips the flag, but not forever
     acknowledged <-
       liftIO $
       timeout (dioTimeServerAcknowledgmentTimeout ps) $
       atomically $
       do f <- readTVar (dioRegisteredInTimeServer ctx)
          unless f retry
     case acknowledged of
       Nothing ->
         logProcess ps WARNING "Timed out waiting for the time server to acknowledge the registration"
       Just () ->
         return ()
-- | Unregister the simulation process from the time server, wait for the
-- acknowledgment and then tell the inbox process to terminate.
--
-- The request goes through the inbox process when the process monitoring is
-- enabled and directly to the time server otherwise. The wait is bounded by
-- 'dioTimeServerAcknowledgmentTimeout'; if no acknowledgment arrives in time,
-- a warning is logged and the inbox process is terminated anyway.
unregisterDIO :: DIO ()
unregisterDIO =
  DIO $ \ctx ->
  do let ps     = dioParams0 ctx
         inbox  = dioInboxId ctx
         server = dioTimeServerId ctx
     logProcess ps INFO "Unregistering the simulation process..."
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendUnregisterLocalProcessMessage server inbox)
       else DP.send server (UnregisterLocalProcessMessage inbox)
     -- block until the inbox process flips the flag, but not forever
     acknowledged <-
       liftIO $
       timeout (dioTimeServerAcknowledgmentTimeout ps) $
       atomically $
       do f <- readTVar (dioUnregisteredFromTimeServer ctx)
          unless f retry
     case acknowledged of
       Nothing ->
         logProcess ps WARNING "Timed out waiting for the time server to acknowledge the unregistration"
       Just () ->
         return ()
     DP.send inbox TerminateInboxProcessMessage
     return ()
-- | A sum of the message types the inbox process of 'runDIO' waits for; each
-- received message is wrapped in the matching constructor before dispatching.
data InternalLocalProcessMessage = InternalLocalProcessMessage LocalProcessMessage
                                   -- ^ a local process message
                                 | InternalProcessMonitorNotification DP.ProcessMonitorNotification
                                   -- ^ a process monitor notification
                                 | InternalInboxProcessMessage InboxProcessMessage
                                   -- ^ a message addressed to the inbox process itself
                                 | InternalKeepAliveMessage KeepAliveMessage
                                   -- ^ a keep-alive message
-- | Log the exception with the 'ERROR' priority and then rethrow it.
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
  do logProcess ps ERROR $ "Exception occured: " ++ show e
     C.throwM e
-- | Prepare the computation for running with the given parameters and the
-- identifier of the time server process.
--
-- Two local processes are spawned:
--
--   * the inbox process, which receives messages, writes local process
--     messages and keep-alive messages to the channel, forwards the @Send*@
--     requests to their receivers, and flips the acknowledgment flags of the
--     context; it stops after handling 'TerminateInboxProcessMessage';
--
--   * the keep-alive trigger process, which sends 'TrySendKeepAliveMessage'
--     to the inbox process every 'dioKeepAliveInterval' microseconds until
--     the inbox process is asked to terminate.
--
-- Any exception in either process is logged and rethrown by 'handleException'.
--
-- The result is the identifier of the inbox process together with the
-- simulation as a 'DP.Process' action; that action is not started here.
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId =
  do ch <- liftIO newChannel
     let keepAliveParams =
           KeepAliveParams { keepAliveLoggingPriority = dioLoggingPriority ps,
                             keepAliveInterval = dioKeepAliveInterval ps }
     keepAliveManager <- liftIO $ newKeepAliveManager keepAliveParams
     terminated <- liftIO $ newIORef False
     registeredInTimeServer <- liftIO $ newTVarIO False
     unregisteredFromTimeServer <- liftIO $ newTVarIO False
     timeServerTerminating <- liftIO $ newTVarIO False
     let handleInbox :: InboxProcessMessage -> DP.Process ()
         handleInbox msg =
           case msg of
             SendQueueMessage pid x ->
               DP.send pid (QueueMessage x)
             SendQueueMessageBulk pid xs ->
               forM_ xs $ \x ->
               DP.send pid (QueueMessage x)
             SendAcknowledgmentQueueMessage pid x ->
               DP.send pid (AcknowledgmentQueueMessage x)
             SendAcknowledgmentQueueMessageBulk pid xs ->
               forM_ xs $ \x ->
               DP.send pid (AcknowledgmentQueueMessage x)
             SendLocalTimeMessage receiver sender t ->
               DP.send receiver (LocalTimeMessage sender t)
             SendRequestGlobalTimeMessage receiver sender ->
               DP.send receiver (RequestGlobalTimeMessage sender)
             SendRegisterLocalProcessMessage receiver sender ->
               DP.send receiver (RegisterLocalProcessMessage sender)
             SendUnregisterLocalProcessMessage receiver sender ->
               DP.send receiver (UnregisterLocalProcessMessage sender)
             SendTerminateTimeServerMessage receiver sender ->
               DP.send receiver (TerminateTimeServerMessage sender)
             MonitorProcessMessage pid ->
               do logProcess ps INFO $ "Monitoring process " ++ show pid
                  _ <- DP.monitor pid
                  liftIO $
                    addKeepAliveReceiver keepAliveManager pid
                  return ()
             ReMonitorProcessMessage pids ->
               handleReMonitorProcessMessage pids ps ch
             TrySendKeepAliveMessage ->
               trySendKeepAlive keepAliveManager
             RegisterLocalProcessAcknowledgmentMessage _ ->
               do logProcess ps INFO "Registered the local process in the time server"
                  liftIO $
                    atomically $
                    writeTVar registeredInTimeServer True
             UnregisterLocalProcessAcknowledgmentMessage _ ->
               do logProcess ps INFO "Unregistered the local process from the time server"
                  liftIO $
                    atomically $
                    writeTVar unregisteredFromTimeServer True
             TerminateTimeServerAcknowledgmentMessage _ ->
               do logProcess ps INFO "Started terminating the time server"
                  liftIO $
                    atomically $
                    writeTVar timeServerTerminating True
             TerminateInboxProcessMessage ->
               do logProcess ps INFO "Terminating the inbox and keep-alive processes..."
                  -- both loops below poll this flag and stop once it is set
                  liftIO $
                    atomicWriteIORef terminated True
         dispatch :: InternalLocalProcessMessage -> DP.Process ()
         dispatch (InternalLocalProcessMessage x) =
           liftIO $
           writeChannel ch x
         dispatch (InternalProcessMonitorNotification x) =
           handleProcessMonitorNotification x ps ch
         dispatch (InternalInboxProcessMessage x) =
           handleInbox x
         dispatch (InternalKeepAliveMessage x) =
           do logProcess ps DEBUG $ "Received " ++ show x
              liftIO $
                writeChannel ch (KeepAliveLocalProcessMessage x)
         inboxLoop :: DP.Process ()
         inboxLoop =
           do x <- DP.receiveWait
                     [ DP.match (return . InternalLocalProcessMessage)
                     , DP.match (return . InternalProcessMonitorNotification)
                     , DP.match (return . InternalInboxProcessMessage)
                     , DP.match (return . InternalKeepAliveMessage) ]
              dispatch x
              -- leave the loop after 'TerminateInboxProcessMessage' was handled
              stop <- liftIO $ readIORef terminated
              unless stop inboxLoop
     inboxId <-
       DP.spawnLocal $
       C.catch inboxLoop (handleException ps)
     _ <- DP.spawnLocal $
       let keepAliveLoop =
             do stop <- liftIO $ readIORef terminated
                unless stop $
                  do liftIO $
                       threadDelay (dioKeepAliveInterval ps)
                     DP.send inboxId TrySendKeepAliveMessage
                     -- keep triggering until the termination flag is set
                     keepAliveLoop
       in C.catch keepAliveLoop (handleException ps)
     let simulation =
           unDIO m DIOContext { dioChannel = ch,
                                dioInboxId = inboxId,
                                dioTimeServerId = serverId,
                                dioParams0 = ps,
                                dioRegisteredInTimeServer = registeredInTimeServer,
                                dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
                                dioTimeServerTerminating = timeServerTerminating }
     return (inboxId, simulation)
-- | Handle a process monitor notification received by the inbox process.
--
-- The notification is logged and written to the channel. If reconnecting is
-- enabled and the reason is 'DP.DiedDisconnect', then after the delay of
-- 'dioProcessReconnectingDelay' all further disconnect notifications already
-- waiting in the mailbox are drained (each of them is logged and written to
-- the channel too), every affected process is reconnected, and a helper
-- process is spawned that sends 'ReMonitorProcessMessage' back to the current
-- process after 'dioProcessMonitoringDelay'.
handleProcessMonitorNotification :: DP.ProcessMonitorNotification -> DIOParams -> Channel LocalProcessMessage -> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) ps ch =
  do _ <- recv m
     when (dioProcessReconnectingEnabled ps && (reason == DP.DiedDisconnect)) $
       do liftIO $
            threadDelay (dioProcessReconnectingDelay ps)
          pids <- drain [pid0]
          logProcess ps NOTICE "Begin reconnecting..."
          forM_ pids $ \pid ->
            do logProcess ps NOTICE $ "Direct reconnecting to " ++ show pid
               DP.reconnect pid
          inboxId <- DP.getSelfPid
          _ <- DP.spawnLocal $
            let action =
                  do liftIO $
                       threadDelay (dioProcessMonitoringDelay ps)
                     logProcess ps NOTICE $ "Proceed to the re-monitoring"
                     DP.send inboxId (ReMonitorProcessMessage pids)
            in C.catch action (handleException ps)
          return ()
  where
    -- log the notification, publish it to the channel and give it back
    recv :: DP.ProcessMonitorNotification -> DP.Process DP.ProcessMonitorNotification
    recv n =
      do logProcess ps WARNING $ "Received a process monitor notification " ++ show n
         liftIO $
           writeChannel ch (ProcessMonitorNotificationMessage n)
         return n
    -- select only the notifications caused by a disconnect
    isDisconnect :: DP.ProcessMonitorNotification -> Bool
    isDisconnect (DP.ProcessMonitorNotification _ _ r) = r == DP.DiedDisconnect
    -- collect, without blocking, the processes of the pending disconnect
    -- notifications; the result keeps the order of arrival
    drain :: [DP.ProcessId] -> DP.Process [DP.ProcessId]
    drain acc =
      do y <- DP.receiveTimeout 0 [DP.matchIf isDisconnect recv]
         case y of
           Nothing -> return $ reverse acc
           Just (DP.ProcessMonitorNotification _ pid _) -> drain (pid : acc)
-- | Monitor each of the given processes again and write
-- a 'ReconnectProcessMessage' for it to the channel.
handleReMonitorProcessMessage :: [DP.ProcessId] -> DIOParams -> Channel LocalProcessMessage -> DP.Process ()
handleReMonitorProcessMessage pids ps ch =
  forM_ pids $ \pid ->
  do logProcess ps NOTICE $ "Re-monitoring " ++ show pid
     _ <- DP.monitor pid
     logProcess ps NOTICE $ "Writing to the channel about reconnecting to " ++ show pid
     liftIO $
       writeChannel ch (ReconnectProcessMessage pid)
-- | Ask the inbox process to monitor the specified process. If the process
-- monitoring is disabled in the parameters, only a warning is logged.
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
  dioParams >>= \ps ->
  if dioProcessMonitoringEnabled ps
    then messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (MonitorProcessMessage pid))
    else liftDistributedUnsafe
         (logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters")
-- | Send the message to the specified process: through the inbox process
-- when the process monitoring is enabled, directly otherwise.
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
sendMessageDIO pid m =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps then viaInbox else direct
  where
    viaInbox =
      messageInboxId >>= \inbox ->
      liftDistributedUnsafe (DP.send inbox (SendQueueMessage pid m))
    direct =
      liftDistributedUnsafe (DP.send pid (QueueMessage m))
-- | Send the messages to the specified process: as one bulk request to the
-- inbox process when the process monitoring is enabled, one by one directly
-- otherwise.
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
sendMessagesDIO pid ms =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps then viaInbox else direct
  where
    viaInbox =
      messageInboxId >>= \inbox ->
      liftDistributedUnsafe (DP.send inbox (SendQueueMessageBulk pid ms))
    direct =
      mapM_ (liftDistributedUnsafe . DP.send pid . QueueMessage) ms
-- | Send the acknowledgment message to the specified process: through the
-- inbox process when the process monitoring is enabled, directly otherwise.
sendAcknowledgmentMessageDIO :: DP.ProcessId -> AcknowledgmentMessage -> DIO ()
sendAcknowledgmentMessageDIO pid m =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps then viaInbox else direct
  where
    viaInbox =
      messageInboxId >>= \inbox ->
      liftDistributedUnsafe (DP.send inbox (SendAcknowledgmentQueueMessage pid m))
    direct =
      liftDistributedUnsafe (DP.send pid (AcknowledgmentQueueMessage m))
-- | Send the acknowledgment messages to the specified process: as one bulk
-- request to the inbox process when the process monitoring is enabled, one by
-- one directly otherwise.
sendAcknowledgmentMessagesDIO :: DP.ProcessId -> [AcknowledgmentMessage] -> DIO ()
sendAcknowledgmentMessagesDIO pid ms =
  do ps <- dioParams
     if dioProcessMonitoringEnabled ps then viaInbox else direct
  where
    viaInbox =
      messageInboxId >>= \inbox ->
      liftDistributedUnsafe (DP.send inbox (SendAcknowledgmentQueueMessageBulk pid ms))
    direct =
      mapM_ (liftDistributedUnsafe . DP.send pid . AcknowledgmentQueueMessage) ms
-- | Send the local time of the sender to the receiver: through the inbox
-- process when the process monitoring is enabled, directly otherwise.
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
sendLocalTimeDIO receiver sender t =
  dioParams >>= \ps ->
  if dioProcessMonitoringEnabled ps
    then messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (SendLocalTimeMessage receiver sender t))
    else liftDistributedUnsafe (DP.send receiver (LocalTimeMessage sender t))
-- | Send the global time request of the sender to the receiver: through the
-- inbox process when the process monitoring is enabled, directly otherwise.
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
sendRequestGlobalTimeDIO receiver sender =
  dioParams >>= \ps ->
  if dioProcessMonitoringEnabled ps
    then messageInboxId >>= \inbox ->
         liftDistributedUnsafe (DP.send inbox (SendRequestGlobalTimeMessage receiver sender))
    else liftDistributedUnsafe (DP.send receiver (RequestGlobalTimeMessage sender))
-- | Log the message with the specified priority. Nothing is written unless
-- the logging priority of the computation parameters is @<=@ that priority.
logDIO :: Priority -> String -> DIO ()
logDIO p message =
  dioParams >>= \ps ->
  -- 'logProcess' applies the same priority check and the same formatting
  liftDistributedUnsafe (logProcess ps p message)
-- | Log the message with the specified priority via 'DP.say'. Nothing is
-- written unless the logging priority of the parameters is @<=@ that priority.
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
logProcess ps p message
  | dioLoggingPriority ps <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise                  = return ()