{-# LANGUAGE DeriveGeneric, DeriveDataTypeable, MonoLocalBinds #-}
module Simulation.Aivika.Distributed.Optimistic.Guard
(
runMasterGuard,
runSlaveGuard,
runMasterGuard_,
runSlaveGuard_) where
import GHC.Generics
import Data.Typeable
import Data.Binary
import qualified Data.Map as M
import Control.Monad
import qualified Control.Distributed.Process as DP
import Control.Distributed.Process.Serializable
import Simulation.Aivika.Trans
import Simulation.Aivika.Trans.Internal.Types
import Simulation.Aivika.Distributed.Optimistic.Internal.Expect
import Simulation.Aivika.Distributed.Optimistic.DIO
import Simulation.Aivika.Distributed.Optimistic.Message
-- | Message sent from the master to a slave.
--
-- It carries a process identifier (the sender's inbox id, as filled in by
-- 'awaitMasterGuard') together with an optional payload; the payload is
-- 'Nothing' when the master has no entry for the addressed slave.
data MasterMessage a
  = MasterMessage DP.ProcessId (Maybe a)
  deriving (Show, Typeable, Generic)
-- | Message sent from a slave to the master.
--
-- It carries a process identifier (the sender's inbox id, as filled in by
-- 'awaitSlaveGuard') together with the slave's payload.
data SlaveMessage a
  = SlaveMessage DP.ProcessId a
  deriving (Show, Typeable, Generic)
-- | Serialisation of 'MasterMessage'; no methods are given, so the class
-- defaults are used (the type derives 'Generic').
instance Binary a => Binary (MasterMessage a)
-- | Serialisation of 'SlaveMessage'; no methods are given, so the class
-- defaults are used (the type derives 'Generic').
instance Binary a => Binary (SlaveMessage a)
-- | State held by the master side of the guard.
data MasterGuard a = MasterGuard
  { masterGuardSlaveMessages :: Ref DIO (M.Map DP.ProcessId a)
    -- ^ payloads received from slaves so far, keyed by the process id
    -- carried in each 'SlaveMessage'
  }
-- | State held by the slave side of the guard.
data SlaveGuard a = SlaveGuard
  { slaveGuardAcknowledgedMessage :: Ref DIO (Maybe (Maybe a))
    -- ^ outer 'Nothing' until a 'MasterMessage' has been received; afterwards
    -- @Just payload@, where @payload@ is the optional value from that message
  }
-- | Create a master guard.
--
-- Allocates an empty map reference and subscribes to 'messageReceived' so
-- that every incoming 'SlaveMessage' stores its payload under the process id
-- it carries (a later message with the same id overwrites the earlier one).
newMasterGuard :: Serializable a => Event DIO (MasterGuard a)
newMasterGuard = do
  received <- liftSimulation (newRef M.empty)
  -- The value returned by 'handleSignal' is deliberately discarded, so the
  -- handler is never unsubscribed by this module.
  _ <- handleSignal messageReceived $ \(SlaveMessage slaveId payload) ->
    modifyRef received (M.insert slaveId payload)
  return (MasterGuard received)
-- | Create a slave guard.
--
-- Allocates a reference initialised to 'Nothing' and subscribes to
-- 'messageReceived' so that every incoming 'MasterMessage' stores its
-- optional payload wrapped in 'Just' (a later message overwrites the
-- earlier one).
newSlaveGuard :: Serializable a => Event DIO (SlaveGuard a)
newSlaveGuard = do
  r <- liftSimulation $ newRef Nothing
  -- The sender id carried by the message is not needed here, hence the
  -- underscore-prefixed binder. The value returned by 'handleSignal' is
  -- deliberately discarded, so the handler is never unsubscribed by this
  -- module.
  _ <- handleSignal messageReceived $ \(MasterMessage _masterId a) ->
    writeRef r (Just a)
  return SlaveGuard { slaveGuardAcknowledgedMessage = r }
-- | Master-side wait.
--
-- The event handed to 'expectProcess' yields 'Nothing' while fewer than the
-- requested number of slave entries have been collected. Once enough are
-- present it transforms the collected map, sends each collected slave a
-- 'MasterMessage' containing this process's inbox id and that slave's entry
-- in the transformed map (if any), and yields the transformed map.
awaitMasterGuard
  :: Serializable b
  => MasterGuard a
  -- ^ guard whose collected slave messages are inspected
  -> Int
  -- ^ minimum number of slave entries required before replying
  -> (M.Map DP.ProcessId a -> Event DIO (M.Map DP.ProcessId b))
  -- ^ transformation applied to the collected messages
  -> Process DIO (M.Map DP.ProcessId b)
awaitMasterGuard guard n transform =
  expectProcess $
    readRef (masterGuardSlaveMessages guard) >>= respond
  where
    respond received
      | M.size received < n = return Nothing
      | otherwise = do
          replies <- transform received
          inboxId <- liftComp messageInboxId
          -- Replies go to the slaves that reported, not to the keys of the
          -- transformed map.
          forM_ (M.keys received) $ \slaveId ->
            sendMessage slaveId (MasterMessage inboxId (M.lookup slaveId replies))
          return (Just replies)
-- | Slave-side wait.
--
-- First runs the generator and sends its result to the master in a
-- 'SlaveMessage' tagged with this process's inbox id; then hands
-- 'expectProcess' an event that reads the guard's acknowledgement
-- reference (which holds 'Nothing' until a 'MasterMessage' is recorded).
awaitSlaveGuard
  :: (Serializable a,
      Serializable b)
  => SlaveGuard a
  -- ^ guard whose acknowledgement reference is read
  -> DP.ProcessId
  -- ^ process id the 'SlaveMessage' is sent to
  -> Event DIO b
  -- ^ produces the payload to send
  -> Process DIO (Maybe a)
awaitSlaveGuard guard masterId generator = do
  liftEvent announce
  expectProcess (readRef (slaveGuardAcknowledgedMessage guard))
  where
    -- Produce the payload before looking up the inbox id, then send.
    announce = do
      payload <- generator
      inboxId <- liftComp messageInboxId
      sendMessage masterId (SlaveMessage inboxId payload)
-- | Run the master guard.
--
-- Creates a fresh master guard and registers, through
-- 'enqueueEventWithStopTime', a process that performs 'awaitMasterGuard' and
-- then triggers a private signal with the result. The calling process
-- awaits that signal and returns the value it carries.
runMasterGuard
  :: (Serializable a,
      Serializable b)
  => Int
  -- ^ number of slave entries to wait for
  -> (M.Map DP.ProcessId a -> Event DIO (M.Map DP.ProcessId b))
  -- ^ transformation applied to the collected slave messages
  -> Process DIO (M.Map DP.ProcessId b)
runMasterGuard n transform = do
  done <- liftSimulation newSignalSource
  let waiter masterGuard =
        awaitMasterGuard masterGuard n transform >>= liftEvent . triggerSignal done
  liftEvent $
    newMasterGuard >>= enqueueEventWithStopTime . runProcess . waiter
  processAwait (publishSignal done)
-- | Run the slave guard.
--
-- Creates a fresh slave guard and registers, through
-- 'enqueueEventWithStopTime', a process that performs 'awaitSlaveGuard' and
-- then triggers a private signal with the result. The calling process
-- awaits that signal and returns the value it carries.
runSlaveGuard
  :: (Serializable a,
      Serializable b)
  => DP.ProcessId
  -- ^ process id of the master
  -> Event DIO a
  -- ^ produces the payload sent to the master
  -> Process DIO (Maybe b)
runSlaveGuard masterId generator = do
  done <- liftSimulation newSignalSource
  let waiter slaveGuard =
        awaitSlaveGuard slaveGuard masterId generator >>= liftEvent . triggerSignal done
  liftEvent $
    newSlaveGuard >>= enqueueEventWithStopTime . runProcess . waiter
  processAwait (publishSignal done)
-- | Run the master guard with unit payloads: the collected map is passed
-- through unchanged and the result is discarded.
runMasterGuard_ :: Int -> Process DIO ()
runMasterGuard_ n =
  (runMasterGuard n return :: Process DIO (M.Map DP.ProcessId ())) >> return ()
-- | Run the slave guard with a unit payload and discard the master's reply.
runSlaveGuard_ :: DP.ProcessId -> Process DIO ()
runSlaveGuard_ masterId =
  (runSlaveGuard masterId (return ()) :: Process DIO (Maybe ())) >> return ()