{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}
module Simulation.Aivika.Distributed.Optimistic.Message
(sendMessage,
enqueueMessage,
messageReceived) where
import Data.Time
import Data.Monoid
import Control.Monad
import Control.Distributed.Process (ProcessId, getSelfPid, wrapMessage, unwrapMessage)
import Control.Distributed.Process.Serializable
import Simulation.Aivika.Trans hiding (ProcessId)
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.Event
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue as IMQ
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue as OMQ
import Simulation.Aivika.Distributed.Optimistic.DIO
import Simulation.Aivika.Distributed.Optimistic.Ref.Base
sendMessage :: forall a. Serializable a => ProcessId -> a -> Event DIO ()
{-# INLINABLE sendMessage #-}
sendMessage :: forall a. Serializable a => ProcessId -> a -> Event DIO ()
sendMessage ProcessId
pid a
a =
do Double
t <- Dynamics DIO Double -> Event DIO Double
forall a. Dynamics DIO a -> Event DIO a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
DynamicsLift t m =>
Dynamics m a -> t m a
liftDynamics Dynamics DIO Double
forall (m :: * -> *). Monad m => Dynamics m Double
time
ProcessId -> Double -> a -> Event DIO ()
forall a.
Serializable a =>
ProcessId -> Double -> a -> Event DIO ()
enqueueMessage ProcessId
pid Double
t a
a
enqueueMessage :: forall a. Serializable a => ProcessId -> Double -> a -> Event DIO ()
{-# INLINABLE enqueueMessage #-}
enqueueMessage :: forall a.
Serializable a =>
ProcessId -> Double -> a -> Event DIO ()
enqueueMessage ProcessId
pid Double
t a
a =
(Point DIO -> DIO ()) -> Event DIO ()
forall (m :: * -> *) a. (Point m -> m a) -> Event m a
Event ((Point DIO -> DIO ()) -> Event DIO ())
-> (Point DIO -> DIO ()) -> Event DIO ()
forall a b. (a -> b) -> a -> b
$ \Point DIO
p ->
do let queue :: OutputMessageQueue
queue = EventQueue DIO -> OutputMessageQueue
queueOutputMessages (EventQueue DIO -> OutputMessageQueue)
-> EventQueue DIO -> OutputMessageQueue
forall a b. (a -> b) -> a -> b
$
Run DIO -> EventQueue DIO
forall (m :: * -> *). Run m -> EventQueue m
runEventQueue (Point DIO -> Run DIO
forall (m :: * -> *). Point m -> Run m
pointRun Point DIO
p)
sendTime :: Double
sendTime = Point DIO -> Double
forall (m :: * -> *). Point m -> Double
pointTime Point DIO
p
receiveTime :: Double
receiveTime = Double
t
Word64
sequenceNo <- IO Word64 -> DIO Word64
forall a. IO a -> DIO a
forall (m :: * -> *) a. MonadIOUnsafe m => IO a -> m a
liftIOUnsafe (IO Word64 -> DIO Word64) -> IO Word64 -> DIO Word64
forall a b. (a -> b) -> a -> b
$ OutputMessageQueue -> IO Word64
OMQ.generateMessageSequenceNo OutputMessageQueue
queue
ProcessId
sender <- DIO ProcessId
messageInboxId
let receiver :: ProcessId
receiver = ProcessId
pid
antiToggle :: Bool
antiToggle = Bool
False
binaryData :: Message
binaryData = a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
a
message :: Message
message = Message { messageSequenceNo :: Word64
messageSequenceNo = Word64
sequenceNo,
messageSendTime :: Double
messageSendTime = Double
sendTime,
messageReceiveTime :: Double
messageReceiveTime = Double
receiveTime,
messageSenderId :: ProcessId
messageSenderId = ProcessId
sender,
messageReceiverId :: ProcessId
messageReceiverId = ProcessId
receiver,
messageAntiToggle :: Bool
messageAntiToggle = Bool
antiToggle,
messageData :: Message
messageData = Message
binaryData
}
OutputMessageQueue -> Message -> DIO ()
OMQ.sendMessage OutputMessageQueue
queue Message
message
messageReceived :: forall a. Serializable a => Signal DIO a
{-# INLINABLE messageReceived #-}
messageReceived :: forall a. Serializable a => Signal DIO a
messageReceived =
Signal { handleSignal :: (a -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
handleSignal = \a -> Event DIO ()
h ->
(Point DIO -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO)
forall (m :: * -> *) a. (Point m -> m a) -> Event m a
Event ((Point DIO -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO))
-> (Point DIO -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$ \Point DIO
p ->
let queue :: InputMessageQueue
queue = EventQueue DIO -> InputMessageQueue
queueInputMessages (EventQueue DIO -> InputMessageQueue)
-> EventQueue DIO -> InputMessageQueue
forall a b. (a -> b) -> a -> b
$
Run DIO -> EventQueue DIO
forall (m :: * -> *). Run m -> EventQueue m
runEventQueue (Point DIO -> Run DIO
forall (m :: * -> *). Point m -> Run m
pointRun Point DIO
p)
signal :: Signal DIO Message
signal = InputMessageQueue -> Signal DIO Message
IMQ.messageEnqueued InputMessageQueue
queue
in Point DIO
-> Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO)
forall (m :: * -> *) a. Point m -> Event m a -> m a
invokeEvent Point DIO
p (Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$
Signal DIO Message
-> (Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal Signal DIO Message
signal ((Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO))
-> (Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$ \Message
x ->
do Maybe a
y <- Message -> Event DIO (Maybe a)
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage (Message -> Message
messageData Message
x)
case Maybe a
y of
Maybe a
Nothing -> () -> Event DIO ()
forall a. a -> Event DIO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just a
a -> a -> Event DIO ()
h a
a
}