{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}
module Simulation.Aivika.Distributed.Optimistic.Message
(sendMessage,
enqueueMessage,
messageReceived) where
import Data.Time
import Data.Monoid
import Control.Monad
import Control.Distributed.Process (ProcessId, getSelfPid, wrapMessage, unwrapMessage)
import Control.Distributed.Process.Serializable
import Simulation.Aivika.Trans hiding (ProcessId)
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.Event
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue as IMQ
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue as OMQ
import Simulation.Aivika.Distributed.Optimistic.DIO
import Simulation.Aivika.Distributed.Optimistic.Ref.Base
sendMessage :: forall a. Serializable a => ProcessId -> a -> Event DIO ()
sendMessage pid a =
do t <- liftDynamics time
enqueueMessage pid t a
enqueueMessage :: forall a. Serializable a => ProcessId -> Double -> a -> Event DIO ()
enqueueMessage pid t a =
Event $ \p ->
do let queue = queueOutputMessages $
runEventQueue (pointRun p)
sendTime = pointTime p
receiveTime = t
sequenceNo <- liftIOUnsafe $ OMQ.generateMessageSequenceNo queue
sender <- messageInboxId
let receiver = pid
antiToggle = False
binaryData = wrapMessage a
message = Message { messageSequenceNo = sequenceNo,
messageSendTime = sendTime,
messageReceiveTime = receiveTime,
messageSenderId = sender,
messageReceiverId = receiver,
messageAntiToggle = antiToggle,
messageData = binaryData
}
OMQ.sendMessage queue message
messageReceived :: forall a. Serializable a => Signal DIO a
messageReceived =
Signal { handleSignal = \h ->
Event $ \p ->
let queue = queueInputMessages $
runEventQueue (pointRun p)
signal = IMQ.messageEnqueued queue
in invokeEvent p $
handleSignal signal $ \x ->
do y <- unwrapMessage (messageData x)
case y of
Nothing -> return ()
Just a -> h a
}