module Simulation.Aivika.Trans.Internal.Process
(
ProcessId,
Process(..),
ProcessLift(..),
invokeProcess,
runProcess,
runProcessUsingId,
runProcessInStartTime,
runProcessInStartTimeUsingId,
runProcessInStopTime,
runProcessInStopTimeUsingId,
spawnProcess,
spawnProcessUsingId,
spawnProcessWith,
spawnProcessUsingIdWith,
enqueueProcess,
enqueueProcessUsingId,
newProcessId,
processId,
processUsingId,
holdProcess,
interruptProcess,
processInterrupted,
passivateProcess,
processPassive,
reactivateProcess,
cancelProcessWithId,
cancelProcess,
processCancelled,
processCancelling,
whenCancellingProcess,
processAwait,
processPreemptionBegin,
processPreemptionEnd,
processPreemptionBeginning,
processPreemptionEnding,
processYield,
timeoutProcess,
timeoutProcessUsingId,
processParallel,
processParallelUsingIds,
processParallel_,
processParallelUsingIds_,
catchProcess,
finallyProcess,
throwProcess,
zipProcessParallel,
zip3ProcessParallel,
unzipProcess,
memoProcess,
neverProcess,
traceProcess) where
import Data.Maybe
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Control.Applicative
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Parameter
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Dynamics
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Signal
-- | Represents a process identifier.
--
-- It bundles the mutable state that the functions of this module keep
-- for a single process.
data ProcessId m =
  ProcessId
  { processStarted :: Ref m Bool
    -- ^ set to 'True' by 'processIdPrepare'; a second attempt to start
    -- a process with the same identifier raises an error
  , processReactCont :: Ref m (Maybe (ContParams m ()))
    -- ^ continuation stored by 'passivateProcess' and taken back
    -- by 'reactivateProcess'
  , processContId :: ContId m
    -- ^ identifier of the underlying continuation
  , processInterruptRef :: Ref m Bool
    -- ^ reset to 'False' by 'holdProcess'; set to 'True' when the hold
    -- is interrupted or preempted
  , processInterruptCont :: Ref m (Maybe (ContParams m ()))
    -- ^ continuation stored by 'holdProcess' while the process is held
  , processInterruptTime :: Ref m Double
    -- ^ the time at which the current hold is scheduled to end
  , processInterruptVersion :: Ref m Int
    -- ^ counter incremented on interruption, so that the wake-up event
    -- scheduled by 'holdProcess' can detect that it became stale
  }
-- | A process computation: a function that, given the identifier of the
-- process it runs under, yields a 'Cont' computation.
newtype Process m a = Process (ProcessId m -> Cont m a)
-- | A type class of computations into which a 'Process' can be lifted.
class ProcessLift t m where
  -- | Lift the specified 'Process' computation into another computation.
  liftProcess :: Process m a -> t m a
-- | Unwrap a 'Process' by applying its underlying function to the
-- given process identifier.
invokeProcess :: ProcessId m -> Process m a -> Cont m a
invokeProcess pid process =
  case process of
    Process run -> run pid
-- | Hold the current process for the specified time period.
--
-- The continuation is stored in the process identifier and a wake-up
-- event is enqueued at the current time plus @dt@. The wake-up resumes
-- the process only if the interrupt version is still the one observed
-- when the hold started. A negative @dt@ raises an error.
holdProcess :: MonadDES m => Double -> Process m ()
holdProcess dt =
  Process $ \pid ->
  Cont $ \cont ->
  Event $ \point -> do
    when (dt < 0) $
      error "Time period dt < 0: holdProcess"
    let contRef = processInterruptCont pid
        versionRef = processInterruptVersion pid
        wakeTime = pointTime point + dt
        -- resume the process unless the version has changed since the
        -- hold was started
        wakeUp expected =
          Event $ \point' -> do
            actual <- invokeEvent point' $ readRef versionRef
            when (expected == actual) $ do
              invokeEvent point' $ writeRef contRef Nothing
              invokeEvent point' $ resumeCont cont ()
    invokeEvent point $ writeRef contRef $ Just cont
    invokeEvent point $ writeRef (processInterruptRef pid) False
    invokeEvent point $ writeRef (processInterruptTime pid) wakeTime
    version <- invokeEvent point $ readRef versionRef
    invokeEvent point $ enqueueEvent wakeTime $ wakeUp version
-- | Interrupt a process that is being held by 'holdProcess'.
--
-- When a held continuation is stored, it is cleared, the interrupt flag
-- is raised, the interrupt version is incremented and the continuation
-- is enqueued for resumption at the current time. Otherwise nothing
-- happens.
interruptProcess :: MonadDES m => ProcessId m -> Event m ()
interruptProcess pid =
  Event $ \point -> do
    let contRef = processInterruptCont pid
    held <- invokeEvent point $ readRef contRef
    case held of
      Just cont -> do
        invokeEvent point $ writeRef contRef Nothing
        invokeEvent point $ writeRef (processInterruptRef pid) True
        invokeEvent point $ modifyRef (processInterruptVersion pid) (1 +)
        invokeEvent point $
          enqueueEvent (pointTime point) (resumeCont cont ())
      Nothing ->
        return ()
-- | Read the interrupt flag of the process: it is reset by
-- 'holdProcess' and raised when the hold is interrupted.
processInterrupted :: MonadDES m => ProcessId m -> Event m Bool
processInterrupted pid =
  Event $ \point ->
  invokeEvent point (readRef interruptedFlag)
  where
    interruptedFlag = processInterruptRef pid
-- | Handle the beginning of preemption for the process.
--
-- There are two cases:
--
-- * The process is being held ('processInterruptCont' is set). The hold
--   is cancelled the same way 'interruptProcess' does it (continuation
--   cleared, flag raised, version incremented), and the remaining hold
--   time is computed as the scheduled end time minus the current time.
--   The process is then re-entered through a substituted continuation
--   that holds it again for that remaining time.
--
-- * The process is passivated ('processReactCont' is set). The stored
--   continuation is replaced by one substituted with @reenterCont c@.
--
-- If neither continuation is stored, nothing happens.
--
-- NOTE(review): the exact semantics of 'substituteCont', 'reenterCont'
-- and 'invokeCont' are defined in the Cont module and are not visible
-- here.
processPreempted :: MonadDES m => ProcessId m -> Event m ()
processPreempted pid =
  Event $ \p -> do
    let heldRef = processInterruptCont pid
    held <- invokeEvent p $ readRef heldRef
    case held of
      Just c -> do
        invokeEvent p $ writeRef heldRef Nothing
        invokeEvent p $ writeRef (processInterruptRef pid) True
        invokeEvent p $ modifyRef (processInterruptVersion pid) $ (+) 1
        t <- invokeEvent p $ readRef (processInterruptTime pid)
        -- the part of the hold period that has not elapsed yet
        let dt = t - pointTime p
            c' = substituteCont c $ \_ ->
              Event $ \p' ->
              invokeEvent p' $
              invokeCont c $
              invokeProcess pid $
              holdProcess dt
        invokeEvent p $
          reenterCont c' ()
      Nothing -> do
        let passiveRef = processReactCont pid
        passive <- invokeEvent p $ readRef passiveRef
        case passive of
          Nothing ->
            return ()
          Just c -> do
            let c' = substituteCont c $ reenterCont c
            invokeEvent p $ writeRef passiveRef $ Just c'
-- | Passivate the current process: its continuation is stored in the
-- process identifier and is not resumed here.
--
-- Raises an error if a continuation is already stored, i.e. the process
-- is passivated a second time without being reactivated.
passivateProcess :: MonadDES m => Process m ()
passivateProcess =
  Process $ \pid ->
  Cont $ \cont ->
  Event $ \point -> do
    let reactRef = processReactCont pid
    stored <- invokeEvent point $ readRef reactRef
    if isJust stored
      then error "Cannot passivate the process twice: passivateProcess"
      else invokeEvent point $ writeRef reactRef $ Just cont
-- | Test whether the process is passivated, i.e. whether a continuation
-- stored by 'passivateProcess' is currently present.
processPassive :: MonadDES m => ProcessId m -> Event m Bool
processPassive pid =
  Event $ \point ->
  liftM isJust $
  invokeEvent point $
  readRef (processReactCont pid)
-- | Reactivate a passivated process.
--
-- The stored continuation, if any, is removed and enqueued for
-- resumption at the current time. Does nothing when the process is not
-- passivated.
reactivateProcess :: MonadDES m => ProcessId m -> Event m ()
reactivateProcess pid =
  Event $ \point -> do
    let reactRef = processReactCont pid
    passive <- invokeEvent point $ readRef reactRef
    case passive of
      Just cont -> do
        invokeEvent point $ writeRef reactRef Nothing
        invokeEvent point $
          enqueueEvent (pointTime point) (resumeCont cont ())
      Nothing ->
        return ()
-- | Prepare the process identifier before a process is started with it.
--
-- Marks the identifier as started (raising an error if it was started
-- before) and subscribes to the signal of the underlying continuation:
--
-- * on cancellation initiating, if the cancellation is activated, the
--   process is interrupted and reactivated;
--
-- * on preemption beginning, 'processPreempted' is run;
--
-- * on preemption ending, nothing is done.
processIdPrepare :: MonadDES m => ProcessId m -> Event m ()
processIdPrepare pid =
  Event $ \point -> do
    alreadyStarted <- invokeEvent point $ readRef startedRef
    if alreadyStarted
      then error $
           "Another process with the specified identifier " ++
           "has been started already: processIdPrepare"
      else invokeEvent point $ writeRef startedRef True
    invokeEvent point $
      handleSignal_ (contSignal contId) onContEvent
  where
    startedRef = processStarted pid
    contId = processContId pid
    -- reaction to the events of the underlying continuation
    onContEvent e =
      Event $ \point ->
      case e of
        ContCancellationInitiating -> do
          activated <- invokeEvent point $ contCancellationActivated contId
          when activated $ do
            invokeEvent point $ interruptProcess pid
            invokeEvent point $ reactivateProcess pid
        ContPreemptionBeginning ->
          invokeEvent point $ processPreempted pid
        ContPreemptionEnding ->
          return ()
-- | Run the process immediately under a freshly created identifier.
runProcess :: MonadDES m => Process m () -> Event m ()
runProcess p =
  liftSimulation newProcessId >>= \pid ->
  runProcessUsingId pid p
-- | Run the process immediately under the specified identifier.
--
-- The identifier is prepared first (which fails if it was already
-- used). The computation is then run with 'return' as both the main and
-- the cancellation continuation, and 'throwEvent' as the error
-- continuation.
runProcessUsingId :: MonadDES m => ProcessId m -> Process m () -> Event m ()
runProcessUsingId pid p =
  processIdPrepare pid >>
  runCont (invokeProcess pid p) return throwEvent return (processContId pid) False
-- | Run the process (see 'runProcess') through 'runEventInStartTime'.
runProcessInStartTime :: MonadDES m => Process m () -> Simulation m ()
runProcessInStartTime p = runEventInStartTime (runProcess p)
-- | Run the process under the specified identifier (see
-- 'runProcessUsingId') through 'runEventInStartTime'.
runProcessInStartTimeUsingId :: MonadDES m => ProcessId m -> Process m () -> Simulation m ()
runProcessInStartTimeUsingId pid =
  runEventInStartTime . runProcessUsingId pid
-- | Run the process (see 'runProcess') through 'runEventInStopTime'.
runProcessInStopTime :: MonadDES m => Process m () -> Simulation m ()
runProcessInStopTime p = runEventInStopTime (runProcess p)
-- | Run the process under the specified identifier (see
-- 'runProcessUsingId') through 'runEventInStopTime'.
runProcessInStopTimeUsingId :: MonadDES m => ProcessId m -> Process m () -> Simulation m ()
runProcessInStopTimeUsingId pid =
  runEventInStopTime . runProcessUsingId pid
-- | Enqueue the process so that it is started at the specified time
-- under a freshly created identifier.
enqueueProcess :: MonadDES m => Double -> Process m () -> Event m ()
enqueueProcess t = enqueueEvent t . runProcess
-- | Enqueue the process so that it is started at the specified time
-- under the specified identifier.
enqueueProcessUsingId :: MonadDES m => Double -> ProcessId m -> Process m () -> Event m ()
enqueueProcessUsingId t pid =
  enqueueEvent t . runProcessUsingId pid
-- | Return the identifier of the current process.
processId :: MonadDES m => Process m (ProcessId m)
processId = Process $ \pid -> return pid
-- | Create a new process identifier.
--
-- All flags start as 'False', no continuation is stored, and both the
-- interrupt time and the interrupt version start at zero.
newProcessId :: MonadDES m => Simulation m (ProcessId m)
newProcessId =
  Simulation $ \run -> do
    reactCont <- invokeSimulation run $ newRef Nothing
    started <- invokeSimulation run $ newRef False
    contId <- invokeSimulation run newContId
    interrupted <- invokeSimulation run $ newRef False
    interruptCont <- invokeSimulation run $ newRef Nothing
    interruptTime <- invokeSimulation run $ newRef 0
    interruptVersion <- invokeSimulation run $ newRef 0
    return ProcessId
      { processStarted = started
      , processReactCont = reactCont
      , processContId = contId
      , processInterruptRef = interrupted
      , processInterruptCont = interruptCont
      , processInterruptTime = interruptTime
      , processInterruptVersion = interruptVersion
      }
-- | Cancel the process with the specified identifier by initiating the
-- cancellation of its underlying continuation.
cancelProcessWithId :: MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId = contCancellationInitiate . processContId
-- | Cancel the current process.
--
-- Cancellation is initiated for the process's own identifier. The
-- trailing 'throwProcess' only gives the computation an arbitrary
-- result type; as its message says, the process is expected to be
-- cancelled before that point is reached.
cancelProcess :: MonadDES m => Process m a
cancelProcess =
  processId >>= \pid ->
  liftEvent (cancelProcessWithId pid) >>
  throwProcess unreachable
  where
    unreachable =
      error "The process must be cancelled already: cancelProcess." :: SomeException
-- | Test whether the cancellation of the process has been initiated.
processCancelled :: MonadDES m => ProcessId m -> Event m Bool
processCancelled = contCancellationInitiated . processContId
-- | The signal of the underlying continuation that notifies about
-- the cancellation being initiated.
processCancelling :: MonadDES m => ProcessId m -> Signal m ()
processCancelling = contCancellationInitiating . processContId
-- | Register a handler that is run when the cancellation of the
-- current process is initiated.
whenCancellingProcess :: MonadDES m => Event m () -> Process m ()
whenCancellingProcess h =
  Process $ \pid ->
  liftEvent $ handleSignal_ (processCancelling pid) handler
  where
    handler () = h
-- | Begin the preemption of the process by delegating to the
-- underlying continuation.
processPreemptionBegin :: MonadDES m => ProcessId m -> Event m ()
processPreemptionBegin = contPreemptionBegin . processContId
-- | End the preemption of the process by delegating to the
-- underlying continuation.
processPreemptionEnd :: MonadDES m => ProcessId m -> Event m ()
processPreemptionEnd = contPreemptionEnd . processContId
-- | The signal of the underlying continuation that notifies about
-- the preemption beginning.
processPreemptionBeginning :: MonadDES m => ProcessId m -> Signal m ()
processPreemptionBeginning = contPreemptionBeginning . processContId
-- | The signal of the underlying continuation that notifies about
-- the preemption ending.
processPreemptionEnding :: MonadDES m => ProcessId m -> Signal m ()
processPreemptionEnding = contPreemptionEnding . processContId
-- | Two identifiers are equal when their 'processStarted' references
-- are equal.
--
-- NOTE(review): this presumably relies on every identifier owning its
-- own reference (see 'newProcessId') -- confirm against the 'Ref'
-- equality semantics.
instance MonadDES m => Eq (ProcessId m) where
  (==) pidA pidB = processStarted pidA == processStarted pidB
-- | Processes are sequenced by passing the same process identifier to
-- both computations and binding them in the 'Cont' monad.
instance MonadDES m => Monad (Process m) where
  return = Process . const . return
  m >>= k =
    Process $ \pid ->
    invokeProcess pid m >>= \a ->
    invokeProcess pid (k a)
-- | A computation of the underlying monad is lifted by ignoring the
-- process identifier.
instance MonadDES m => MonadCompTrans Process m where
  liftComp m = Process $ \_ -> liftComp m
-- | Mapping over a process maps over the 'Cont' computation it yields.
instance MonadDES m => Functor (Process m) where
  fmap f (Process run) = Process (fmap f . run)
-- | Both sides of '<*>' are run under the same process identifier and
-- combined in the 'Cont' applicative.
instance MonadDES m => Applicative (Process m) where
  pure a = Process $ \_ -> pure a
  pf <*> px =
    Process $ \pid ->
    invokeProcess pid pf <*> invokeProcess pid px
-- | An 'IO' action is lifted by ignoring the process identifier.
instance (MonadDES m, MonadIO m) => MonadIO (Process m) where
  liftIO io = Process $ \_ -> liftIO io
-- | A parameter computation is lifted by ignoring the process identifier.
instance MonadDES m => ParameterLift Process m where
  liftParameter x = Process $ \_ -> liftParameter x
-- | A simulation computation is lifted by ignoring the process identifier.
instance MonadDES m => SimulationLift Process m where
  liftSimulation x = Process $ \_ -> liftSimulation x
-- | A dynamics computation is lifted by ignoring the process identifier.
instance MonadDES m => DynamicsLift Process m where
  liftDynamics x = Process $ \_ -> liftDynamics x
-- | An event computation is lifted by ignoring the process identifier.
instance MonadDES m => EventLift Process m where
  liftEvent x = Process $ \_ -> liftEvent x
-- | Lifting a process into itself leaves it unchanged.
instance MonadDES m => ProcessLift Process m where
  liftProcess p = p
-- | Exception handling within 'Process' computations: the handler runs
-- under the same process identifier as the guarded computation.
catchProcess :: (MonadDES m, Exception e) => Process m a -> (e -> Process m a) -> Process m a
catchProcess body h =
  Process $ \pid ->
  catchCont (invokeProcess pid body) (invokeProcess pid . h)
-- | Run the first computation with the second one as a finalizer (via
-- 'finallyCont'); both run under the same process identifier.
finallyProcess :: MonadDES m => Process m a -> Process m b -> Process m a
finallyProcess body finalizer =
  Process $ \pid ->
  finallyCont (invokeProcess pid body) (invokeProcess pid finalizer)
-- | Throw the exception within a 'Process' computation by lifting
-- 'throwEvent'.
throwProcess :: (MonadDES m, Exception e) => e -> Process m a
throwProcess e = liftEvent (throwEvent e)
-- | Run the processes in parallel under freshly created identifiers
-- and collect their results.
processParallel :: MonadDES m => [Process m a] -> Process m [a]
processParallel xs = do
  pairs <- liftSimulation (processParallelCreateIds xs)
  processParallelUsingIds pairs
-- | Like 'processParallel', but every process runs under the
-- identifier it is paired with. All identifiers are prepared before
-- the computations are handed to 'contParallel'.
processParallelUsingIds :: MonadDES m => [(ProcessId m, Process m a)] -> Process m [a]
processParallelUsingIds xs =
  Process $ \_ -> do
    liftEvent $ processParallelPrepare xs
    contParallel
      [ (invokeProcess childId child, processContId childId)
      | (childId, child) <- xs ]
-- | Like 'processParallel', but the results are discarded.
processParallel_ :: MonadDES m => [Process m a] -> Process m ()
processParallel_ xs = do
  pairs <- liftSimulation (processParallelCreateIds xs)
  processParallelUsingIds_ pairs
-- | Like 'processParallelUsingIds', but the results are discarded
-- (the computations are handed to 'contParallel_').
processParallelUsingIds_ :: MonadDES m => [(ProcessId m, Process m a)] -> Process m ()
processParallelUsingIds_ xs =
  Process $ \_ -> do
    liftEvent $ processParallelPrepare xs
    contParallel_
      [ (invokeProcess childId child, processContId childId)
      | (childId, child) <- xs ]
-- | Pair every process of the list with a freshly created identifier.
processParallelCreateIds :: MonadDES m => [Process m a] -> Simulation m [(ProcessId m, Process m a)]
processParallelCreateIds xs = do
  pids <- liftSimulation $ mapM (\_ -> newProcessId) xs
  return (zip pids xs)
-- | Prepare the identifiers of all listed processes, in list order.
processParallelPrepare :: MonadDES m => [(ProcessId m, Process m a)] -> Event m ()
processParallelPrepare xs =
  Event $ \point ->
  mapM_ (\entry -> invokeEvent point $ processIdPrepare $ fst entry) xs
-- | Run the nested process under the specified identifier instead of
-- the identifier of the enclosing process. The identifier is prepared
-- first and the computation is then passed to 'rerunCont'.
processUsingId :: MonadDES m => ProcessId m -> Process m a -> Process m a
processUsingId pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  rerunCont (invokeProcess pid x) (processContId pid)
-- | Spawn the child process with the 'CancelTogether' cancellation
-- policy.
spawnProcess :: MonadDES m => Process m () -> Process m ()
spawnProcess x = spawnProcessWith CancelTogether x
-- | Spawn the child process under the specified identifier with the
-- 'CancelTogether' cancellation policy.
spawnProcessUsingId :: MonadDES m => ProcessId m -> Process m () -> Process m ()
spawnProcessUsingId pid x = spawnProcessUsingIdWith CancelTogether pid x
-- | Spawn the child process under a freshly created identifier using
-- the specified cancellation policy.
spawnProcessWith :: MonadDES m => ContCancellation -> Process m () -> Process m ()
spawnProcessWith cancellation x =
  liftSimulation newProcessId >>= \pid ->
  spawnProcessUsingIdWith cancellation pid x
-- | Spawn the child process under the specified identifier using the
-- specified cancellation policy. The identifier is prepared first and
-- the computation is then passed to 'spawnCont'.
spawnProcessUsingIdWith :: MonadDES m => ContCancellation -> ProcessId m -> Process m () -> Process m ()
spawnProcessUsingIdWith cancellation pid x =
  Process $ \_ ->
  liftEvent (processIdPrepare pid) >>
  spawnCont cancellation (invokeProcess pid x) (processContId pid)
-- | Await the signal within the process (delegates to 'contAwait').
processAwait :: MonadDES m => Signal m a -> Process m a
processAwait signal = Process $ const (contAwait signal)
-- | The outcome recorded by 'memoProcess' for the memoized computation.
data MemoResult a
  = MemoComputed a
    -- ^ the computation finished with this value
  | MemoError IOException
    -- ^ the computation raised this exception
  | MemoCancelled
    -- ^ neither of the above was recorded (the initial state)
-- | Memoize the process so that it is computed at most once.
--
-- The returned process behaves as follows:
--
-- * if an outcome is already stored, it is replayed: the value is
--   returned, the exception is rethrown, or the caller is cancelled;
--
-- * if the computation was started by another caller but has not
--   finished, the caller waits for the completion signal and then
--   replays the outcome;
--
-- * otherwise the caller runs the computation itself, records the
--   outcome in a finalizer, publishes it and triggers the signal.
--
-- The outcome is read with a total @case@ rather than a refutable
-- pattern in a do-bind, so no 'MonadFail' instance for 'Process' is
-- required.
memoProcess :: MonadDES m => Process m a -> Simulation m (Process m a)
memoProcess x =
  Simulation $ \r -> do
    started <- invokeSimulation r $ newRef False
    computed <- invokeSimulation r newSignalSource
    value <- invokeSimulation r $ newRef Nothing
    let result = do
          stored <- liftEvent $ readRef value
          case stored of
            Just (MemoComputed a) -> return a
            Just (MemoError e) -> throwProcess e
            Just MemoCancelled -> cancelProcess
            Nothing ->
              -- unreachable: 'result' is only used after the value
              -- has been stored
              error "The memoized value must be defined already: memoProcess"
    return $ do
      v <- liftEvent $ readRef value
      case v of
        Just _ -> result
        Nothing -> do
          f <- liftEvent $ readRef started
          case f of
            True -> do
              processAwait $ publishSignal computed
              result
            False -> do
              liftEvent $ writeRef started True
              -- stays 'MemoCancelled' unless one of the branches
              -- below records something else
              outcome <- liftSimulation $ newRef MemoCancelled
              finallyProcess
                (catchProcess
                  (do a <- x
                      liftEvent $ writeRef outcome (MemoComputed a))
                  (\e ->
                    liftEvent $ writeRef outcome (MemoError e)))
                (liftEvent $ do
                  final <- readRef outcome
                  writeRef value (Just final)
                  triggerSignal computed ())
              result
-- | Run two processes in parallel and return both results as a pair.
--
-- The results are tagged with 'Left' and 'Right' so that they fit in a
-- single list for 'processParallel'. The list is matched with a total
-- @case@, so no 'MonadFail' instance for 'Process' is required. Any
-- other shape raises an error.
--
-- NOTE(review): this presumably relies on 'processParallel' returning
-- results in the order of its argument list -- confirm against
-- 'contParallel'.
zipProcessParallel :: MonadDES m => Process m a -> Process m b -> Process m (a, b)
zipProcessParallel x y = do
  rs <- processParallel [fmap Left x, fmap Right y]
  case rs of
    [Left a, Right b] -> return (a, b)
    _ ->
      error "Unexpected result of the parallel computation: zipProcessParallel"
-- | Run three processes in parallel and return the results as a triple.
--
-- The results are tagged with nested 'Left' / 'Right' constructors so
-- that they fit in a single list for 'processParallel'. The list is
-- matched with a total @case@, so no 'MonadFail' instance for
-- 'Process' is required. Any other shape raises an error.
--
-- NOTE(review): this presumably relies on 'processParallel' returning
-- results in the order of its argument list -- confirm against
-- 'contParallel'.
zip3ProcessParallel :: MonadDES m => Process m a -> Process m b -> Process m c -> Process m (a, b, c)
zip3ProcessParallel x y z = do
  rs <- processParallel
          [ fmap Left x
          , fmap (Right . Left) y
          , fmap (Right . Right) z
          ]
  case rs of
    [Left a, Right (Left b), Right (Right c)] -> return (a, b, c)
    _ ->
      error "Unexpected result of the parallel computation: zip3ProcessParallel"
-- | Split a process that yields a pair into two processes, one per
-- component. Both share one memoized computation (see 'memoProcess').
unzipProcess :: MonadDES m => Process m (a, b) -> Simulation m (Process m a, Process m b)
unzipProcess xy =
  memoProcess xy >>= \memo ->
  return (fst `fmap` memo, snd `fmap` memo)
-- | Run the process with the specified timeout under a freshly created
-- identifier (see 'timeoutProcessUsingId').
timeoutProcess :: MonadDES m => Double -> Process m a -> Process m (Maybe a)
timeoutProcess timeout p =
  liftSimulation newProcessId >>= \pid ->
  timeoutProcessUsingId timeout pid p
-- | Run the process under the specified identifier with a timeout.
--
-- Two child processes are spawned: a timer that cancels the worker
-- after the timeout has elapsed, and the worker that runs the given
-- process. In its finalizer the worker cancels the timer and publishes
-- what it recorded. The caller waits for that signal and then returns
-- the value, rethrows the recorded exception, or returns 'Nothing'
-- when nothing was recorded.
timeoutProcessUsingId :: MonadDES m => Double -> ProcessId m -> Process m a -> Process m (Maybe a)
timeoutProcessUsingId timeout pid p = do
  finished <- liftSimulation newSignalSource
  timerPid <- liftSimulation newProcessId
  -- the timer
  spawnProcessUsingIdWith CancelChildAfterParent timerPid $ do
    holdProcess timeout
    liftEvent $
      cancelProcessWithId pid
  -- the worker
  spawnProcessUsingIdWith CancelChildAfterParent pid $ do
    outcome <- liftSimulation $ newRef Nothing
    finallyProcess
      (catchProcess
        (do a <- p
            liftEvent $ writeRef outcome $ Just (Right a))
        (\e ->
          liftEvent $ writeRef outcome $ Just (Left e)))
      (liftEvent $ do
        cancelProcessWithId timerPid
        final <- readRef outcome
        triggerSignal finished final)
  answer <- processAwait $ publishSignal finished
  case answer of
    Just (Right a) -> return (Just a)
    Just (Left (SomeException e)) -> throwProcess e
    Nothing -> return Nothing
-- | Yield control: the continuation of the current process is enqueued
-- for resumption at the current time.
processYield :: MonadDES m => Process m ()
processYield =
  Process $ \_ ->
  Cont $ \cont ->
  Event $ \point ->
  invokeEvent point $
  enqueueEvent (pointTime point) (resumeCont cont ())
-- | A process that never yields a result.
--
-- Its continuation is resumed only from the cancellation signal
-- handler, and the value passed to it is an error thunk that is not
-- meant to be evaluated.
neverProcess :: MonadDES m => Process m a
neverProcess =
  Process $ \pid ->
  Cont $ \cont ->
  handleSignal_ (processCancelling pid) $ \_ ->
  resumeCont cont $ error "It must never be computed: neverProcess"
-- | Wrap the process in 'traceCont' with the specified message.
traceProcess :: MonadDES m => String -> Process m a -> Process m a
traceProcess message m =
  Process $ \pid ->
  traceCont message (invokeProcess pid m)