{-# LANGUAGE TypeFamilies, FlexibleInstances #-} -- | -- Module : Simulation.Aivika.Distributed.Optimistic.Internal.Event -- Copyright : Copyright (c) 2015-2017, David Sorokin -- License : BSD3 -- Maintainer : David Sorokin -- Stability : experimental -- Tested with: GHC 7.10.3 -- -- The module defines an event queue. -- module Simulation.Aivika.Distributed.Optimistic.Internal.Event (queueInputMessages, queueOutputMessages, queueLog, expectEvent, processMonitorSignal, leaveSimulation) where import Data.Maybe import Data.IORef import Data.Time.Clock import System.Timeout import Control.Monad import Control.Monad.Trans import Control.Exception import qualified Control.Distributed.Process as DP import qualified Simulation.Aivika.PriorityQueue.EventQueue.Pure as PQ import Simulation.Aivika.Trans import Simulation.Aivika.Trans.Internal.Types import Simulation.Aivika.Trans.Internal.Event import Simulation.Aivika.Trans.Internal.Cont import Simulation.Aivika.Trans.Internal.Process import Simulation.Aivika.Distributed.Optimistic.Internal.Priority import Simulation.Aivika.Distributed.Optimistic.Internal.Channel import Simulation.Aivika.Distributed.Optimistic.Internal.DIO import Simulation.Aivika.Distributed.Optimistic.Internal.IO import Simulation.Aivika.Distributed.Optimistic.Internal.Message import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer import Simulation.Aivika.Distributed.Optimistic.Internal.TimeWarp import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.SignalHelper import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue import Simulation.Aivika.Distributed.Optimistic.Internal.TransientMessageQueue import Simulation.Aivika.Distributed.Optimistic.Internal.UndoableLog import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.AcknowledgementMessageQueue import {-# SOURCE #-} qualified Simulation.Aivika.Distributed.Optimistic.Internal.Ref.Strict as R import Simulation.Aivika.Distributed.Optimistic.State -- | Convert microseconds to seconds. microsecondsToSeconds :: Int -> Rational microsecondsToSeconds x = (fromInteger $ toInteger x) / 1000000 -- | An implementation of the 'EventQueueing' type class. instance EventQueueing DIO where -- | The event queue type. data EventQueue DIO = EventQueue { queueInputMessages :: InputMessageQueue, -- ^ the input message queue queueOutputMessages :: OutputMessageQueue, -- ^ the output message queue queueTransientMessages :: TransientMessageQueue, -- ^ the transient message queue queueAcknowledgementMessages :: AcknowledgementMessageQueue, -- ^ the acknowledgement message queue queueLog :: UndoableLog, -- ^ the undoable log of operations queuePQ :: R.Ref (PQ.PriorityQueue (Point DIO -> DIO ())), -- ^ the underlying priority queue queueBusy :: IORef Bool, -- ^ whether the queue is currently processing events queueTime :: IORef Double, -- ^ the actual time of the event queue queueGlobalTime :: IORef Double, -- ^ the global time queueInFind :: IORef Bool, -- ^ whether the queue is in find mode queueProcessMonitorNotificationSource :: SignalSource DIO DP.ProcessMonitorNotification, -- ^ the source of process monitor notifications queueIsLeaving :: IORef Bool -- ^ whether the logical process tries to leave the simulation } newEventQueue specs = do f <- liftIOUnsafe $ newIORef False t <- liftIOUnsafe $ newIORef $ spcStartTime specs gt <- liftIOUnsafe $ newIORef $ spcStartTime specs pq <- R.newRef0 PQ.emptyQueue log <- newUndoableLog transient <- newTransientMessageQueue output <- newOutputMessageQueue $ enqueueTransientMessage transient input <- newInputMessageQueue log rollbackEventPre rollbackEventPost rollbackEventTime ack <- newAcknowledgementMessageQueue infind <- liftIOUnsafe $ newIORef False s <- newDIOSignalSource0 leaving <- liftIOUnsafe $ newIORef False return EventQueue { queueInputMessages = input, queueOutputMessages = output, queueTransientMessages = transient, queueAcknowledgementMessages = ack, queueLog = log, queuePQ = pq, queueBusy = f, queueTime = t, queueGlobalTime = gt, queueInFind = infind, queueProcessMonitorNotificationSource = s, queueIsLeaving = leaving } enqueueEventWithPriority t priority (Event m) = Event $ \p -> let pq = queuePQ $ runEventQueue $ pointRun p in invokeEvent p $ R.modifyRef pq $ \x -> PQ.enqueue x t priority m runEventWith processing (Event e) = Dynamics $ \p -> do p0 <- invokeEvent p currentEventPoint invokeEvent p0 $ enqueueEvent (pointTime p) (return ()) invokeEvent p $ syncEvents processing e p eventQueueCount = Event $ \p -> let pq = queuePQ $ runEventQueue $ pointRun p in invokeEvent p $ fmap PQ.queueCount $ R.readRef pq -- | The first stage of rolling the changes back. rollbackEventPre :: Bool -> TimeWarp DIO () rollbackEventPre including = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p rollbackLog (queueLog q) (pointTime p) including -- | The post stage of rolling the changes back. rollbackEventPost :: Bool -> TimeWarp DIO () rollbackEventPost including = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p rollbackOutputMessages (queueOutputMessages q) (pointTime p) including -- | Rollback the event time. rollbackEventTime :: TimeWarp DIO () rollbackEventTime = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p --- --- logDIO DEBUG $ --- "Setting the queue time = " ++ show t --- liftIOUnsafe $ writeIORef (queueTime q) t t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) when (t0 > t) $ do --- --- logDIO DEBUG $ --- "Setting the global time = " ++ show t --- liftIOUnsafe $ writeIORef (queueGlobalTime q) t -- | Return the current event time. currentEventTime :: Event DIO Double {-# INLINE currentEventTime #-} currentEventTime = Event $ \p -> do let q = runEventQueue $ pointRun p liftIOUnsafe $ readIORef (queueTime q) -- | Return the current event point. currentEventPoint :: Event DIO (Point DIO) {-# INLINE currentEventPoint #-} currentEventPoint = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueTime q) if t' == pointTime p then return p else let sc = pointSpecs p t0 = spcStartTime sc dt = spcDT sc n' = fromIntegral $ floor ((t' - t0) / dt) in return p { pointTime = t', pointIteration = n', pointPhase = -1 } -- | Process the pending events. processPendingEventsCore :: Bool -> Dynamics DIO () processPendingEventsCore includingCurrentEvents = Dynamics r where r p = do let q = runEventQueue $ pointRun p f = queueBusy q f' <- liftIOUnsafe $ readIORef f if f' then error $ "Detected an event loop, which may indicate to " ++ "a logical error in the model: processPendingEventsCore" else do liftIOUnsafe $ writeIORef f True call q p p liftIOUnsafe $ writeIORef f False call q p p0 = do let pq = queuePQ q r = pointRun p -- process external messages p1 <- invokeEvent p0 currentEventPoint ok <- invokeEvent p1 $ runTimeWarp processChannelMessages if not ok then call q p p1 else do -- proceed with processing the events f <- invokeEvent p1 $ fmap PQ.queueNull $ R.readRef pq unless f $ do (t2, priority2, c2) <- invokeEvent p1 $ fmap PQ.queueFront $ R.readRef pq let t = queueTime q t' <- liftIOUnsafe $ readIORef t when (t2 < t') $ -- error "The time value is too small: processPendingEventsCore" error $ "The time value is too small (" ++ show t2 ++ " < " ++ show t' ++ "): processPendingEventsCore" when ((t2 < pointTime p) || (includingCurrentEvents && (t2 == pointTime p))) $ do let sc = pointSpecs p t0 = spcStartTime sc dt = spcDT sc n2 = fromIntegral $ floor ((t2 - t0) / dt) p2 = p { pointTime = t2, pointPriority = priority2, pointIteration = n2, pointPhase = -1 } --- --- ps <- dioParams --- when (dioLoggingPriority ps <= DEBUG) $ --- invokeEvent p2 $ --- writeLog (queueLog q) $ --- logDIO DEBUG $ --- "Reverting the queue time " ++ show t2 ++ " --> " ++ show t' --- liftIOUnsafe $ writeIORef t t2 invokeEvent p2 $ R.modifyRef pq PQ.dequeue catchComp (c2 p2) (\e@(SimulationRetry _) -> invokeEvent p2 $ handleEventRetry e) call q p p2 -- | Process the pending events synchronously, i.e. without past. processPendingEvents :: Bool -> Dynamics DIO () processPendingEvents includingCurrentEvents = Dynamics r where r p = do let q = runEventQueue $ pointRun p t = queueTime q t' <- liftIOUnsafe $ readIORef t if pointTime p < t' then error $ "The current time is less than " ++ "the time in the queue: processPendingEvents" else invokeDynamics p m m = processPendingEventsCore includingCurrentEvents -- | A memoized value. processEventsIncludingCurrent :: Dynamics DIO () processEventsIncludingCurrent = processPendingEvents True -- | A memoized value. processEventsIncludingEarlier :: Dynamics DIO () processEventsIncludingEarlier = processPendingEvents False -- | A memoized value. processEventsIncludingCurrentCore :: Dynamics DIO () processEventsIncludingCurrentCore = processPendingEventsCore True -- | A memoized value. processEventsIncludingEarlierCore :: Dynamics DIO () processEventsIncludingEarlierCore = processPendingEventsCore True -- | Process the events. processEvents :: EventProcessing -> Dynamics DIO () processEvents CurrentEvents = processEventsIncludingCurrent processEvents EarlierEvents = processEventsIncludingEarlier processEvents CurrentEventsOrFromPast = processEventsIncludingCurrentCore processEvents EarlierEventsOrFromPast = processEventsIncludingEarlierCore -- | Whether there is an overflow. isEventOverflow :: Event DIO Bool isEventOverflow = Event $ \p -> do let q = runEventQueue $ pointRun p n1 <- liftIOUnsafe $ logSize (queueLog q) n2 <- liftIOUnsafe $ outputMessageQueueSize (queueOutputMessages q) n3 <- liftIOUnsafe $ transientMessageQueueSize (queueTransientMessages q) ps <- dioParams let th1 = dioUndoableLogSizeThreshold ps th2 = dioOutputMessageQueueSizeThreshold ps th3 = dioTransientMessageQueueSizeThreshold ps if (n1 >= th1) || (n2 >= th2) || (n3 >= th3) then do logDIO NOTICE $ "t = " ++ (show $ pointTime p) ++ ": detected the event overflow" return True else return False -- | Whether the time horizon has exceeded. isTimeHorizonExceeded :: Event DIO Bool isTimeHorizonExceeded = Event $ \p -> do ps <- dioParams case dioTimeHorizon ps of Nothing -> return False Just th -> do let q = runEventQueue $ pointRun p gvt <- liftIOUnsafe $ readIORef (queueGlobalTime q) t <- liftIOUnsafe $ readIORef (queueTime q) if t - gvt > th then do logDIO INFO $ "t = " ++ (show $ pointTime p) ++ ": exceeded the time horizon" return True else return False -- | Throttle the message channel. throttleMessageChannel :: TimeWarp DIO () throttleMessageChannel = TimeWarp $ \p -> do -- invokeEvent p requestGlobalTime ch <- messageChannel dt <- fmap dioSyncTimeout dioParams liftIOUnsafe $ timeout dt $ awaitChannel ch invokeTimeWarp p $ processChannelMessages -- | Process the channel messages. processChannelMessages :: TimeWarp DIO () processChannelMessages = TimeWarp $ \p -> do ch <- messageChannel f <- liftIOUnsafe $ channelEmpty ch unless f $ do xs <- liftIOUnsafe $ readChannel ch forM_ xs $ \x -> do p' <- invokeEvent p currentEventPoint invokeTimeWarp p' $ processChannelMessage x p' <- invokeEvent p currentEventPoint f2 <- invokeEvent p' isEventOverflow if f2 then invokeTimeWarp p' throttleMessageChannel else do f3 <- invokeEvent p' isTimeHorizonExceeded if f3 then invokeTimeWarp p' throttleMessageChannel else return () -- | Process the channel message. processChannelMessage :: LogicalProcessMessage -> TimeWarp DIO () processChannelMessage x@(QueueMessage m) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- infind <- liftIOUnsafe $ readIORef (queueInFind q) deliverAcknowledgementMessage (acknowledgementMessage infind m) t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) p' <- invokeEvent p currentEventPoint if messageReceiveTime m < t0 then do f <- fmap dioAllowSkippingOutdatedMessage dioParams if f then invokeEvent p' logOutdatedMessage else error "Received the outdated message: processChannelMessage" else do ps <- dioParams when (dioProcessReconnectingEnabled ps) $ liftIOUnsafe $ enqueueAcknowledgementMessage (queueAcknowledgementMessages q) $ acknowledgementMessage infind m invokeTimeWarp p' $ enqueueMessage (queueInputMessages q) m processChannelMessage x@(QueueMessageBulk ms) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- infind <- liftIOUnsafe $ readIORef (queueInFind q) deliverAcknowledgementMessages $ map (acknowledgementMessage infind) ms t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) forM_ ms $ \m -> do p' <- invokeEvent p currentEventPoint if messageReceiveTime m < t0 then do f <- fmap dioAllowSkippingOutdatedMessage dioParams if f then invokeEvent p' logOutdatedMessage else error "Received the outdated message: processChannelMessage" else do ps <- dioParams when (dioProcessReconnectingEnabled ps) $ liftIOUnsafe $ enqueueAcknowledgementMessage (queueAcknowledgementMessages q) $ acknowledgementMessage infind m invokeTimeWarp p' $ enqueueMessage (queueInputMessages q) m processChannelMessage x@(AcknowledgementQueueMessage m) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- liftIOUnsafe $ processAcknowledgementMessage (queueTransientMessages q) m processChannelMessage x@(AcknowledgementQueueMessageBulk ms) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- liftIOUnsafe $ forM_ ms $ processAcknowledgementMessage (queueTransientMessages q) processChannelMessage x@ComputeLocalTimeMessage = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- liftIOUnsafe $ writeIORef (queueInFind q) True t' <- invokeEvent p getLocalTime sender <- messageInboxId receiver <- timeServerId sendLocalTimeDIO receiver sender t' processChannelMessage x@(GlobalTimeMessage globalTime) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- liftIOUnsafe $ do writeIORef (queueInFind q) False resetAcknowledgementMessageTime (queueTransientMessages q) invokeEvent p $ updateGlobalTime globalTime processChannelMessage x@(ProcessMonitorNotificationMessage y@(DP.ProcessMonitorNotification _ pid reason)) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ triggerSignal (queueProcessMonitorNotificationSource q) y processChannelMessage x@(ReconnectProcessMessage pid) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ reconnectProcess pid processChannelMessage x@(ProvideLogicalProcessStateMessage pid) = TimeWarp $ \p -> do --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ sendState pid processChannelMessage x@AbortSimulationMessage = TimeWarp $ \p -> do --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ throwEvent $ SimulationAbort "Aborted by the outer process." processChannelMessage x@(DisconnectProcessMessage pid) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ disconnectProcess pid -- | Return the local minimum time. getLocalTime :: Event DIO Double getLocalTime = Event $ \p -> do let q = runEventQueue $ pointRun p t1 <- liftIOUnsafe $ readIORef (queueTime q) t2 <- liftIOUnsafe $ transientMessageQueueTime (queueTransientMessages q) t3 <- liftIOUnsafe $ acknowledgementMessageTime (queueTransientMessages q) let t' = t1 `min` t2 `min` t3 --- --- n <- liftIOUnsafe $ transientMessageQueueSize (queueTransientMessages q) --- logDIO ERROR $ --- "t = " ++ show (pointTime p) ++ --- ": queue time = " ++ show t1 ++ --- ", unacknowledged time = " ++ show t2 ++ --- ", marked acknowledged time = " ++ show t3 ++ --- ", transient queue size = " ++ show n ++ --- " -> " ++ show t' --- return t' -- | Test whether the local time is ending. isLocalTimeEnding :: Event DIO Bool isLocalTimeEnding = Event $ \p -> do let q = runEventQueue $ pointRun p t0 = spcStopTime $ pointSpecs p t1 <- liftIOUnsafe $ readIORef (queueTime q) t2 <- liftIOUnsafe $ transientMessageQueueTime (queueTransientMessages q) t3 <- liftIOUnsafe $ acknowledgementMessageTime (queueTransientMessages q) return $ (t1 == t0) && (t2 > t0) && (t3 > t0) -- | Update the global time. updateGlobalTime :: Double -> Event DIO () updateGlobalTime t = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- invokeEvent p getLocalTime if t > t' then logDIO WARNING $ "t = " ++ show t' ++ ": Ignored the global time that is greater than the current local time" else do liftIOUnsafe $ writeIORef (queueGlobalTime q) t invokeEvent p $ reduceEvents t -- | Request for the global minimum time. requestGlobalTime :: Event DIO () requestGlobalTime = Event $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logRequestGlobalTime --- sender <- messageInboxId receiver <- timeServerId sendRequestGlobalTimeDIO receiver sender -- | Show the message. showMessage :: Message -> ShowS showMessage m = showString "{ " . showString "sendTime = " . shows (messageSendTime m) . showString ", receiveTime = " . shows (messageReceiveTime m) . (if messageAntiToggle m then showString ", antiToggle = True" else showString "") . showString " }" -- | Log the message at the specified time. logMessage :: LogicalProcessMessage -> Event DIO () logMessage (QueueMessage m) = Event $ \p -> logDIO INFO $ "t = " ++ (show $ pointTime p) ++ ": QueueMessage " ++ showMessage m [] logMessage (QueueMessageBulk ms) = Event $ \p -> logDIO INFO $ "t = " ++ (show $ pointTime p) ++ ": QueueMessageBulk [ " ++ let fs = foldl1 (\a b -> a . showString ", " . b) $ map showMessage ms in fs [] ++ " ]" logMessage m = Event $ \p -> logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ": " ++ show m -- | Log that the local time is to be synchronized. logSyncLocalTime :: Event DIO () logSyncLocalTime = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": synchronizing the local time..." -- | Log that the local time is to be synchronized in ring 0. logSyncLocalTime0 :: Event DIO () logSyncLocalTime0 = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": synchronizing the local time in ring 0..." -- | Log that the global time is requested. logRequestGlobalTime :: Event DIO () logRequestGlobalTime = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": requesting for a new global time..." -- | Log an evidence of the premature IO. logPrematureIO :: Event DIO () logPrematureIO = Event $ \p -> logDIO ERROR $ "t = " ++ (show $ pointTime p) ++ ": detected a premature IO action" -- | Log an evidence of receiving the outdated message. logOutdatedMessage :: Event DIO () logOutdatedMessage = Event $ \p -> logDIO WARNING $ "t = " ++ (show $ pointTime p) ++ ": skipping the outdated message" -- | Reduce events till the specified time. reduceEvents :: Double -> Event DIO () reduceEvents t = Event $ \p -> do let q = runEventQueue $ pointRun p ps <- dioParams liftIOUnsafe $ do reduceInputMessages (queueInputMessages q) t reduceOutputMessages (queueOutputMessages q) t reduceLog (queueLog q) t when (dioProcessReconnectingEnabled ps) $ reduceAcknowledgementMessages (queueAcknowledgementMessages q) t instance {-# OVERLAPPING #-} MonadIO (Event DIO) where liftIO m = Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return () if ok then liftIOUnsafe m else do f <- fmap dioAllowPrematureIO dioParams if f then do --- --- invokeEvent p $ logPrematureIO --- liftIOUnsafe m else error $ "Detected a premature IO action at t = " ++ (show $ pointTime p) ++ ": liftIO" -- | Synchronize the local time executing the specified computation. syncLocalTime :: Dynamics DIO () -> TimeWarp DIO () syncLocalTime m = TimeWarp $ \p -> do invokeDynamics p m f <- invokeEvent p isLocalTimeSynchronized unless f $ do --- --- invokeEvent p logSyncLocalTime --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> invokeTimeWarp p $ syncLocalTime m Nothing -> do -- invokeEvent p requestGlobalTime invokeTimeWarp p $ syncLocalTime0 m -- | Synchronize the local time executing the specified computation in ring 0. syncLocalTime0 :: Dynamics DIO () -> TimeWarp DIO () syncLocalTime0 m = TimeWarp $ \p -> do invokeDynamics p m f <- invokeEvent p isLocalTimeSynchronized unless f $ do --- --- invokeEvent p logSyncLocalTime0 --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> invokeTimeWarp p $ syncLocalTime m Nothing -> error "Exceeded the timeout when synchronizing the local time: syncLocalTime0" -- | Test whether the local time is synchronized. isLocalTimeSynchronized :: Event DIO Bool isLocalTimeSynchronized = Event $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) if t' > t then error "Inconsistent time: isLocalTimeSynchronized" else if (t == spcStartTime (pointSpecs p)) || (t' == t) then return True else do leaving <- liftIOUnsafe $ readIORef (queueIsLeaving q) if (leaving && (t == spcStopTime (pointSpecs p))) then invokeEvent p isLocalTimeEnding else return False -- | Run the computation and return a flag indicating whether there was no rollback. runTimeWarp :: TimeWarp DIO () -> Event DIO Bool runTimeWarp m = Event $ \p -> do let q = runEventQueue $ pointRun p v0 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q) invokeTimeWarp p m v2 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q) return (v0 == v2) -- | Synchronize the events. syncEvents :: EventProcessing -> Event DIO () syncEvents processing = Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ processEvents processing unless ok $ invokeEvent p $ syncEvents processing -- | 'DIO' is an instance of 'EventIOQueueing'. instance EventIOQueueing DIO where enqueueEventIO t h = enqueueEvent t $ Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return () when ok $ invokeEvent p h -- | Handle the 'Event' retry. handleEventRetry :: SimulationRetry -> Event DIO () handleEventRetry e = Event $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p --- logDIO INFO $ "t = " ++ show t ++ ": retrying the computations..." --- invokeTimeWarp p $ retryInputMessages (queueInputMessages q) let loop = do --- --- logDIO DEBUG $ --- "t = " ++ show t ++ --- ": waiting for arriving a message..." --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> loop Nothing -> loop0 loop0 = do --- --- logDIO DEBUG $ --- "t = " ++ show t ++ --- ": waiting for arriving a message in ring 0..." --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> loop Nothing -> error $ "Detected a deadlock when retrying the computations: handleEventRetry\n" ++ "--- the nested exception ---\n" ++ show e loop -- | Reconnect to the remote process. reconnectProcess :: DP.ProcessId -> Event DIO () reconnectProcess pid = Event $ \p -> do let q = runEventQueue $ pointRun p --- logDIO NOTICE $ "t = " ++ show (pointTime p) ++ ": reconnecting to " ++ show pid ++ "..." --- let ys = queueAcknowledgementMessages q ys' <- liftIOUnsafe $ filterAcknowledgementMessages (\x -> acknowledgementSenderId x == pid) ys unless (null ys') $ sendAcknowledgementMessagesDIO pid ys' xs <- liftIOUnsafe $ transientMessages (queueTransientMessages q) let xs' = filter (\x -> messageReceiverId x == pid) xs unless (null xs') $ sendMessagesDIO pid xs' -- | A signal triggered when coming the process monitor notification from the Cloud Haskell back-end. processMonitorSignal :: Signal DIO DP.ProcessMonitorNotification processMonitorSignal = Signal { handleSignal = \h -> Event $ \p -> let q = runEventQueue (pointRun p) s = publishSignal (queueProcessMonitorNotificationSource q) in invokeEvent p $ handleSignal s h } -- | Suspend the 'Event' computation until the specified computation is determined. -- -- The tested computation should depend on messages that come from other logical processes. -- Moreover, the event must be initiated through the event queue. expectEvent :: Event DIO (Maybe a) -> (a -> Event DIO ()) -> Event DIO () expectEvent m cont = Event $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p priority = pointPriority p --- logDIO INFO $ "t = " ++ show (pointTime p) ++ ": expecting the computation result: expectEvent" --- let loop = do --- --- logDIO DEBUG $ --- "t = " ++ show (pointTime p) ++ --- ": testing the predicate: expectEvent" --- x <- invokeEvent p m case x of Just a -> invokeEvent p $ cont a Nothing -> next loop0 loop0 = do --- --- logDIO DEBUG $ --- "t = " ++ show (pointTime p) ++ --- ": testing the predicate in ring 0: expectEvent" --- x <- invokeEvent p m case x of Just a -> invokeEvent p $ cont a Nothing -> next $ error "Detected a deadlock: expectEvent" next loop' = do pq <- invokeEvent p $ R.readRef $ queuePQ q let f = PQ.queueNull pq if f then await loop' else do let (t2, priority2, _) = PQ.queueFront pq if (t < t2) || (t == t2 && priority > priority2) then await loop' else invokeEvent p $ enqueueEvent t $ Event $ \p -> loop await loop' = do --- --- logDIO DEBUG $ --- "t = " ++ show t ++ --- ": waiting for arriving a message: expectEvent" --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> loop Nothing -> loop' loop -- | Send the simulation monitoring message about the current state of the logical process. sendState :: DP.ProcessId -> Event DIO () sendState pid = Event $ \p -> do let q = runEventQueue $ pointRun p pq <- invokeEvent p $ R.readRef (queuePQ q) n1 <- liftIOUnsafe $ logSize (queueLog q) n2 <- liftIOUnsafe $ inputMessageQueueSize (queueInputMessages q) n3 <- liftIOUnsafe $ outputMessageQueueSize (queueOutputMessages q) n4 <- liftIOUnsafe $ transientMessageQueueSize (queueTransientMessages q) n5 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q) let n6 = PQ.queueCount pq sc = pointSpecs p t0 = spcStartTime sc t2 = spcStopTime sc tq <- liftIOUnsafe $ readIORef (queueTime q) t' <- invokeEvent p getLocalTime ps <- dioParams let name = dioName ps inbox <- messageInboxId liftDistributedUnsafe $ DP.send pid $ LogicalProcessState { lpStateId = inbox, lpStateName = name, lpStateStartTime = t0, lpStateStopTime = t2, lpStateLocalTime = t', lpStateEventQueueTime = tq, lpStateEventQueueSize = n6, lpStateLogSize = n1, lpStateInputMessageCount = n2, lpStateOutputMessageCount = n3, lpStateTransientMessageCount = n4, lpStateRollbackCount = n5 } -- | Try to leave the simulation. -- -- It enqueues a new event with obligatory synchronization with the global virtual time. -- If that event will not be reverted during rollback, then the logical process will -- leave the simulation immediately after approaching the final modeling time without -- synchronization with the global virtual time anymore if it has no unacknowledged -- sent messages. -- -- It makes sense to use this action if logical processes can enter and leave your long-running -- distributed simulation by demand. -- leaveSimulation :: Event DIO () leaveSimulation = Event $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p invokeEvent p $ enqueueEventIO t $ Event $ \p -> liftIOUnsafe $ writeIORef (queueIsLeaving q) True -- | Disconnect to the remote process. disconnectProcess :: DP.ProcessId -> Event DIO () disconnectProcess pid = Event $ \p -> do let q = runEventQueue $ pointRun p --- logDIO NOTICE $ "t = " ++ show (pointTime p) ++ ": disconnecting from " ++ show pid ++ "..." --- ps <- dioParams when (dioProcessReconnectingEnabled ps) $ error "The logical process cannot be in the reconnecting state: disconnectProcess" liftIOUnsafe $ dequeueTransientMessages (queueTransientMessages q) pid