module Simulation.Aivika.Internal.Process
(
ProcessId,
Process(..),
ProcessLift(..),
invokeProcess,
runProcess,
runProcessUsingId,
runProcessInStartTime,
runProcessInStartTimeUsingId,
runProcessInStopTime,
runProcessInStopTimeUsingId,
spawnProcess,
spawnProcessUsingId,
spawnProcessWith,
spawnProcessUsingIdWith,
enqueueProcess,
enqueueProcessUsingId,
newProcessId,
processId,
processUsingId,
holdProcess,
interruptProcess,
processInterrupted,
passivateProcess,
processPassive,
reactivateProcess,
cancelProcessWithId,
cancelProcess,
processCancelled,
processCancelling,
whenCancellingProcess,
processAwait,
processPreemptionBegin,
processPreemptionEnd,
processPreemptionBeginning,
processPreemptionEnding,
processYield,
timeoutProcess,
timeoutProcessUsingId,
processParallel,
processParallelUsingIds,
processParallel_,
processParallelUsingIds_,
catchProcess,
finallyProcess,
throwProcess,
zipProcessParallel,
zip3ProcessParallel,
unzipProcess,
memoProcess,
neverProcess,
retryProcess,
traceProcess) where
import Data.Maybe
import Data.IORef
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Control.Applicative
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Parameter
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Signal
data ProcessId =
ProcessId { processStarted :: IORef Bool,
processReactCont :: IORef (Maybe (ContParams ())),
processContId :: ContId,
processInterruptRef :: IORef Bool,
processInterruptCont :: IORef (Maybe (ContParams ())),
processInterruptTime :: IORef Double,
processInterruptVersion :: IORef Int }
newtype Process a = Process (ProcessId -> Cont a)
class ProcessLift m where
liftProcess :: Process a -> m a
instance ProcessLift Process where
liftProcess = id
invokeProcess :: ProcessId -> Process a -> Cont a
invokeProcess pid (Process m) = m pid
holdProcess :: Double -> Process ()
holdProcess dt =
Process $ \pid ->
Cont $ \c ->
Event $ \p ->
do when (dt < 0) $
error "Time period dt < 0: holdProcess"
let x = processInterruptCont pid
t = pointTime p + dt
writeIORef x $ Just c
writeIORef (processInterruptRef pid) False
writeIORef (processInterruptTime pid) t
v <- readIORef (processInterruptVersion pid)
invokeEvent p $
enqueueEvent t $
Event $ \p ->
do v' <- readIORef (processInterruptVersion pid)
when (v == v') $
do writeIORef x Nothing
invokeEvent p $ resumeCont c ()
interruptProcess :: ProcessId -> Event ()
interruptProcess pid =
Event $ \p ->
do let x = processInterruptCont pid
a <- readIORef x
case a of
Nothing -> return ()
Just c ->
do writeIORef x Nothing
writeIORef (processInterruptRef pid) True
modifyIORef (processInterruptVersion pid) $ (+) 1
invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont c ()
processInterrupted :: ProcessId -> Event Bool
processInterrupted pid =
Event $ \p ->
readIORef (processInterruptRef pid)
processPreempted :: ProcessId -> Event ()
processPreempted pid =
Event $ \p ->
do let x = processInterruptCont pid
a <- readIORef x
case a of
Just c ->
do writeIORef x Nothing
writeIORef (processInterruptRef pid) True
modifyIORef (processInterruptVersion pid) $ (+) 1
t <- readIORef (processInterruptTime pid)
let dt = t pointTime p
c' = substituteCont c $ \a ->
Event $ \p ->
invokeEvent p $
invokeCont c $
invokeProcess pid $
holdProcess dt
invokeEvent p $
reenterCont c' ()
Nothing ->
do let x = processReactCont pid
a <- readIORef x
case a of
Nothing ->
return ()
Just c ->
do let c' = substituteCont c $ reenterCont c
writeIORef x $ Just c'
passivateProcess :: Process ()
passivateProcess =
Process $ \pid ->
Cont $ \c ->
Event $ \p ->
do let x = processReactCont pid
a <- readIORef x
case a of
Nothing -> writeIORef x $ Just c
Just _ -> error "Cannot passivate the process twice: passivateProcess"
processPassive :: ProcessId -> Event Bool
processPassive pid =
Event $ \p ->
do let x = processReactCont pid
a <- readIORef x
return $ isJust a
reactivateProcess :: ProcessId -> Event ()
reactivateProcess pid =
Event $ \p ->
do let x = processReactCont pid
a <- readIORef x
case a of
Nothing ->
return ()
Just c ->
do writeIORef x Nothing
invokeEvent p $ enqueueEvent (pointTime p) $ resumeCont c ()
processIdPrepare :: ProcessId -> Event ()
processIdPrepare pid =
Event $ \p ->
do y <- readIORef (processStarted pid)
if y
then error $
"Another process with the specified identifier " ++
"has been started already: processIdPrepare"
else writeIORef (processStarted pid) True
let signal = contSignal $ processContId pid
invokeEvent p $
handleSignal_ signal $ \e ->
Event $ \p ->
case e of
ContCancellationInitiating ->
do z <- contCancellationActivated $ processContId pid
when z $
do invokeEvent p $ interruptProcess pid
invokeEvent p $ reactivateProcess pid
ContPreemptionBeginning ->
invokeEvent p $ processPreempted pid
ContPreemptionEnding ->
return ()
runProcess :: Process () -> Event ()
runProcess p =
do pid <- liftSimulation newProcessId
runProcessUsingId pid p
runProcessUsingId :: ProcessId -> Process () -> Event ()
runProcessUsingId pid p =
do processIdPrepare pid
runCont m cont econt ccont (processContId pid) False
where cont = return
econt = throwEvent
ccont = return
m = invokeProcess pid p
runProcessInStartTime :: Process () -> Simulation ()
runProcessInStartTime = runEventInStartTime . runProcess
runProcessInStartTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStartTimeUsingId pid p =
runEventInStartTime $ runProcessUsingId pid p
runProcessInStopTime :: Process () -> Simulation ()
runProcessInStopTime = runEventInStopTime . runProcess
runProcessInStopTimeUsingId :: ProcessId -> Process () -> Simulation ()
runProcessInStopTimeUsingId pid p =
runEventInStopTime $ runProcessUsingId pid p
enqueueProcess :: Double -> Process () -> Event ()
enqueueProcess t p =
enqueueEvent t $ runProcess p
enqueueProcessUsingId :: Double -> ProcessId -> Process () -> Event ()
enqueueProcessUsingId t pid p =
enqueueEvent t $ runProcessUsingId pid p
processId :: Process ProcessId
processId = Process return
newProcessId :: Simulation ProcessId
newProcessId =
do x <- liftIO $ newIORef Nothing
y <- liftIO $ newIORef False
c <- newContId
i <- liftIO $ newIORef False
z <- liftIO $ newIORef Nothing
t <- liftIO $ newIORef 0
v <- liftIO $ newIORef 0
return ProcessId { processStarted = y,
processReactCont = x,
processContId = c,
processInterruptRef = i,
processInterruptCont = z,
processInterruptTime = t,
processInterruptVersion = v }
cancelProcessWithId :: ProcessId -> Event ()
cancelProcessWithId pid = contCancellationInitiate (processContId pid)
cancelProcess :: Process a
cancelProcess =
do pid <- processId
liftEvent $ cancelProcessWithId pid
throwProcess $
(error "The process must be cancelled already: cancelProcess." :: SomeException)
processCancelled :: ProcessId -> Event Bool
processCancelled pid = contCancellationInitiated (processContId pid)
processCancelling :: ProcessId -> Signal ()
processCancelling pid = contCancellationInitiating (processContId pid)
whenCancellingProcess :: Event () -> Process ()
whenCancellingProcess h =
Process $ \pid ->
liftEvent $
handleSignal_ (processCancelling pid) $ \() -> h
processPreemptionBegin :: ProcessId -> Event ()
processPreemptionBegin pid = contPreemptionBegin (processContId pid)
processPreemptionEnd :: ProcessId -> Event ()
processPreemptionEnd pid = contPreemptionEnd (processContId pid)
processPreemptionBeginning :: ProcessId -> Signal ()
processPreemptionBeginning pid = contPreemptionBeginning (processContId pid)
processPreemptionEnding :: ProcessId -> Signal ()
processPreemptionEnding pid = contPreemptionEnding (processContId pid)
instance Eq ProcessId where
x == y = processReactCont x == processReactCont y
instance Monad Process where
return = returnP
m >>= k = bindP m k
instance Functor Process where
fmap = liftM
instance Applicative Process where
pure = return
(<*>) = ap
instance ParameterLift Process where
liftParameter = liftPP
instance SimulationLift Process where
liftSimulation = liftSP
instance DynamicsLift Process where
liftDynamics = liftDP
instance EventLift Process where
liftEvent = liftEP
instance MonadIO Process where
liftIO = liftIOP
returnP :: a -> Process a
returnP a = Process $ \pid -> return a
bindP :: Process a -> (a -> Process b) -> Process b
bindP (Process m) k =
Process $ \pid ->
do a <- m pid
let Process m' = k a
m' pid
liftPP :: Parameter a -> Process a
liftPP m = Process $ \pid -> liftParameter m
liftSP :: Simulation a -> Process a
liftSP m = Process $ \pid -> liftSimulation m
liftDP :: Dynamics a -> Process a
liftDP m = Process $ \pid -> liftDynamics m
liftEP :: Event a -> Process a
liftEP m = Process $ \pid -> liftEvent m
liftIOP :: IO a -> Process a
liftIOP m = Process $ \pid -> liftIO m
catchProcess :: Exception e => Process a -> (e -> Process a) -> Process a
catchProcess (Process m) h =
Process $ \pid ->
catchCont (m pid) $ \e ->
let Process m' = h e in m' pid
finallyProcess :: Process a -> Process b -> Process a
finallyProcess (Process m) (Process m') =
Process $ \pid ->
finallyCont (m pid) (m' pid)
throwProcess :: Exception e => e -> Process a
throwProcess = liftIO . throw
processParallel :: [Process a] -> Process [a]
processParallel xs =
liftSimulation (processParallelCreateIds xs) >>= processParallelUsingIds
processParallelUsingIds :: [(ProcessId, Process a)] -> Process [a]
processParallelUsingIds xs =
Process $ \pid ->
do liftEvent $ processParallelPrepare xs
contParallel $
flip map xs $ \(pid, m) ->
(invokeProcess pid m, processContId pid)
processParallel_ :: [Process a] -> Process ()
processParallel_ xs =
liftSimulation (processParallelCreateIds xs) >>= processParallelUsingIds_
processParallelUsingIds_ :: [(ProcessId, Process a)] -> Process ()
processParallelUsingIds_ xs =
Process $ \pid ->
do liftEvent $ processParallelPrepare xs
contParallel_ $
flip map xs $ \(pid, m) ->
(invokeProcess pid m, processContId pid)
processParallelCreateIds :: [Process a] -> Simulation [(ProcessId, Process a)]
processParallelCreateIds xs =
do pids <- liftSimulation $ forM xs $ const newProcessId
return $ zip pids xs
processParallelPrepare :: [(ProcessId, Process a)] -> Event ()
processParallelPrepare xs =
Event $ \p ->
forM_ xs $ invokeEvent p . processIdPrepare . fst
processUsingId :: ProcessId -> Process a -> Process a
processUsingId pid x =
Process $ \pid' ->
do liftEvent $ processIdPrepare pid
rerunCont (invokeProcess pid x) (processContId pid)
spawnProcess :: Process () -> Process ()
spawnProcess = spawnProcessWith CancelTogether
spawnProcessUsingId :: ProcessId -> Process () -> Process ()
spawnProcessUsingId = spawnProcessUsingIdWith CancelTogether
spawnProcessWith :: ContCancellation -> Process () -> Process ()
spawnProcessWith cancellation x =
do pid <- liftSimulation newProcessId
spawnProcessUsingIdWith cancellation pid x
spawnProcessUsingIdWith :: ContCancellation -> ProcessId -> Process () -> Process ()
spawnProcessUsingIdWith cancellation pid x =
Process $ \pid' ->
do liftEvent $ processIdPrepare pid
spawnCont cancellation (invokeProcess pid x) (processContId pid)
processAwait :: Signal a -> Process a
processAwait signal =
Process $ \pid -> contAwait signal
data MemoResult a = MemoComputed a
| MemoError IOException
| MemoCancelled
memoProcess :: Process a -> Simulation (Process a)
memoProcess x =
do started <- liftIO $ newIORef False
computed <- newSignalSource
value <- liftIO $ newIORef Nothing
let result =
do Just x <- liftIO $ readIORef value
case x of
MemoComputed a -> return a
MemoError e -> throwProcess e
MemoCancelled -> cancelProcess
return $
do v <- liftIO $ readIORef value
case v of
Just _ -> result
Nothing ->
do f <- liftIO $ readIORef started
case f of
True ->
do processAwait $ publishSignal computed
result
False ->
do liftIO $ writeIORef started True
r <- liftIO $ newIORef MemoCancelled
finallyProcess
(catchProcess
(do a <- x
liftIO $ writeIORef r (MemoComputed a))
(\e ->
liftIO $ writeIORef r (MemoError e)))
(liftEvent $
do liftIO $
do x <- readIORef r
writeIORef value (Just x)
triggerSignal computed ())
result
zipProcessParallel :: Process a -> Process b -> Process (a, b)
zipProcessParallel x y =
do [Left a, Right b] <- processParallel [fmap Left x, fmap Right y]
return (a, b)
zip3ProcessParallel :: Process a -> Process b -> Process c -> Process (a, b, c)
zip3ProcessParallel x y z =
do [Left a,
Right (Left b),
Right (Right c)] <-
processParallel [fmap Left x,
fmap (Right . Left) y,
fmap (Right . Right) z]
return (a, b, c)
unzipProcess :: Process (a, b) -> Simulation (Process a, Process b)
unzipProcess xy =
do xy' <- memoProcess xy
return (fmap fst xy', fmap snd xy')
timeoutProcess :: Double -> Process a -> Process (Maybe a)
timeoutProcess timeout p =
do pid <- liftSimulation newProcessId
timeoutProcessUsingId timeout pid p
timeoutProcessUsingId :: Double -> ProcessId -> Process a -> Process (Maybe a)
timeoutProcessUsingId timeout pid p =
do s <- liftSimulation newSignalSource
timeoutPid <- liftSimulation newProcessId
spawnProcessUsingIdWith CancelChildAfterParent timeoutPid $
do holdProcess timeout
liftEvent $
cancelProcessWithId pid
spawnProcessUsingIdWith CancelChildAfterParent pid $
do r <- liftIO $ newIORef Nothing
finallyProcess
(catchProcess
(do a <- p
liftIO $ writeIORef r $ Just (Right a))
(\e ->
liftIO $ writeIORef r $ Just (Left e)))
(liftEvent $
do cancelProcessWithId timeoutPid
x <- liftIO $ readIORef r
triggerSignal s x)
x <- processAwait $ publishSignal s
case x of
Nothing -> return Nothing
Just (Right a) -> return (Just a)
Just (Left (SomeException e)) -> throwProcess e
processYield :: Process ()
processYield =
Process $ \pid ->
Cont $ \c ->
Event $ \p ->
invokeEvent p $
enqueueEvent (pointTime p) $
resumeCont c ()
neverProcess :: Process a
neverProcess =
Process $ \pid ->
Cont $ \c ->
let signal = processCancelling pid
in handleSignal_ signal $ \_ ->
resumeCont c $ error "It must never be computed: neverProcess"
retryProcess :: String -> Process a
retryProcess = liftEvent . retryEvent
traceProcess :: String -> Process a -> Process a
traceProcess message m =
Process $ \pid ->
traceCont message $
invokeProcess pid m