module Simulation.Aivika.Distributed.Optimistic.Internal.TransientMessageQueue
(TransientMessageQueue,
newTransientMessageQueue,
transientMessageQueueSize,
transientMessageQueueTime,
transientMessages,
enqueueTransientMessage,
processAcknowledgementMessage,
acknowledgementMessageTime,
resetAcknowledgementMessageTime,
deliverAcknowledgementMessage,
deliverAcknowledgementMessages) where
import qualified Data.Map as M
import Data.List
import Data.IORef
import Data.Word
import Control.Monad
import Control.Monad.Trans
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
-- | The transient message queue: the messages that were enqueued and have
-- not yet been removed by a matching acknowledgement, plus a time value
-- maintained from marked acknowledgement messages.
data TransientMessageQueue =
  TransientMessageQueue
  { queueTransientMessages :: IORef (M.Map TransientMessageQueueItem Message)
    -- ^ The stored messages, keyed by the queue item derived from each message.
  , queueMarkedMessageTime :: IORef Double
    -- ^ The minimum receive time over the marked acknowledgements processed
    -- since the last reset; positive infinity when there were none.
  }
-- | The key under which a message is stored in the transient message queue.
-- The same key can be built from a message and from its acknowledgement.
data TransientMessageQueueItem =
  TransientMessageQueueItem
  { itemSequenceNo :: Word64
    -- ^ The sequence number.
  , itemSendTime :: Double
    -- ^ The send time.
  , itemReceiveTime :: Double
    -- ^ The receive time.
  , itemSenderId :: DP.ProcessId
    -- ^ The identifier of the sender process.
  , itemReceiverId :: DP.ProcessId
    -- ^ The identifier of the receiver process.
  , itemAntiToggle :: Bool
    -- ^ The anti-toggle flag.
  } deriving (Eq, Show)
-- | Items are ordered lexicographically: first by the receive time, then by
-- the send time, the sequence number, the receiver, the sender and finally
-- by the anti-toggle flag. Items that tie on every field compare as @<=@.
instance Ord TransientMessageQueueItem where

  a <= b =
    by itemReceiveTime $
    by itemSendTime $
    by itemSequenceNo $
    by itemReceiverId $
    by itemSenderId $
    by itemAntiToggle True
    where
      -- Decide on the given field if it differs strictly in either
      -- direction; otherwise defer to the remaining comparisons.
      by :: Ord k => (TransientMessageQueueItem -> k) -> Bool -> Bool
      by field rest
        | field a < field b = True
        | field a > field b = False
        | otherwise         = rest
-- | Build the queue key for the specified message by copying its sequence
-- number, send and receive times, sender and receiver identifiers and
-- anti-toggle flag.
transientMessageQueueItem :: Message -> TransientMessageQueueItem
transientMessageQueueItem msg =
  TransientMessageQueueItem
  { itemSequenceNo  = messageSequenceNo msg
  , itemSendTime    = messageSendTime msg
  , itemReceiveTime = messageReceiveTime msg
  , itemSenderId    = messageSenderId msg
  , itemReceiverId  = messageReceiverId msg
  , itemAntiToggle  = messageAntiToggle msg
  }
-- | Build the queue key for the specified acknowledgement message from the
-- corresponding acknowledgement fields, so that it can be looked up among
-- the keys created by 'transientMessageQueueItem'.
acknowledgementMessageQueueItem :: AcknowledgementMessage -> TransientMessageQueueItem
acknowledgementMessageQueueItem ack =
  TransientMessageQueueItem
  { itemSequenceNo  = acknowledgementSequenceNo ack
  , itemSendTime    = acknowledgementSendTime ack
  , itemReceiveTime = acknowledgementReceiveTime ack
  , itemSenderId    = acknowledgementSenderId ack
  , itemReceiverId  = acknowledgementReceiverId ack
  , itemAntiToggle  = acknowledgementAntiToggle ack
  }
-- | Create a new transient message queue: it holds no messages and its
-- marked acknowledgement time is positive infinity.
newTransientMessageQueue :: DIO TransientMessageQueue
newTransientMessageQueue =
  liftIOUnsafe (newIORef M.empty) >>= \pending ->
  liftIOUnsafe (newIORef (1 / 0)) >>= \marked ->
  return TransientMessageQueue
         { queueTransientMessages = pending
         , queueMarkedMessageTime = marked
         }
-- | Return the number of messages currently stored in the queue.
transientMessageQueueSize :: TransientMessageQueue -> IO Int
transientMessageQueueSize q =
  do pending <- readIORef (queueTransientMessages q)
     return (M.size pending)
-- | Return the receive time of the smallest key in the queue, or positive
-- infinity if the queue is empty. Keys are ordered by receive time first,
-- so this is the smallest receive time among the stored messages.
transientMessageQueueTime :: TransientMessageQueue -> IO Double
transientMessageQueueTime q =
  fmap earliest $ readIORef (queueTransientMessages q)
  where
    earliest pending
      | M.null pending = 1 / 0
      | otherwise      = itemReceiveTime (fst (M.findMin pending))
-- | Return the messages currently stored in the queue, in ascending order
-- of their keys.
transientMessages :: TransientMessageQueue -> IO [Message]
transientMessages q =
  fmap M.elems $ readIORef (queueTransientMessages q)
-- | Store the message in the queue under the key derived from it by
-- 'transientMessageQueueItem'. A message already stored under an equal key
-- is replaced.
enqueueTransientMessage :: TransientMessageQueue -> Message -> IO ()
enqueueTransientMessage q m =
  -- The strict modifyIORef' evaluates the updated map right away, so that
  -- repeated enqueueing does not pile up unevaluated inserts in the reference.
  modifyIORef' (queueTransientMessages q) $
  M.insert (transientMessageQueueItem m) m
-- | Lower the marked acknowledgement time of the queue to the receive time
-- of the specified acknowledgement message, if that time is smaller than
-- the one currently stored.
enqueueAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage q m =
  modifyIORef' (queueMarkedMessageTime q) (\marked -> min t marked)
  where
    t = acknowledgementReceiveTime m
-- | Process the acknowledgement message.
--
-- If the queue holds a message under the key built from the acknowledgement,
-- that message is removed; if the acknowledgement is also marked, its
-- receive time is folded into the marked acknowledgement time of the queue.
-- An acknowledgement without a matching stored message changes nothing.
processAcknowledgementMessage :: TransientMessageQueue -> AcknowledgementMessage -> IO ()
processAcknowledgementMessage q m =
  do ms <- readIORef (queueTransientMessages q)
     let k = acknowledgementMessageQueueItem m
     when (M.member k ms) $
       do -- Use the strict modifyIORef' so that the deletion is performed now
          -- rather than being stored as a thunk that retains the old map.
          modifyIORef' (queueTransientMessages q) $
            M.delete k
          when (acknowledgementMarked m) $
            enqueueAcknowledgementMessage q m
-- | Return the marked acknowledgement time currently stored in the queue.
-- It is positive infinity right after creation or a reset.
acknowledgementMessageTime :: TransientMessageQueue -> IO Double
acknowledgementMessageTime =
  readIORef . queueMarkedMessageTime
-- | Reset the marked acknowledgement time of the queue to positive infinity.
resetAcknowledgementMessageTime :: TransientMessageQueue -> IO ()
resetAcknowledgementMessageTime q =
  let infinity = 1 / 0
  in writeIORef (queueMarkedMessageTime q) infinity
-- | Deliver the acknowledgement message by passing it to
-- 'sendAcknowledgementMessageDIO' together with the sender identifier
-- recorded in the acknowledgement.
deliverAcknowledgementMessage :: AcknowledgementMessage -> DIO ()
deliverAcknowledgementMessage x =
  sendAcknowledgementMessageDIO recipient x
  where
    recipient = acknowledgementSenderId x
-- | Deliver the acknowledgement messages in batches.
--
-- Consecutive messages that share a sender identifier form one batch, and
-- each batch is passed to 'sendAcknowledgementMessagesDIO' together with
-- that identifier. Only adjacent messages are grouped, so a sender whose
-- messages are not contiguous in the list receives more than one batch.
deliverAcknowledgementMessages :: [AcknowledgementMessage] -> DIO ()
deliverAcknowledgementMessages xs =
  mapM_ deliver batches
  where
    sameSender a b = acknowledgementSenderId a == acknowledgementSenderId b
    batches = groupBy sameSender xs
    deliver batch =
      case batch of
        []      -> return ()
        (z : _) -> sendAcknowledgementMessagesDIO (acknowledgementSenderId z) batch