-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue
-- Copyright  : Copyright (c) 2015-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines an input message queue.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue
       (InputMessageQueue,
        newInputMessageQueue,
        inputMessageQueueSize,
        inputMessageQueueVersion,
        enqueueMessage,
        messageEnqueued,
        retryInputMessages,
        reduceInputMessages,
        filterInputMessages) where

import Data.Maybe
import Data.List
import Data.IORef

import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP

import Simulation.Aivika.Vector
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Internal.Types

import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.UndoableLog
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeWarp
import Simulation.Aivika.Distributed.Optimistic.DIO

-- | Specifies the input message queue.
--
-- The record bundles the undo log, the rollback callbacks supplied by the
-- caller, the signal source used to publish messages, the mutable vector of
-- queued items, the list of pending actions and the reversion counter.
data InputMessageQueue =
  InputMessageQueue { inputMessageLog :: UndoableLog,
                      -- ^ The Redo/Undo log.
                      inputMessageRollbackPre :: Bool -> TimeWarp DIO (),
                      -- ^ Rollback the operations till the specified time before actual changes either including the time or not.
                      inputMessageRollbackPost :: Bool -> TimeWarp DIO (),
                      -- ^ Rollback the operations till the specified time after actual changes either including the time or not.
                      inputMessageRollbackTime :: TimeWarp DIO (),
                      -- ^ Rollback the event time.
                      inputMessageSource :: SignalSource DIO Message,
                      -- ^ The message source.
                      inputMessages :: Vector InputMessageQueueItem,
                      -- ^ The input messages.
                      inputMessageActions :: IORef [Event DIO ()],
                      -- ^ The list of actions to perform.
                      inputMessageVersionRef :: IORef Int
                      -- ^ The number of reversions.
                    }

-- | Specifies the input message queue item.
--
-- Each item pairs the stored message with two mutable flags.
data InputMessageQueueItem =
  InputMessageQueueItem { itemMessage :: Message,
                          -- ^ The item message.
                          itemAnnihilated :: IORef Bool,
                          -- ^ Whether the item was annihilated.
                          itemProcessed :: IORef Bool
                          -- ^ Whether the item was processed.
                        }

-- | Create a new input message queue.
--
-- The queue is built from a freshly created message vector, an empty list
-- of pending actions, a new signal source and a reversion counter of 0.
-- The log and the three rollback callbacks are stored as given.
newInputMessageQueue :: UndoableLog
                        -- ^ the Redo/Undo log
                        -> (Bool -> TimeWarp DIO ())
                        -- ^ rollback operations till the specified time before actual changes either including the time or not
                        -> (Bool -> TimeWarp DIO ())
                        -- ^ rollback operations till the specified time after actual changes either including the time or not
                        -> TimeWarp DIO ()
                        -- ^ rollback the event time
                        -> DIO InputMessageQueue
newInputMessageQueue undoLog rollbackPre rollbackPost rollbackTime =
  -- the first parameter is not called @log@, to avoid shadowing Prelude's 'log'
  do ms <- liftIOUnsafe newVector
     r  <- liftIOUnsafe $ newIORef []
     s  <- newSignalSource0
     v  <- liftIOUnsafe $ newIORef 0
     return InputMessageQueue { inputMessageLog = undoLog,
                                inputMessageRollbackPre = rollbackPre,
                                inputMessageRollbackPost = rollbackPost,
                                inputMessageRollbackTime = rollbackTime,
                                inputMessageSource = s,
                                inputMessages = ms,
                                inputMessageActions = r,
                                inputMessageVersionRef = v }

-- | Return the input message queue size, i.e. the element count of the
-- underlying message vector.
inputMessageQueueSize :: InputMessageQueue -> IO Int
{-# INLINE inputMessageQueueSize #-}
inputMessageQueueSize = vectorCount . inputMessages

-- | Return the reversion count, read from the queue's version reference.
inputMessageQueueVersion :: InputMessageQueue -> IO Int
{-# INLINE inputMessageQueueVersion #-}
inputMessageQueueVersion = readIORef . inputMessageVersionRef

-- | Return a complement: @complement x == -x - 1@.
--
-- The mapping is its own inverse and sends every non-negative index to a
-- negative number, which lets a single 'Int' carry either a found index
-- (non-negative) or an encoded insertion index (negative).
complement :: Int -> Int
complement x = - x - 1

-- | Raised when the message is enqueued.
--
-- This is the signal published from the queue's message source.
messageEnqueued :: InputMessageQueue -> Signal DIO Message
messageEnqueued q = publishSignal (inputMessageSource q)

-- | Enqueue a new message ignoring the duplicated messages.
--
-- The outcome of 'findAntiMessage' selects one of three cases:
--
-- * @Nothing@: the message is a duplicate; it is only logged and skipped.
--
-- * @Just i@ with @i >= 0@: the matching anti-message sits at index @i@.
--   If that item was already processed, the queue is rolled back to the
--   receive time (including it) and the annihilation is performed inside
--   the rollback; otherwise the item is annihilated right away.
--
-- * @Just i@ with @i < 0@: @complement i@ is the insertion index. If the
--   receive time precedes the current time, the insertion and activation
--   happen inside a rollback (not including the receive time); otherwise
--   the message is inserted and activated at the current point.
--
-- The reversion counter is incremented whenever the receive time is not
-- later than the current time in the annihilation case, and whenever it is
-- strictly earlier in the insertion case.
enqueueMessage :: InputMessageQueue -> Message -> TimeWarp DIO ()
enqueueMessage q m =
  TimeWarp $ \p ->
  do let t  = messageReceiveTime m
         t0 = pointTime p
         -- increment the reversion counter
         bumpVersion :: DIO ()
         bumpVersion =
           liftIOUnsafe $ modifyIORef' (inputMessageVersionRef q) (+ 1)
         -- log the rollback, run the action inside a rollback to the
         -- receive time, then roll back the event time
         rollbackWith :: Bool -> Event DIO () -> DIO ()
         rollbackWith including action =
           do let p' = pastPoint t p
              logRollbackInputMessages t0 t including
              invokeTimeWarp p' $
                rollbackInputMessages q including action
              invokeTimeWarp p' $
                inputMessageRollbackTime q
     found <- liftIOUnsafe $ findAntiMessage q m
     case found of
       Nothing ->
         -- skip the message duplicate
         logSkipInputMessage t0
       Just i
         | i >= 0 ->
           do -- found the anti-message at the specified index
              when (t <= t0) bumpVersion
              item <- liftIOUnsafe $ readVector (inputMessages q) i
              f <- liftIOUnsafe $ readIORef (itemProcessed item)
              if f
                then rollbackWith True $
                     liftIOUnsafe $ annihilateMessage q m i
                else liftIOUnsafe $ annihilateMessage q m i
         | otherwise ->
           do -- insert the message at the specified right index
              let j = complement i
                  insertAndActivate :: Event DIO ()
                  insertAndActivate =
                    Event $ \pe ->
                    do liftIOUnsafe $ insertMessage q m j
                       invokeEvent pe $ activateMessage q j
              if t < t0
                then do bumpVersion
                        rollbackWith False insertAndActivate
                else invokeEvent p insertAndActivate

-- | Log the message skip at the NOTICE priority, reporting the given time.
logSkipInputMessage :: Double -> DIO ()
logSkipInputMessage t0 =
  logDIO NOTICE $
  "Skip the message at t = " ++ show t0

-- | Log the rollback at the INFO priority.
--
-- The message shows the first time argument, an arrow and the second time
-- argument, followed by a \"not including\" suffix when the flag is 'False'.
logRollbackInputMessages :: Double -> Double -> Bool -> DIO ()
logRollbackInputMessages t0 t including =
  logDIO INFO $
  "Rollback at t = " ++ show t0 ++ " --> " ++ show t ++ suffix
  where suffix
          | including = ""
          | otherwise = " not including"

-- | Retry the computations.
--
-- Increments the reversion counter, performs an inclusive rollback at the
-- current point with an empty action, and then rolls back the event time.
retryInputMessages :: InputMessageQueue -> TimeWarp DIO ()
retryInputMessages q =
  TimeWarp $ \p ->
  do liftIOUnsafe $
       modifyIORef' (inputMessageVersionRef q) (+ 1)
     invokeTimeWarp p $
       rollbackInputMessages q True $
       return ()
     invokeTimeWarp p $
       inputMessageRollbackTime q

-- | Rollback the input messages till the specified time, either including the time or not, and apply the given computation.
--
-- The steps run in this order at the supplied point: check that no message
-- actions are pending, invoke the pre-rollback callback, run the given
-- computation, perform the message actions accumulated so far, and finally
-- invoke the post-rollback callback.
rollbackInputMessages :: InputMessageQueue -> Bool -> Event DIO () -> TimeWarp DIO ()
rollbackInputMessages q including m =
  TimeWarp $ \p ->
  do liftIOUnsafe $
       requireEmptyMessageActions q
     invokeTimeWarp p $
       inputMessageRollbackPre q including
     invokeEvent p m
     invokeEvent p $
       performMessageActions q
     invokeTimeWarp p $
       inputMessageRollbackPost q including

-- | Return the point in the past.
--
-- The result is the given point with its time replaced by @t@, its
-- iteration set to @floor ((t - startTime) / dt)@ taken from the point's
-- specs, and its phase set to @-1@.
pastPoint :: Double -> Point DIO -> Point DIO
pastPoint t p = p'
  where sc = pointSpecs p
        t0 = spcStartTime sc
        dt = spcDT sc
        -- the intermediate Integer is spelled out instead of relying on
        -- type defaulting
        n :: Int
        n  = fromIntegral (floor ((t - t0) / dt) :: Integer)
        p' = p { pointTime = t,
                 pointIteration = n,
                 pointPhase = -1 }

-- | Require that there are no message actions.
--
-- Raises an error if the queue's list of pending actions is not empty.
requireEmptyMessageActions :: InputMessageQueue -> IO ()
requireEmptyMessageActions q =
  do xs <- readIORef (inputMessageActions q)
     unless (null xs) $
       error "There are incomplete message actions: requireEmptyMessageActions"

-- | Perform the message actions.
--
-- The pending list is read and reset to empty before the actions run, so
-- actions registered while they execute end up in a fresh list. The actions
-- run in the order in which they are stored in the list.
performMessageActions :: InputMessageQueue -> Event DIO ()
performMessageActions q =
  do xs <- liftIOUnsafe $ readIORef (inputMessageActions q)
     liftIOUnsafe $ writeIORef (inputMessageActions q) []
     sequence_ xs

-- | Return the leftmost index for the current time.
--
-- Starting from index @i@, the scan walks left while the preceding item has
-- a receive time equal to @t@, and returns the first index whose
-- predecessor is strictly earlier (or 0 when the start of the vector is
-- reached). A predecessor with a later receive time is reported as an error.
leftMessageIndex :: InputMessageQueue -> Double -> Int -> IO Int
leftMessageIndex q t i
  | i == 0    = return 0
  | otherwise =
    do let i' = i - 1
       item' <- readVector (inputMessages q) i'
       step i' (messageReceiveTime (itemMessage item'))
  where
    -- decide by comparing the predecessor's receive time with @t@
    step i' t'
      | t' > t    = error "Incorrect index: leftMessageIndex"
      | t' < t    = return i
      | otherwise = leftMessageIndex q t i'

-- | Find an anti-message and return the index; otherwise, return a complement to
-- the insertion index with the current receive time. The result is 'Nothing' if
-- the message is duplicated.
--
-- The search starts from the index returned by 'lookupRightMessageIndex' for
-- the message's receive time. A negative lookup result is returned as is.
-- Otherwise the items are scanned leftwards while their receive time equals
-- that of the message; if no anti-message or duplicate is met, the result
-- is the complement of the position just after the starting index.
findAntiMessage :: InputMessageQueue -> Message -> IO (Maybe Int)
findAntiMessage q m =
  do let t = messageReceiveTime m
     right <- lookupRightMessageIndex q t
     let -- the encoded insertion index, computed once for all loop exits
         insertion :: Maybe Int
         insertion = Just $ complement (right + 1)
         loop :: Int -> IO (Maybe Int)
         loop i
           | i < 0     = return insertion
           | otherwise =
             do item <- readVector (inputMessages q) i
                let m' = itemMessage item
                decide i m' (messageReceiveTime m')
         decide :: Int -> Message -> Double -> IO (Maybe Int)
         decide i m' t'
           | t' > t            = error "Incorrect index: findAntiMessage"
           | t' < t            = return insertion
           | antiMessages m m' = return (Just i)
           | m == m'           = return Nothing
           | otherwise         = loop (i - 1)
     if right < 0
       then return (Just right)
       else loop right

-- | Annihilate a message at the specified index.
--
-- Raises an error if the stored item is not an anti-message of the given
-- message, or if the stored item is already marked as processed. Otherwise
-- the item is removed from the vector and flagged as annihilated.
annihilateMessage :: InputMessageQueue -> Message -> Int -> IO ()
annihilateMessage q m i =
  do item <- readVector (inputMessages q) i
     let m' = itemMessage item
     unless (antiMessages m m') $
       error "Cannot annihilate another message: annihilateMessage"
     f <- readIORef (itemProcessed item)
     when f $
       error "Cannot annihilate the processed message: annihilateMessage"
     vectorDeleteAt (inputMessages q) i
     writeIORef (itemAnnihilated item) True

-- | Activate a message at the specified index.
--
-- The queue item at index @i@ is read once. Unless the item is already
-- flagged as annihilated, an event is enqueued at the receive time of
-- its message. When that event fires, and the item is still not
-- annihilated, the item is marked as processed and the message is
-- passed to the input signal source, except when the message has its
-- anti-toggle set.
activateMessage :: InputMessageQueue -> Int -> Event DIO ()
activateMessage q i =
  do item <- liftIOUnsafe $ readVector (inputMessages q) i
     let msg :: Message
         msg = itemMessage item

         -- Current value of the annihilation flag of the item.
         annihilated :: Event DIO Bool
         annihilated = liftIOUnsafe $ readIORef (itemAnnihilated item)

         -- Overwrite the processed flag of the item.
         markProcessed :: Bool -> IO ()
         markProcessed = writeIORef (itemProcessed item)

         -- Body of the event enqueued at the receive time of the message.
         deliver :: Event DIO ()
         deliver =
           do gone <- annihilated
              unless gone $
                do -- Log the action that resets the processed flag
                   -- (presumably executed when the log is undone on
                   -- rollback; confirm against UndoableLog).
                   writeLog (inputMessageLog q) $
                     liftIOUnsafe $ markProcessed False
                   liftIOUnsafe $ markProcessed True
                   unless (messageAntiToggle msg) $
                     triggerSignal (inputMessageSource q) msg

         -- Enqueue the delivery. The logged action prepends this very
         -- computation to the list of pending input message actions
         -- (presumably so that it is re-run after a rollback; confirm
         -- against the consumer of inputMessageActions).
         schedule :: Event DIO ()
         schedule =
           do gone <- annihilated
              unless gone $
                do writeLog (inputMessageLog q) $
                     liftIOUnsafe $
                     modifyIORef (inputMessageActions q) (schedule :)
                   enqueueEvent (messageReceiveTime msg) deliver
     schedule

-- | Insert a new message.
--
-- The message @m@ is inserted at index @i@ of the queue vector. Before
-- inserting, the neighbours of the insertion point are checked:
--
-- * if an item currently occupies index @i@, the receive time of @m@
--   must be strictly less than the receive time of that item;
--
-- * if @i > 0@, the receive time of @m@ must be greater than or equal
--   to the receive time of the item at index @i - 1@.
--
-- A violated check raises an 'error'. The new item is created with both
-- of its boolean flags set to 'False'.
insertMessage :: InputMessageQueue -> Message -> Int -> IO ()
insertMessage q m i =
  do n <- vectorCount messages
     when (i < n) $
       do tNext <- receiveTimeAt i
          unless (t < tNext) $
            error "Error inserting a new input message (check before): insertMessage"
     when (i > 0) $
       do tPrev <- receiveTimeAt (i - 1)
          unless (t >= tPrev) $
            error "Error inserting a new input message (check after): insertMessage"
     flag1 <- newIORef False
     flag2 <- newIORef False
     vectorInsert messages i (InputMessageQueueItem m flag1 flag2)
  where
    messages = inputMessages q
    t        = messageReceiveTime m
    -- Receive time of the message stored at the given index.
    receiveTimeAt :: Int -> IO Double
    receiveTimeAt k =
      do item <- readVector messages k
         return $ messageReceiveTime (itemMessage item)

-- | Search for the rightmost message index.
--
-- Binary search over the inclusive index range @[left, right]@ for the
-- receive time @t@. The search assumes that the messages are ordered by
-- their receive time. When a message with receive time equal to @t@ is
-- found, the result is the index of the rightmost such message within
-- the range. Otherwise the result is @complement k@, where @k@ is the
-- index at which the search stopped.
lookupRightMessageIndex' :: InputMessageQueue -> Double -> Int -> Int -> IO Int
lookupRightMessageIndex' q t left right
  | left > right = return $ complement (right + 1)
  | otherwise    =
      do -- The midpoint is rounded up, so that the "equal" branch below
         -- (which keeps @index@ as the new left bound) always shrinks
         -- the range.
         let index = ((left + 1) + right) `div` 2
         item <- readVector (inputMessages q) index
         decide index (messageReceiveTime $ itemMessage item)
  where
    decide :: Int -> Double -> IO Int
    decide index t'
      -- a single candidate is left
      | left == right, t' > t = return $ complement right
      | left == right, t' < t = return $ complement (right + 1)
      | left == right         = return right
      -- narrow the range
      | t' > t    = lookupRightMessageIndex' q t left (index - 1)
      | t' < t    = lookupRightMessageIndex' q t (index + 1) right
      | otherwise = lookupRightMessageIndex' q t index right

-- | Search for the leftmost message index.
--
-- Binary search over the inclusive index range @[left, right]@ for the
-- receive time @t@. The search assumes that the messages are ordered by
-- their receive time. When a message with receive time equal to @t@ is
-- found, the result is the index of the leftmost such message within
-- the range. Otherwise the result is @complement k@, where @k@ is the
-- index at which the search stopped.
lookupLeftMessageIndex' :: InputMessageQueue -> Double -> Int -> Int -> IO Int
lookupLeftMessageIndex' q t left right
  | left > right = return $ complement left
  | otherwise    =
      do -- The midpoint is rounded down, so that the "equal" branch below
         -- (which keeps @index@ as the new right bound) always shrinks
         -- the range.
         let index = (left + right) `div` 2
         item <- readVector (inputMessages q) index
         decide index (messageReceiveTime $ itemMessage item)
  where
    decide :: Int -> Double -> IO Int
    decide index t'
      -- a single candidate is left
      | left == right, t' > t = return $ complement left
      | left == right, t' < t = return $ complement (left + 1)
      | left == right         = return left
      -- narrow the range
      | t' > t    = lookupLeftMessageIndex' q t left (index - 1)
      | t' < t    = lookupLeftMessageIndex' q t (index + 1) right
      | otherwise = lookupLeftMessageIndex' q t left index
 
-- | Search for the rightmost message index.
--
-- Runs 'lookupRightMessageIndex'' over the whole queue, i.e. over the
-- index range from @0@ to the current item count minus one.
lookupRightMessageIndex :: InputMessageQueue -> Double -> IO Int
lookupRightMessageIndex q t =
  do n <- vectorCount (inputMessages q)
     lookupRightMessageIndex' q t 0 (n - 1)
 
-- | Search for the leftmost message index.
--
-- Runs 'lookupLeftMessageIndex'' over the whole queue, i.e. over the
-- index range from @0@ to the current item count minus one.
lookupLeftMessageIndex :: InputMessageQueue -> Double -> IO Int
lookupLeftMessageIndex q t =
  do n <- vectorCount (inputMessages q)
     lookupLeftMessageIndex' q t 0 (n - 1)

-- | Reduce the input messages till the specified time.
--
-- Scans the queue from index @0@ and counts the leading items whose
-- message receive time is strictly less than @t@; the scan stops at the
-- first item that does not satisfy this. If the count is positive,
-- 'vectorDeleteRange' is called on the queue vector with @0@ and that
-- count.
reduceInputMessages :: InputMessageQueue -> Double -> IO ()
reduceInputMessages q t =
  do count <- vectorCount messages
     len   <- countEarlier count 0
     when (len > 0) $
       vectorDeleteRange messages 0 len
  where
    messages = inputMessages q

    -- Index of the first item, starting at @i@, whose receive time is
    -- not less than @t@; equals @n@ if there is no such item.
    countEarlier :: Int -> Int -> IO Int
    countEarlier n i
      | i >= n    = return i
      | otherwise =
          do item <- readVector messages i
             if messageReceiveTime (itemMessage item) < t
               then countEarlier n (i + 1)
               else return i

-- | Filter the input messages using the specified predicate.
--
-- Visits every item of the queue in index order and returns, in that
-- same order, the messages for which the predicate holds. The queue
-- itself is not modified.
filterInputMessages :: (Message -> Bool) -> InputMessageQueue -> IO [Message]
filterInputMessages keep q =
  do count <- vectorCount (inputMessages q)
     -- Matching messages are consed onto the accumulator, hence the
     -- final reverse.
     fmap reverse $ foldM collect [] [0 .. count - 1]
  where
    collect :: [Message] -> Int -> IO [Message]
    collect acc i =
      do item <- readVector (inputMessages q) i
         let m = itemMessage item
         if keep m
           then return (m : acc)
           else return acc