module Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue
(OutputMessageQueue,
newOutputMessageQueue,
outputMessageQueueSize,
sendMessage,
rollbackOutputMessages,
reduceOutputMessages,
generateMessageSequenceNo) where
import Data.List
import Data.IORef
import Data.Word
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import qualified Simulation.Aivika.DoubleLinkedList as DLL
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.DIO
-- | The queue of messages sent by the local process.
data OutputMessageQueue =
  OutputMessageQueue { outputEnqueueTransientMessage :: Message -> IO (),
                       -- ^ Callback supplied at construction time; it is
                       -- invoked with every message and anti-message
                       -- before that message is delivered.
                       outputMessages :: DLL.DoubleLinkedList Message,
                       -- ^ The messages sent so far; new messages are
                       -- appended at the back.
                       outputMessageSequenceNo :: IORef Word64
                       -- ^ The counter backing 'generateMessageSequenceNo'.
                     }
-- | Create a new, empty output message queue.
--
-- The argument is the callback stored in 'outputEnqueueTransientMessage'.
-- The message list starts empty and the sequence counter starts at 0.
newOutputMessageQueue :: (Message -> IO ()) -> DIO OutputMessageQueue
newOutputMessageQueue transient =
  do ms <- liftIOUnsafe DLL.newList
     rn <- liftIOUnsafe $ newIORef 0
     return OutputMessageQueue { outputEnqueueTransientMessage = transient,
                                 outputMessages = ms,
                                 outputMessageSequenceNo = rn }
-- | Return the number of messages currently held in the output queue.
outputMessageQueueSize :: OutputMessageQueue -> IO Int
{-# INLINE outputMessageQueueSize #-}
outputMessageQueueSize = DLL.listCount . outputMessages
-- | Send the message and remember it in the output queue.
--
-- The call fails with 'error' when:
--
--   * the send time of the message is greater than its receive time;
--
--   * the message is an anti-message ('messageAntiToggle' is set);
--
--   * the last message in the queue has a send time greater than the
--     send time of the new message.
--
-- Otherwise the message is passed to the transient-message callback,
-- delivered to its receiver and finally appended to the queue.
sendMessage :: OutputMessageQueue -> Message -> DIO ()
sendMessage q m =
  do when (messageSendTime m > messageReceiveTime m) $
       error "The Send time cannot be greater than the Receive message time: sendMessage"
     when (messageAntiToggle m) $
       error "Cannot directly send the anti-message: sendMessage"
     f <- liftIOUnsafe $ DLL.listNull (outputMessages q)
     unless f $
       do -- the queue must stay ordered by non-decreasing send time
          m' <- liftIOUnsafe $ DLL.listLast (outputMessages q)
          when (messageSendTime m' > messageSendTime m) $
            error "A new output message comes from the past: sendMessage."
     liftIOUnsafe $ outputEnqueueTransientMessage q m
     deliverMessage m
     liftIOUnsafe $ DLL.listAddLast (outputMessages q) m
-- | Rollback the output messages till the specified time, either
-- including that time or not.
--
-- The affected messages are removed from the queue, converted to
-- anti-messages with 'antiMessage', passed one by one to the
-- transient-message callback and then delivered to their receivers.
rollbackOutputMessages :: OutputMessageQueue -> Double -> Bool -> DIO ()
rollbackOutputMessages q t including =
  do ms <- liftIOUnsafe $ extractMessagesToRollback q t including
     let ms' = map antiMessage ms
     liftIOUnsafe $
       forM_ ms' $ outputEnqueueTransientMessage q
     deliverAntiMessages ms'
-- | Remove from the back of the queue the messages that must be rolled
-- back by the specified time and return them.
--
-- Messages are taken from the back while their send time is greater than
-- @t@; when @including@ is 'True', messages with send time equal to @t@
-- are taken too.  The result keeps the order the messages had in the
-- queue (front to back).
extractMessagesToRollback :: OutputMessageQueue -> Double -> Bool -> IO [Message]
extractMessagesToRollback q t including = loop []
  where
    msgs = outputMessages q

    loop :: [Message] -> IO [Message]
    loop acc =
      do f <- DLL.listNull msgs
         if f
           then return acc
           else do m <- DLL.listLast msgs
                   let st = messageSendTime m
                   -- stop at the first message (from the back) that is
                   -- not subject to the rollback
                   if st < t || (not including && st == t)
                     then return acc
                     else do DLL.listRemoveLast msgs
                             -- prepending restores the queue order
                             loop (m : acc)
-- | Reduce the output messages till the specified time.
--
-- Messages are removed from the front of the queue while their send time
-- is strictly less than @t@; the first message with a send time of @t@
-- or later stops the loop and is kept.
reduceOutputMessages :: OutputMessageQueue -> Double -> IO ()
reduceOutputMessages q t = loop
  where
    loop :: IO ()
    loop =
      do f <- DLL.listNull (outputMessages q)
         unless f $
           do m <- DLL.listFirst (outputMessages q)
              when (messageSendTime m < t) $
                do DLL.listRemoveFirst (outputMessages q)
                   loop
-- | Generate the next message sequence number.
--
-- Atomically returns the current value of the counter and stores the
-- incremented value in its place.
generateMessageSequenceNo :: OutputMessageQueue -> IO Word64
generateMessageSequenceNo q =
  -- the strict variant forces the new counter value when it is written,
  -- so no chain of (+ 1) thunks can pile up inside the IORef
  atomicModifyIORef' (outputMessageSequenceNo q) $ \n -> (n + 1, n)
-- | Deliver the message on low level: send it to the process
-- identified by its 'messageReceiverId'.
deliverMessage :: Message -> DIO ()
deliverMessage x =
  sendMessageDIO (messageReceiverId x) x
-- | Deliver the anti-message on low level: send it to the process
-- identified by its 'messageReceiverId'.
deliverAntiMessage :: Message -> DIO ()
deliverAntiMessage x =
  sendMessageDIO (messageReceiverId x) x
-- | Deliver the anti-messages on low level.
--
-- Runs of consecutive anti-messages that share the same receiver are
-- sent to that receiver as one batch; the relative order of the
-- messages is preserved.
deliverAntiMessages :: [Message] -> DIO ()
deliverAntiMessages xs =
  let -- 'groupBy' only merges adjacent elements, so the list is not reordered
      ys = groupBy (\a b -> messageReceiverId a == messageReceiverId b) xs
      dlv []         = return ()
      dlv zs@(z : _) = sendMessagesDIO (messageReceiverId z) zs
  in forM_ ys dlv