{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}

-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Message
-- Copyright  : Copyright (c) 2015-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines functions for working with messages.
--
module Simulation.Aivika.Distributed.Optimistic.Message
       (sendMessage,
        enqueueMessage,
        messageReceived) where

import Data.Time
import Data.Monoid

import Control.Monad
import Control.Distributed.Process (ProcessId, getSelfPid, wrapMessage, unwrapMessage)
import Control.Distributed.Process.Serializable

import Simulation.Aivika.Trans hiding (ProcessId)
import Simulation.Aivika.Trans.Internal.Types

import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.Event
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.InputMessageQueue as IMQ
import qualified Simulation.Aivika.Distributed.Optimistic.Internal.OutputMessageQueue as OMQ
import Simulation.Aivika.Distributed.Optimistic.DIO
import Simulation.Aivika.Distributed.Optimistic.Ref.Base

-- | Send a message to the specified remote process with the current receive time.
sendMessage :: forall a. Serializable a => ProcessId -> a -> Event DIO ()
{-# INLINABLE sendMessage #-}
sendMessage :: forall a. Serializable a => ProcessId -> a -> Event DIO ()
sendMessage ProcessId
pid a
a =
  do Double
t <- Dynamics DIO Double -> Event DIO Double
forall a. Dynamics DIO a -> Event DIO a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
DynamicsLift t m =>
Dynamics m a -> t m a
liftDynamics Dynamics DIO Double
forall (m :: * -> *). Monad m => Dynamics m Double
time
     ProcessId -> Double -> a -> Event DIO ()
forall a.
Serializable a =>
ProcessId -> Double -> a -> Event DIO ()
enqueueMessage ProcessId
pid Double
t a
a 

-- | Send a message to the specified remote process with the given receive time.
enqueueMessage :: forall a. Serializable a => ProcessId -> Double -> a -> Event DIO ()
{-# INLINABLE enqueueMessage #-}
enqueueMessage :: forall a.
Serializable a =>
ProcessId -> Double -> a -> Event DIO ()
enqueueMessage ProcessId
pid Double
t a
a =
  (Point DIO -> DIO ()) -> Event DIO ()
forall (m :: * -> *) a. (Point m -> m a) -> Event m a
Event ((Point DIO -> DIO ()) -> Event DIO ())
-> (Point DIO -> DIO ()) -> Event DIO ()
forall a b. (a -> b) -> a -> b
$ \Point DIO
p ->
  do let queue :: OutputMessageQueue
queue       = EventQueue DIO -> OutputMessageQueue
queueOutputMessages (EventQueue DIO -> OutputMessageQueue)
-> EventQueue DIO -> OutputMessageQueue
forall a b. (a -> b) -> a -> b
$
                       Run DIO -> EventQueue DIO
forall (m :: * -> *). Run m -> EventQueue m
runEventQueue (Point DIO -> Run DIO
forall (m :: * -> *). Point m -> Run m
pointRun Point DIO
p)
         sendTime :: Double
sendTime    = Point DIO -> Double
forall (m :: * -> *). Point m -> Double
pointTime Point DIO
p
         receiveTime :: Double
receiveTime = Double
t
     Word64
sequenceNo <- IO Word64 -> DIO Word64
forall a. IO a -> DIO a
forall (m :: * -> *) a. MonadIOUnsafe m => IO a -> m a
liftIOUnsafe (IO Word64 -> DIO Word64) -> IO Word64 -> DIO Word64
forall a b. (a -> b) -> a -> b
$ OutputMessageQueue -> IO Word64
OMQ.generateMessageSequenceNo OutputMessageQueue
queue
     ProcessId
sender <- DIO ProcessId
messageInboxId
     let receiver :: ProcessId
receiver = ProcessId
pid
         antiToggle :: Bool
antiToggle = Bool
False
         binaryData :: Message
binaryData = a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
a
         message :: Message
message = Message { messageSequenceNo :: Word64
messageSequenceNo = Word64
sequenceNo,
                             messageSendTime :: Double
messageSendTime = Double
sendTime,
                             messageReceiveTime :: Double
messageReceiveTime = Double
receiveTime,
                             messageSenderId :: ProcessId
messageSenderId = ProcessId
sender,
                             messageReceiverId :: ProcessId
messageReceiverId = ProcessId
receiver,
                             messageAntiToggle :: Bool
messageAntiToggle = Bool
antiToggle,
                             messageData :: Message
messageData = Message
binaryData
                           }
     OutputMessageQueue -> Message -> DIO ()
OMQ.sendMessage OutputMessageQueue
queue Message
message
          
-- | The signal triggered when the remote message of the specified type has come.
messageReceived :: forall a. Serializable a => Signal DIO a
{-# INLINABLE messageReceived #-}
messageReceived :: forall a. Serializable a => Signal DIO a
messageReceived =
  Signal { handleSignal :: (a -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
handleSignal = \a -> Event DIO ()
h ->
            (Point DIO -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO)
forall (m :: * -> *) a. (Point m -> m a) -> Event m a
Event ((Point DIO -> DIO (DisposableEvent DIO))
 -> Event DIO (DisposableEvent DIO))
-> (Point DIO -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$ \Point DIO
p ->
            let queue :: InputMessageQueue
queue = EventQueue DIO -> InputMessageQueue
queueInputMessages (EventQueue DIO -> InputMessageQueue)
-> EventQueue DIO -> InputMessageQueue
forall a b. (a -> b) -> a -> b
$
                        Run DIO -> EventQueue DIO
forall (m :: * -> *). Run m -> EventQueue m
runEventQueue (Point DIO -> Run DIO
forall (m :: * -> *). Point m -> Run m
pointRun Point DIO
p)
                signal :: Signal DIO Message
signal = InputMessageQueue -> Signal DIO Message
IMQ.messageEnqueued InputMessageQueue
queue
            in Point DIO
-> Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO)
forall (m :: * -> *) a. Point m -> Event m a -> m a
invokeEvent Point DIO
p (Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO))
-> Event DIO (DisposableEvent DIO) -> DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$
               Signal DIO Message
-> (Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal Signal DIO Message
signal ((Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO))
-> (Message -> Event DIO ()) -> Event DIO (DisposableEvent DIO)
forall a b. (a -> b) -> a -> b
$ \Message
x ->
               do Maybe a
y <- Message -> Event DIO (Maybe a)
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage (Message -> Message
messageData Message
x)
                  case Maybe a
y of
                    Maybe a
Nothing -> () -> Event DIO ()
forall a. a -> Event DIO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                    Just a
a  -> a -> Event DIO ()
h a
a
         }