-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue
-- Copyright  : Copyright (c) 2015-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines an output message queue.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue
       (OutputMessageQueue,
        newOutputMessageQueue,
        outputMessageQueueSize,
        sendMessage,
        rollbackOutputMessages,
        reduceOutputMessages,
        generateMessageSequenceNo) where

import Data.List
import Data.IORef
import Data.Word

import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP

import qualified Simulation.Aivika.DoubleLinkedList as DLL

import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Event

import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.DIO

-- | Specifies the output message queue.
--
-- The record bundles the callback used to register transient messages,
-- the list of messages that have already been sent, and a mutable counter
-- holding the next message sequence number.
data OutputMessageQueue =
  OutputMessageQueue { outputEnqueueTransientMessage :: Message -> IO (),
                       -- ^ Enqueue the transient message.
                       outputMessages :: DLL.DoubleLinkedList Message,
                       -- ^ The output messages.
                       outputMessageSequenceNo :: IORef Word64
                       -- ^ The next sequence number.
                     }

-- | Create a new output message queue.
--
-- The argument is the action that enqueues a transient message; it is stored
-- in the queue as 'outputEnqueueTransientMessage'. The message list starts
-- empty and the sequence number counter starts at 0.
newOutputMessageQueue :: (Message -> IO ()) -> DIO OutputMessageQueue
newOutputMessageQueue transient =
  do ms <- liftIOUnsafe DLL.newList
     rn <- liftIOUnsafe $ newIORef 0
     return OutputMessageQueue { outputEnqueueTransientMessage = transient,
                                 outputMessages = ms,
                                 outputMessageSequenceNo = rn }

-- | Return the output message queue size, i.e. the number of messages
-- currently stored in 'outputMessages'.
outputMessageQueueSize :: OutputMessageQueue -> IO Int
{-# INLINE outputMessageQueueSize #-}
outputMessageQueueSize = DLL.listCount . outputMessages

-- | Send the message.
--
-- The message is rejected with 'error' when
--
--   * its send time is greater than its receive time;
--
--   * it is an anti-message (anti-messages are produced only by rollback);
--
--   * its send time is less than the send time of the last message
--     already stored in the queue.
--
-- Otherwise the message is registered as transient, delivered to the
-- receiver and finally appended to the end of the output queue.
sendMessage :: OutputMessageQueue -> Message -> DIO ()
sendMessage q m =
  do when (messageSendTime m > messageReceiveTime m) $
       error "The Send time cannot be greater than the Receive message time: sendMessage"
     when (messageAntiToggle m) $
       error "Cannot directly send the anti-message: sendMessage"
     f <- liftIOUnsafe $ DLL.listNull (outputMessages q)
     -- the last stored message is inspected only when the queue is non-empty
     unless f $
       do m' <- liftIOUnsafe $ DLL.listLast (outputMessages q)
          when (messageSendTime m' > messageSendTime m) $
            error "A new output message comes from the past: sendMessage."
     liftIOUnsafe $ outputEnqueueTransientMessage q m
     deliverMessage m
     liftIOUnsafe $ DLL.listAddLast (outputMessages q) m

-- | Rollback the messages till the specified time either including that one or not.
--
-- The messages selected by 'extractMessagesToRollback' are removed from the
-- queue and converted to anti-messages; every anti-message is registered as
-- transient and then the whole list is delivered with 'deliverAntiMessages'.
rollbackOutputMessages :: OutputMessageQueue -> Double -> Bool -> DIO ()
rollbackOutputMessages q t including =
  do ms <- liftIOUnsafe $ extractMessagesToRollback q t including
     let ms' = map antiMessage ms
     liftIOUnsafe $
       forM_ ms' $ outputEnqueueTransientMessage q
     deliverAntiMessages ms'
     -- forM_ ms' deliverAntiMessage
                 
-- | Return the messages to rollback by the specified time.
--
-- Messages are taken off the end of the queue one by one. The extraction
-- stops when the queue becomes empty, when the last message has a send time
-- less than @t@, or, if @including@ is 'False', when the last message has a
-- send time equal to @t@. The removed messages are returned in the order in
-- which they were stored in the queue.
extractMessagesToRollback :: OutputMessageQueue -> Double -> Bool -> IO [Message]
extractMessagesToRollback q t including = loop []
  where
    loop :: [Message] -> IO [Message]
    loop acc =
      do f <- DLL.listNull (outputMessages q)
         if f
           then return acc
           else do m <- DLL.listLast (outputMessages q)
                   let t0   = messageSendTime m
                       -- keep the message when it lies before the rollback
                       -- time, or exactly at it for a non-inclusive rollback
                       stop = (not including && t0 == t) || t0 < t
                   if stop
                     then return acc
                     else do DLL.listRemoveLast (outputMessages q)
                             -- consing while walking backwards restores
                             -- the original queue order
                             loop (m : acc)

-- | Reduce the output messages till the specified time.
--
-- Messages are removed from the front of the queue while their send time
-- is strictly less than @t@; the first message with a send time of @t@ or
-- later, or an empty queue, stops the loop.
reduceOutputMessages :: OutputMessageQueue -> Double -> IO ()
reduceOutputMessages q t = loop
  where
    loop :: IO ()
    loop =
      do f <- DLL.listNull (outputMessages q)
         unless f $
           do m <- DLL.listFirst (outputMessages q)
              when (messageSendTime m < t) $
                do DLL.listRemoveFirst (outputMessages q)
                   loop

-- | Generate a next message sequence number.
--
-- Returns the current value of the counter and atomically stores the
-- incremented value. The strict 'atomicModifyIORef'' is used so that the
-- incremented counter is evaluated immediately and no chain of unevaluated
-- increments can accumulate inside the 'IORef'.
generateMessageSequenceNo :: OutputMessageQueue -> IO Word64
generateMessageSequenceNo q =
  atomicModifyIORef' (outputMessageSequenceNo q) $ \n -> (n + 1, n)

-- | Deliver the message on low level.
--
-- The message is passed to 'sendMessageDIO' addressed to the receiver
-- recorded in the message itself.
deliverMessage :: Message -> DIO ()
deliverMessage x =
  sendMessageDIO (messageReceiverId x) x

-- | Deliver the anti-message on low level.
--
-- The anti-message is passed to 'sendMessageDIO' addressed to the receiver
-- recorded in the message itself.
deliverAntiMessage :: Message -> DIO ()
deliverAntiMessage x =
  sendMessageDIO (messageReceiverId x) x

-- | Deliver the anti-messages on low level.
--
-- The list is split with 'groupBy' into runs of adjacent messages that
-- share the same receiver, and every run is handed to 'sendMessagesDIO' as
-- one batch. Only neighbouring messages are grouped, so the relative order
-- of the input list is preserved; a receiver that occurs in several
-- separate runs gets several batches.
deliverAntiMessages :: [Message] -> DIO ()
deliverAntiMessages xs =
  let ys = groupBy (\a b -> messageReceiverId a == messageReceiverId b) xs
      dlv :: [Message] -> DIO ()
      dlv []         = return ()
      dlv zs@(z : _) =
        sendMessagesDIO (messageReceiverId z) zs
  in forM_ ys dlv