{-# LANGUAGE TypeFamilies, FlexibleInstances #-} -- | -- Module : Simulation.Aivika.Distributed.Optimistic.Internal.Event -- Copyright : Copyright (c) 2015-2016, David Sorokin -- License : BSD3 -- Maintainer : David Sorokin -- Stability : experimental -- Tested with: GHC 7.10.3 -- -- The module defines an event queue. -- module Simulation.Aivika.Distributed.Optimistic.Internal.Event (queueInputMessages, queueOutputMessages, queueLog, syncEvent) where import Data.Maybe import Data.IORef import Data.Typeable import Data.Time.Clock import System.Timeout import Control.Monad import Control.Monad.Trans import Control.Exception import qualified Control.Distributed.Process as DP import qualified Simulation.Aivika.PriorityQueue.Pure as PQ import Simulation.Aivika.Trans import Simulation.Aivika.Trans.Internal.Types import Simulation.Aivika.Distributed.Optimistic.Internal.Priority import Simulation.Aivika.Distributed.Optimistic.Internal.Channel import Simulation.Aivika.Distributed.Optimistic.Internal.DIO import Simulation.Aivika.Distributed.Optimistic.Internal.IO import Simulation.Aivika.Distributed.Optimistic.Internal.Message import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer import Simulation.Aivika.Distributed.Optimistic.Internal.TimeWarp import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue import {-# SOURCE #-} Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue import Simulation.Aivika.Distributed.Optimistic.Internal.UndoableLog import {-# SOURCE #-} qualified Simulation.Aivika.Distributed.Optimistic.Internal.Ref as R -- | Convert microseconds to seconds. microsecondsToSeconds :: Int -> Rational microsecondsToSeconds x = (fromInteger $ toInteger x) / 1000000 -- | An implementation of the 'EventQueueing' type class. instance EventQueueing DIO where -- | The event queue type. data EventQueue DIO = EventQueue { queueInputMessages :: InputMessageQueue, -- ^ the input message queue queueOutputMessages :: OutputMessageQueue, -- ^ the output message queue queueLog :: UndoableLog, -- ^ the undoable log of operations queuePQ :: R.Ref (PQ.PriorityQueue (Point DIO -> DIO ())), -- ^ the underlying priority queue queueBusy :: IORef Bool, -- ^ whether the queue is currently processing events queueTime :: IORef Double, -- ^ the actual time of the event queue queueGlobalTime :: IORef Double, -- ^ the global time queueLocalTime :: IORef Double, -- ^ the long-term queue local time queueLocalTime0 :: IORef Double, -- ^ the short-term queue local time queueLocalTimeTimestamp :: IORef UTCTime, -- ^ the queue local time timestamp queueLocalTimeInterval :: NominalDiffTime -- ^ the queue local time interval for updating } newEventQueue specs = do f <- liftIOUnsafe $ newIORef False t <- liftIOUnsafe $ newIORef $ spcStartTime specs gt <- liftIOUnsafe $ newIORef $ spcStartTime specs pq <- R.newRef0 PQ.emptyQueue log <- newUndoableLog output <- newOutputMessageQueue input <- newInputMessageQueue log rollbackEventPre rollbackEventPost rollbackEventTime loct <- liftIOUnsafe $ newIORef $ spcStartTime specs loct0 <- liftIOUnsafe $ newIORef $ spcStartTime specs loctstamp <- liftIOUnsafe $ getCurrentTime >>= newIORef locdt <- fmap (fromRational . microsecondsToSeconds . dioSyncTimeout) dioParams return EventQueue { queueInputMessages = input, queueOutputMessages = output, queueLog = log, queuePQ = pq, queueBusy = f, queueTime = t, queueGlobalTime = gt, queueLocalTime = loct, queueLocalTime0 = loct0, queueLocalTimeTimestamp = loctstamp, queueLocalTimeInterval = locdt } enqueueEvent t (Event m) = Event $ \p -> let pq = queuePQ $ runEventQueue $ pointRun p in invokeEvent p $ R.modifyRef pq $ \x -> PQ.enqueue x t m runEventWith processing (Event e) = Dynamics $ \p -> do p0 <- invokeEvent p currentEventPoint invokeEvent p0 $ enqueueEvent (pointTime p) (return ()) invokeEvent p $ syncEvents processing e p eventQueueCount = Event $ \p -> let pq = queuePQ $ runEventQueue $ pointRun p in invokeEvent p $ fmap PQ.queueCount $ R.readRef pq -- | The first stage of rolling the changes back. rollbackEventPre :: Bool -> TimeWarp DIO () rollbackEventPre including = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p rollbackLog (queueLog q) (pointTime p) including -- | The post stage of rolling the changes back. rollbackEventPost :: Bool -> TimeWarp DIO () rollbackEventPost including = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p rollbackOutputMessages (queueOutputMessages q) (pointTime p) including -- | Rollback the event time. rollbackEventTime :: TimeWarp DIO () rollbackEventTime = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p --- --- logDIO DEBUG $ --- "Setting the queue time = " ++ show t --- liftIOUnsafe $ do writeIORef (queueTime q) t modifyIORef' (queueLocalTime q) (min t) modifyIORef' (queueLocalTime0 q) (min t) t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) when (t0 > t) $ do --- --- logDIO DEBUG $ --- "Setting the global time = " ++ show t --- liftIOUnsafe $ writeIORef (queueGlobalTime q) t invokeEvent p sendLocalTime -- | Return the current event time. currentEventTime :: Event DIO Double {-# INLINE currentEventTime #-} currentEventTime = Event $ \p -> do let q = runEventQueue $ pointRun p liftIOUnsafe $ readIORef (queueTime q) -- | Return the current event point. currentEventPoint :: Event DIO (Point DIO) {-# INLINE currentEventPoint #-} currentEventPoint = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueTime q) if t' == pointTime p then return p else let sc = pointSpecs p t0 = spcStartTime sc dt = spcDT sc n' = fromIntegral $ floor ((t' - t0) / dt) in return p { pointTime = t', pointIteration = n', pointPhase = -1 } -- | Process the pending events. processPendingEventsCore :: Bool -> Dynamics DIO () processPendingEventsCore includingCurrentEvents = Dynamics r where r p = do let q = runEventQueue $ pointRun p f = queueBusy q f' <- liftIOUnsafe $ readIORef f if f' then error $ "Detected an event loop, which may indicate to " ++ "a logical error in the model: processPendingEventsCore" else do liftIOUnsafe $ writeIORef f True call q p p liftIOUnsafe $ writeIORef f False call q p p0 = do let pq = queuePQ q r = pointRun p -- process external messages p1 <- invokeEvent p0 currentEventPoint ok <- invokeEvent p1 $ runTimeWarp processChannelMessages if not ok then call q p p1 else do -- proceed with processing the events f <- invokeEvent p1 $ fmap PQ.queueNull $ R.readRef pq unless f $ do (t2, c2) <- invokeEvent p1 $ fmap PQ.queueFront $ R.readRef pq let t = queueTime q t' <- liftIOUnsafe $ readIORef t when (t2 < t') $ -- error "The time value is too small: processPendingEventsCore" error $ "The time value is too small (" ++ show t2 ++ " < " ++ show t' ++ "): processPendingEventsCore" when ((t2 < pointTime p) || (includingCurrentEvents && (t2 == pointTime p))) $ do let sc = pointSpecs p t0 = spcStartTime sc dt = spcDT sc n2 = fromIntegral $ floor ((t2 - t0) / dt) p2 = p { pointTime = t2, pointIteration = n2, pointPhase = -1 } --- --- ps <- dioParams --- when (dioLoggingPriority ps <= DEBUG) $ --- invokeEvent p2 $ --- writeLog (queueLog q) $ --- logDIO DEBUG $ --- "Reverting the queue time " ++ show t2 ++ " --> " ++ show t' --- liftIOUnsafe $ writeIORef t t2 invokeEvent p2 $ R.modifyRef pq PQ.dequeue catchComp (c2 p2) (\e@(SimulationRetry _) -> invokeEvent p2 $ handleEventRetry e) call q p p2 -- | Process the pending events synchronously, i.e. without past. processPendingEvents :: Bool -> Dynamics DIO () processPendingEvents includingCurrentEvents = Dynamics r where r p = do let q = runEventQueue $ pointRun p t = queueTime q t' <- liftIOUnsafe $ readIORef t if pointTime p < t' then error $ "The current time is less than " ++ "the time in the queue: processPendingEvents" else invokeDynamics p m m = processPendingEventsCore includingCurrentEvents -- | A memoized value. processEventsIncludingCurrent :: Dynamics DIO () processEventsIncludingCurrent = processPendingEvents True -- | A memoized value. processEventsIncludingEarlier :: Dynamics DIO () processEventsIncludingEarlier = processPendingEvents False -- | A memoized value. processEventsIncludingCurrentCore :: Dynamics DIO () processEventsIncludingCurrentCore = processPendingEventsCore True -- | A memoized value. processEventsIncludingEarlierCore :: Dynamics DIO () processEventsIncludingEarlierCore = processPendingEventsCore True -- | Process the events. processEvents :: EventProcessing -> Dynamics DIO () processEvents CurrentEvents = processEventsIncludingCurrent processEvents EarlierEvents = processEventsIncludingEarlier processEvents CurrentEventsOrFromPast = processEventsIncludingCurrentCore processEvents EarlierEventsOrFromPast = processEventsIncludingEarlierCore -- | Whether there is an overflow. isEventOverflow :: Event DIO Bool isEventOverflow = Event $ \p -> do let q = runEventQueue $ pointRun p n1 <- liftIOUnsafe $ logSize (queueLog q) n2 <- liftIOUnsafe $ outputMessageQueueSize (queueOutputMessages q) ps <- dioParams let th1 = dioUndoableLogSizeThreshold ps th2 = dioOutputMessageQueueSizeThreshold ps if (n1 >= th1) || (n2 >= th2) then do logDIO NOTICE $ "t = " ++ (show $ pointTime p) ++ ": detected the event overflow" return True else return False -- | Throttle the message channel. throttleMessageChannel :: TimeWarp DIO () throttleMessageChannel = TimeWarp $ \p -> do invokeEvent p updateLocalTime invokeEvent p sendLocalTime ch <- messageChannel dt <- fmap dioSyncTimeout dioParams liftIOUnsafe $ timeout dt $ awaitChannel ch invokeTimeWarp p $ processChannelMessages -- | Process the channel messages. processChannelMessages :: TimeWarp DIO () processChannelMessages = TimeWarp $ \p -> do ch <- messageChannel f <- liftIOUnsafe $ channelEmpty ch unless f $ do xs <- liftIOUnsafe $ readChannel ch forM_ xs $ \x -> do p' <- invokeEvent p currentEventPoint invokeTimeWarp p' $ processChannelMessage x p' <- invokeEvent p currentEventPoint f2 <- invokeEvent p' isEventOverflow when f2 $ invokeTimeWarp p' throttleMessageChannel -- | Process the channel message. processChannelMessage :: LocalProcessMessage -> TimeWarp DIO () processChannelMessage x@(QueueMessage m) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) when (messageReceiveTime m < t0) $ do f <- fmap dioAllowProcessingOutdatedMessage dioParams if f then invokeEvent p logOutdatedMessage else error "Received the outdated message: processChannelMessage" invokeTimeWarp p $ enqueueMessage (queueInputMessages q) m processChannelMessage x@(QueueMessageBulk ms) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- t0 <- liftIOUnsafe $ readIORef (queueGlobalTime q) forM_ ms $ \m -> do when (messageReceiveTime m < t0) $ do f <- fmap dioAllowProcessingOutdatedMessage dioParams if f then invokeEvent p logOutdatedMessage else error "Received the outdated message: processChannelMessage" invokeTimeWarp p $ enqueueMessage (queueInputMessages q) m processChannelMessage x@(GlobalTimeMessage globalTime) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- case globalTime of Nothing -> return () Just t0 -> invokeEvent p $ updateGlobalTime t0 invokeEvent p updateLocalTime t <- invokeEvent p getLocalTime sender <- messageInboxId receiver <- timeServerId liftDistributedUnsafe $ DP.send receiver (GlobalTimeMessageResp sender t) processChannelMessage x@(LocalTimeMessageResp globalTime) = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p --- --- invokeEvent p $ --- logMessage x --- invokeEvent p $ updateGlobalTime globalTime processChannelMessage x@TerminateLocalProcessMessage = TimeWarp $ \p -> do --- --- invokeEvent p $ --- logMessage x --- liftDistributedUnsafe $ DP.terminate -- | Update the global time. updateGlobalTime :: Double -> Event DIO () updateGlobalTime t = Event $ \p -> do let q = runEventQueue $ pointRun p invokeEvent p updateLocalTime t' <- invokeEvent p getLocalTime if t > t' then logDIO WARNING $ "t = " ++ show t' ++ ": Ignored the global time that is greater than the current local time" else do liftIOUnsafe $ writeIORef (queueGlobalTime q) t invokeEvent p $ reduceEvents t -- | Show the message. showMessage :: Message -> ShowS showMessage m = showString "{ " . showString "sendTime = " . shows (messageSendTime m) . showString ", receiveTime = " . shows (messageReceiveTime m) . (if messageAntiToggle m then showString ", antiToggle = True" else showString "") . showString " }" -- | Log the message at the specified time. logMessage :: LocalProcessMessage -> Event DIO () logMessage (QueueMessage m) = Event $ \p -> logDIO INFO $ "t = " ++ (show $ pointTime p) ++ ": QueueMessage " ++ showMessage m [] logMessage (QueueMessageBulk ms) = Event $ \p -> logDIO INFO $ "t = " ++ (show $ pointTime p) ++ ": QueueMessageBulk [ " ++ let fs = foldl1 (\a b -> a . showString ", " . b) $ map showMessage ms in fs [] ++ " ]" logMessage m = Event $ \p -> logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ": " ++ show m -- | Log that the local time is to be synchronized. logSyncLocalTime :: Event DIO () logSyncLocalTime = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": synchronizing the local time..." -- | Log that the local time is to be synchronized in ring 0. logSyncLocalTime0 :: Event DIO () logSyncLocalTime0 = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": synchronizing the local time in ring 0..." -- | Log that the local time is sent to the time server. logSendLocalTime :: Event DIO () logSendLocalTime = Event $ \p -> do let q = runEventQueue $ pointRun p t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ", global t = " ++ (show t') ++ ": sending the local time to the time server after delay..." -- | Log an evidence of the premature IO. logPrematureIO :: Event DIO () logPrematureIO = Event $ \p -> logDIO ERROR $ "t = " ++ (show $ pointTime p) ++ ": detected a premature IO action" -- | Log an evidence of receiving the outdated message. logOutdatedMessage :: Event DIO () logOutdatedMessage = Event $ \p -> logDIO ERROR $ "t = " ++ (show $ pointTime p) ++ ": received the outdated message" -- | Reduce events till the specified time. reduceEvents :: Double -> Event DIO () reduceEvents t = Event $ \p -> do let q = runEventQueue $ pointRun p liftIOUnsafe $ do reduceInputMessages (queueInputMessages q) t reduceOutputMessages (queueOutputMessages q) t reduceLog (queueLog q) t instance {-# OVERLAPPING #-} MonadIO (Event DIO) where liftIO m = Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return () if ok then liftIOUnsafe m else do f <- fmap dioAllowPrematureIO dioParams if f then do --- --- invokeEvent p $ logPrematureIO --- liftIOUnsafe m else error $ "Detected a premature IO action at t = " ++ (show $ pointTime p) ++ ": liftIO" -- | Update the local time. updateLocalTime :: Event DIO Bool updateLocalTime = Event $ \p -> do let q = runEventQueue $ pointRun p timestamp0 <- liftIOUnsafe $ readIORef (queueLocalTimeTimestamp q) timestamp <- liftIOUnsafe getCurrentTime let dt = queueLocalTimeInterval q if timestamp >= addUTCTime dt timestamp0 then do --- --- logDIO DEBUG $ "t = " ++ (show $ pointTime p) ++ ": updating the local time" --- liftIOUnsafe $ do t <- readIORef (queueTime q) loct0 <- readIORef (queueLocalTime0 q) writeIORef (queueLocalTime q) (min t loct0) writeIORef (queueLocalTime0 q) t writeIORef (queueLocalTimeTimestamp q) timestamp return (t /= loct0) else return False -- | Get the local time. getLocalTime :: Event DIO Double getLocalTime = Event $ \p -> let q = runEventQueue $ pointRun p in liftIOUnsafe $ readIORef (queueLocalTime q) -- | Send the local time to the time server. sendLocalTime :: Event DIO () sendLocalTime = Event $ \p -> do --- --- invokeEvent p logSendLocalTime --- invokeEvent p updateLocalTime t <- invokeEvent p getLocalTime sender <- messageInboxId receiver <- timeServerId liftDistributedUnsafe $ DP.send receiver (LocalTimeMessage sender t) -- | Synchronize the local time executing the specified computation. syncLocalTime :: Dynamics DIO () -> TimeWarp DIO () syncLocalTime m = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p invokeDynamics p m t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) if t' > t then error "Inconsistent time: syncLocalTime" else if t == spcStartTime (pointSpecs p) then return () else do --- --- invokeEvent p logSyncLocalTime --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages if ok then do case f of Just _ -> invokeTimeWarp p $ syncLocalTime m Nothing -> do f <- invokeEvent p updateLocalTime invokeEvent p sendLocalTime if f then invokeTimeWarp p $ syncLocalTime m else invokeTimeWarp p $ syncLocalTime0 m else return () -- | Synchronize the local time executing the specified computation in ring 0. syncLocalTime0 :: Dynamics DIO () -> TimeWarp DIO () syncLocalTime0 m = TimeWarp $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p invokeDynamics p m t' <- liftIOUnsafe $ readIORef (queueGlobalTime q) if t' > t then error "Inconsistent time: syncLocalTime0" else if t' == pointTime p then return () else do --- --- invokeEvent p logSyncLocalTime0 --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages if ok then do case f of Just _ -> invokeTimeWarp p $ syncLocalTime m Nothing -> error "Detected a deadlock when synchronizing the local time: syncLocalTime0" else return () -- | Run the computation and return a flag indicating whether there was no rollback. runTimeWarp :: TimeWarp DIO () -> Event DIO Bool runTimeWarp m = Event $ \p -> do let q = runEventQueue $ pointRun p v0 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q) invokeTimeWarp p m v2 <- liftIOUnsafe $ inputMessageQueueVersion (queueInputMessages q) return (v0 == v2) -- | Synchronize the events. syncEvents :: EventProcessing -> Event DIO () syncEvents processing = Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ processEvents processing unless ok $ invokeEvent p $ syncEvents processing -- | Synchronize the simulation in all nodes and call -- the specified computation at the given modeling time. -- -- It is rather safe to put 'liftIO' within this function. syncEvent :: Double -> Event DIO () -> Event DIO () syncEvent t h = enqueueEvent t $ Event $ \p -> do ok <- invokeEvent p $ runTimeWarp $ syncLocalTime $ return () when ok $ invokeEvent p h -- | Handle the 'Event' retry. handleEventRetry :: SimulationRetry -> Event DIO () handleEventRetry e = Event $ \p -> do let q = runEventQueue $ pointRun p t = pointTime p --- logDIO NOTICE $ "t = " ++ show t ++ ": retrying the computations..." --- invokeTimeWarp p $ retryInputMessages (queueInputMessages q) let loop = do --- --- logDIO DEBUG $ --- "t = " ++ show t ++ --- ": waiting for arriving a message..." --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> loop Nothing -> loop0 loop0 = do --- --- logDIO DEBUG $ --- "t = " ++ show t ++ --- ": waiting for arriving a message in ring 0..." --- ch <- messageChannel dt <- fmap dioSyncTimeout dioParams f <- liftIOUnsafe $ timeout dt $ awaitChannel ch ok <- invokeEvent p $ runTimeWarp processChannelMessages when ok $ case f of Just _ -> loop Nothing -> error $ "Detected a deadlock when retrying the computations: handleEventRetry\n" ++ "--- the nested exception ---\n" ++ show e loop