module Simulation.Aivika.Distributed.Optimistic.Internal.TransientMessageQueue
(TransientMessageQueue,
newTransientMessageQueue,
transientMessageQueueSize,
transientMessageQueueTime,
transientMessages,
enqueueTransientMessage,
processAcknowledgementMessage,
acknowledgementMessageTime,
resetAcknowledgementMessageTime,
deliverAcknowledgementMessage,
deliverAcknowledgementMessages,
dequeueTransientMessages) where
import qualified Data.Map as M
import Data.List
import Data.IORef
import Data.Word
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
data TransientMessageQueue =
TransientMessageQueue { TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages :: IORef (M.Map TransientMessageQueueItem Message),
TransientMessageQueue -> IORef Double
queueMarkedMessageTime :: IORef Double
}
data TransientMessageQueueItem =
TransientMessageQueueItem { TransientMessageQueueItem -> Word64
itemSequenceNo :: Word64,
TransientMessageQueueItem -> Double
itemSendTime :: Double,
TransientMessageQueueItem -> Double
itemReceiveTime :: Double,
TransientMessageQueueItem -> ProcessId
itemSenderId :: DP.ProcessId,
TransientMessageQueueItem -> ProcessId
itemReceiverId :: DP.ProcessId,
TransientMessageQueueItem -> Bool
itemAntiToggle :: Bool
} deriving (TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
(TransientMessageQueueItem -> TransientMessageQueueItem -> Bool)
-> (TransientMessageQueueItem -> TransientMessageQueueItem -> Bool)
-> Eq TransientMessageQueueItem
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
== :: TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
$c/= :: TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
/= :: TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
Eq, Int -> TransientMessageQueueItem -> ShowS
[TransientMessageQueueItem] -> ShowS
TransientMessageQueueItem -> String
(Int -> TransientMessageQueueItem -> ShowS)
-> (TransientMessageQueueItem -> String)
-> ([TransientMessageQueueItem] -> ShowS)
-> Show TransientMessageQueueItem
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TransientMessageQueueItem -> ShowS
showsPrec :: Int -> TransientMessageQueueItem -> ShowS
$cshow :: TransientMessageQueueItem -> String
show :: TransientMessageQueueItem -> String
$cshowList :: [TransientMessageQueueItem] -> ShowS
showList :: [TransientMessageQueueItem] -> ShowS
Show)
instance Ord TransientMessageQueueItem where
TransientMessageQueueItem
x <= :: TransientMessageQueueItem -> TransientMessageQueueItem -> Bool
<= TransientMessageQueueItem
y
| (TransientMessageQueueItem -> Double
itemReceiveTime TransientMessageQueueItem
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> Double
itemReceiveTime TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> Double
itemReceiveTime TransientMessageQueueItem
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> Double
itemReceiveTime TransientMessageQueueItem
y) = Bool
False
| (TransientMessageQueueItem -> Double
itemSendTime TransientMessageQueueItem
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> Double
itemSendTime TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> Double
itemSendTime TransientMessageQueueItem
x Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> Double
itemSendTime TransientMessageQueueItem
y) = Bool
False
| (TransientMessageQueueItem -> Word64
itemSequenceNo TransientMessageQueueItem
x Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> Word64
itemSequenceNo TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> Word64
itemSequenceNo TransientMessageQueueItem
x Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> Word64
itemSequenceNo TransientMessageQueueItem
y) = Bool
False
| (TransientMessageQueueItem -> ProcessId
itemReceiverId TransientMessageQueueItem
x ProcessId -> ProcessId -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> ProcessId
itemReceiverId TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> ProcessId
itemReceiverId TransientMessageQueueItem
x ProcessId -> ProcessId -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> ProcessId
itemReceiverId TransientMessageQueueItem
y) = Bool
False
| (TransientMessageQueueItem -> ProcessId
itemSenderId TransientMessageQueueItem
x ProcessId -> ProcessId -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> ProcessId
itemSenderId TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> ProcessId
itemSenderId TransientMessageQueueItem
x ProcessId -> ProcessId -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> ProcessId
itemSenderId TransientMessageQueueItem
y) = Bool
False
| (TransientMessageQueueItem -> Bool
itemAntiToggle TransientMessageQueueItem
x Bool -> Bool -> Bool
forall a. Ord a => a -> a -> Bool
< TransientMessageQueueItem -> Bool
itemAntiToggle TransientMessageQueueItem
y) = Bool
True
| (TransientMessageQueueItem -> Bool
itemAntiToggle TransientMessageQueueItem
x Bool -> Bool -> Bool
forall a. Ord a => a -> a -> Bool
> TransientMessageQueueItem -> Bool
itemAntiToggle TransientMessageQueueItem
y) = Bool
False
| Bool
otherwise = Bool
True
transientMessageQueueItem :: Message -> TransientMessageQueueItem
transientMessageQueueItem :: Message -> TransientMessageQueueItem
transientMessageQueueItem Message
m =
TransientMessageQueueItem { itemSequenceNo :: Word64
itemSequenceNo = Message -> Word64
messageSequenceNo Message
m,
itemSendTime :: Double
itemSendTime = Message -> Double
messageSendTime Message
m,
itemReceiveTime :: Double
itemReceiveTime = Message -> Double
messageReceiveTime Message
m,
itemSenderId :: ProcessId
itemSenderId = Message -> ProcessId
messageSenderId Message
m,
itemReceiverId :: ProcessId
itemReceiverId = Message -> ProcessId
messageReceiverId Message
m,
itemAntiToggle :: Bool
itemAntiToggle = Message -> Bool
messageAntiToggle Message
m }
acknowledgementMessageQueueItem :: AcknowledgementMessage -> TransientMessageQueueItem
acknowledgementMessageQueueItem :: AcknowledgementMessage -> TransientMessageQueueItem
acknowledgementMessageQueueItem AcknowledgementMessage
m =
TransientMessageQueueItem { itemSequenceNo :: Word64
itemSequenceNo = AcknowledgementMessage -> Word64
acknowledgementSequenceNo AcknowledgementMessage
m,
itemSendTime :: Double
itemSendTime = AcknowledgementMessage -> Double
acknowledgementSendTime AcknowledgementMessage
m,
itemReceiveTime :: Double
itemReceiveTime = AcknowledgementMessage -> Double
acknowledgementReceiveTime AcknowledgementMessage
m,
itemSenderId :: ProcessId
itemSenderId = AcknowledgementMessage -> ProcessId
acknowledgementSenderId AcknowledgementMessage
m,
itemReceiverId :: ProcessId
itemReceiverId = AcknowledgementMessage -> ProcessId
acknowledgementReceiverId AcknowledgementMessage
m,
itemAntiToggle :: Bool
itemAntiToggle = AcknowledgementMessage -> Bool
acknowledgementAntiToggle AcknowledgementMessage
m }
newTransientMessageQueue :: DIO TransientMessageQueue
newTransientMessageQueue :: DIO TransientMessageQueue
newTransientMessageQueue =
do IORef (Map TransientMessageQueueItem Message)
ms <- IO (IORef (Map TransientMessageQueueItem Message))
-> DIO (IORef (Map TransientMessageQueueItem Message))
forall a. IO a -> DIO a
forall (m :: * -> *) a. MonadIOUnsafe m => IO a -> m a
liftIOUnsafe (IO (IORef (Map TransientMessageQueueItem Message))
-> DIO (IORef (Map TransientMessageQueueItem Message)))
-> IO (IORef (Map TransientMessageQueueItem Message))
-> DIO (IORef (Map TransientMessageQueueItem Message))
forall a b. (a -> b) -> a -> b
$ Map TransientMessageQueueItem Message
-> IO (IORef (Map TransientMessageQueueItem Message))
forall a. a -> IO (IORef a)
newIORef Map TransientMessageQueueItem Message
forall k a. Map k a
M.empty
IORef Double
r <- IO (IORef Double) -> DIO (IORef Double)
forall a. IO a -> DIO a
forall (m :: * -> *) a. MonadIOUnsafe m => IO a -> m a
liftIOUnsafe (IO (IORef Double) -> DIO (IORef Double))
-> IO (IORef Double) -> DIO (IORef Double)
forall a b. (a -> b) -> a -> b
$ Double -> IO (IORef Double)
forall a. a -> IO (IORef a)
newIORef (Double
1 Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
0)
TransientMessageQueue -> DIO TransientMessageQueue
forall a. a -> DIO a
forall (m :: * -> *) a. Monad m => a -> m a
return TransientMessageQueue { queueTransientMessages :: IORef (Map TransientMessageQueueItem Message)
queueTransientMessages = IORef (Map TransientMessageQueueItem Message)
ms,
queueMarkedMessageTime :: IORef Double
queueMarkedMessageTime = IORef Double
r }
transientMessageQueueSize :: TransientMessageQueue -> IO Int
{-# INLINE transientMessageQueueSize #-}
transientMessageQueueSize :: TransientMessageQueue -> IO Int
transientMessageQueueSize TransientMessageQueue
q =
(Map TransientMessageQueueItem Message -> Int)
-> IO (Map TransientMessageQueueItem Message) -> IO Int
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map TransientMessageQueueItem Message -> Int
forall k a. Map k a -> Int
M.size (IO (Map TransientMessageQueueItem Message) -> IO Int)
-> IO (Map TransientMessageQueueItem Message) -> IO Int
forall a b. (a -> b) -> a -> b
$ IORef (Map TransientMessageQueueItem Message)
-> IO (Map TransientMessageQueueItem Message)
forall a. IORef a -> IO a
readIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q)
transientMessageQueueTime :: TransientMessageQueue -> IO Double
transientMessageQueueTime :: TransientMessageQueue -> IO Double
transientMessageQueueTime TransientMessageQueue
q =
do Map TransientMessageQueueItem Message
ms <- IORef (Map TransientMessageQueueItem Message)
-> IO (Map TransientMessageQueueItem Message)
forall a. IORef a -> IO a
readIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q)
if Map TransientMessageQueueItem Message -> Bool
forall k a. Map k a -> Bool
M.null Map TransientMessageQueueItem Message
ms
then Double -> IO Double
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Double
1 Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
0)
else let (TransientMessageQueueItem
m, Message
_) = Map TransientMessageQueueItem Message
-> (TransientMessageQueueItem, Message)
forall k a. Map k a -> (k, a)
M.findMin Map TransientMessageQueueItem Message
ms
in Double -> IO Double
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (TransientMessageQueueItem -> Double
itemReceiveTime TransientMessageQueueItem
m)
transientMessages :: TransientMessageQueue -> IO [Message]
transientMessages :: TransientMessageQueue -> IO [Message]
transientMessages TransientMessageQueue
q =
do Map TransientMessageQueueItem Message
ms <- IORef (Map TransientMessageQueueItem Message)
-> IO (Map TransientMessageQueueItem Message)
forall a. IORef a -> IO a
readIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q)
[Message] -> IO [Message]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map TransientMessageQueueItem Message -> [Message]
forall k a. Map k a -> [a]
M.elems Map TransientMessageQueueItem Message
ms)
enqueueTransientMessage :: TransientMessageQueue -> Message -> IO ()
enqueueTransientMessage :: TransientMessageQueue -> Message -> IO ()
enqueueTransientMessage TransientMessageQueue
q Message
m =
IORef (Map TransientMessageQueueItem Message)
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q) ((Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ())
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a b. (a -> b) -> a -> b
$
TransientMessageQueueItem
-> Message
-> Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert (Message -> TransientMessageQueueItem
transientMessageQueueItem Message
m) Message
m
enqueueAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage TransientMessageQueue
q AcknowledgementMessage
m =
IORef Double -> (Double -> Double) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' (TransientMessageQueue -> IORef Double
queueMarkedMessageTime TransientMessageQueue
q) ((Double -> Double) -> IO ()) -> (Double -> Double) -> IO ()
forall a b. (a -> b) -> a -> b
$
Double -> Double -> Double
forall a. Ord a => a -> a -> a
min (AcknowledgementMessage -> Double
acknowledgementReceiveTime AcknowledgementMessage
m)
processAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
processAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
processAcknowledgementMessage TransientMessageQueue
q AcknowledgementMessage
m =
do Map TransientMessageQueueItem Message
ms <- IORef (Map TransientMessageQueueItem Message)
-> IO (Map TransientMessageQueueItem Message)
forall a. IORef a -> IO a
readIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q)
let k :: TransientMessageQueueItem
k = AcknowledgementMessage -> TransientMessageQueueItem
acknowledgementMessageQueueItem AcknowledgementMessage
m
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (TransientMessageQueueItem
-> Map TransientMessageQueueItem Message -> Bool
forall k a. Ord k => k -> Map k a -> Bool
M.member TransientMessageQueueItem
k Map TransientMessageQueueItem Message
ms) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
do IORef (Map TransientMessageQueueItem Message)
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q) ((Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ())
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a b. (a -> b) -> a -> b
$
TransientMessageQueueItem
-> Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message
forall k a. Ord k => k -> Map k a -> Map k a
M.delete TransientMessageQueueItem
k
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (AcknowledgementMessage -> Bool
acknowledgementMarked AcknowledgementMessage
m) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
TransientMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage TransientMessageQueue
q AcknowledgementMessage
m
acknowledgementMessageTime :: TransientMessageQueue -> IO Double
acknowledgementMessageTime :: TransientMessageQueue -> IO Double
acknowledgementMessageTime TransientMessageQueue
q =
IORef Double -> IO Double
forall a. IORef a -> IO a
readIORef (TransientMessageQueue -> IORef Double
queueMarkedMessageTime TransientMessageQueue
q)
resetAcknowledgementMessageTime :: TransientMessageQueue -> IO ()
resetAcknowledgementMessageTime :: TransientMessageQueue -> IO ()
resetAcknowledgementMessageTime TransientMessageQueue
q =
IORef Double -> Double -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (TransientMessageQueue -> IORef Double
queueMarkedMessageTime TransientMessageQueue
q) (Double
1 Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
0)
deliverAcknowledgementMessage :: AcknowledgementMessage -> DIO ()
deliverAcknowledgementMessage :: AcknowledgementMessage -> DIO ()
deliverAcknowledgementMessage AcknowledgementMessage
x =
ProcessId -> AcknowledgementMessage -> DIO ()
sendAcknowledgementMessageDIO (AcknowledgementMessage -> ProcessId
acknowledgementSenderId AcknowledgementMessage
x) AcknowledgementMessage
x
deliverAcknowledgementMessages :: [AcknowledgementMessage] -> DIO ()
deliverAcknowledgementMessages :: [AcknowledgementMessage] -> DIO ()
deliverAcknowledgementMessages [AcknowledgementMessage]
xs =
let ys :: [[AcknowledgementMessage]]
ys = (AcknowledgementMessage -> AcknowledgementMessage -> Bool)
-> [AcknowledgementMessage] -> [[AcknowledgementMessage]]
forall a. (a -> a -> Bool) -> [a] -> [[a]]
groupBy (\AcknowledgementMessage
a AcknowledgementMessage
b -> AcknowledgementMessage -> ProcessId
acknowledgementSenderId AcknowledgementMessage
a ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== AcknowledgementMessage -> ProcessId
acknowledgementSenderId AcknowledgementMessage
b) [AcknowledgementMessage]
xs
dlv :: [AcknowledgementMessage] -> DIO ()
dlv [] = () -> DIO ()
forall a. a -> DIO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
dlv zs :: [AcknowledgementMessage]
zs@(AcknowledgementMessage
z : [AcknowledgementMessage]
_) =
ProcessId -> [AcknowledgementMessage] -> DIO ()
sendAcknowledgementMessagesDIO (AcknowledgementMessage -> ProcessId
acknowledgementSenderId AcknowledgementMessage
z) [AcknowledgementMessage]
zs
in [[AcknowledgementMessage]]
-> ([AcknowledgementMessage] -> DIO ()) -> DIO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [[AcknowledgementMessage]]
ys [AcknowledgementMessage] -> DIO ()
dlv
dequeueTransientMessages :: TransientMessageQueue -> DP.ProcessId -> IO ()
dequeueTransientMessages :: TransientMessageQueue -> ProcessId -> IO ()
dequeueTransientMessages TransientMessageQueue
q ProcessId
pid =
IORef (Map TransientMessageQueueItem Message)
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (TransientMessageQueue
-> IORef (Map TransientMessageQueueItem Message)
queueTransientMessages TransientMessageQueue
q) ((Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ())
-> (Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message)
-> IO ()
forall a b. (a -> b) -> a -> b
$
(Message -> Bool)
-> Map TransientMessageQueueItem Message
-> Map TransientMessageQueueItem Message
forall a k. (a -> Bool) -> Map k a -> Map k a
M.filter (\Message
m -> Message -> ProcessId
messageReceiverId Message
m ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
/= ProcessId
pid)