module Simulation.Aivika.Trans.Task
(
Task,
TaskResult(..),
taskId,
tryGetTaskResult,
taskResult,
taskResultReceived,
taskProcess,
cancelTask,
taskCancelled,
runTask,
runTaskUsingId,
spawnTask,
spawnTaskUsingId,
spawnTaskWith,
spawnTaskUsingIdWith,
enqueueTask,
enqueueTaskUsingId,
taskParallelResult,
taskParallelProcess) where
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Parameter
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Dynamics
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Trans.Signal
data Task m a =
Task { taskId :: ProcessId m,
taskResultRef :: Ref m (Maybe (TaskResult a)),
taskResultReceived :: Signal m (TaskResult a)
}
data TaskResult a = TaskCompleted a
| TaskError SomeException
| TaskCancelled
tryGetTaskResult :: MonadDES m => Task m a -> Event m (Maybe (TaskResult a))
tryGetTaskResult t = readRef (taskResultRef t)
taskResult :: MonadDES m => Task m a -> Process m (TaskResult a)
taskResult t =
do x <- liftEvent $ readRef (taskResultRef t)
case x of
Just x -> return x
Nothing -> processAwait (taskResultReceived t)
cancelTask :: MonadDES m => Task m a -> Event m ()
cancelTask t =
cancelProcessWithId (taskId t)
taskCancelled :: MonadDES m => Task m a -> Event m Bool
taskCancelled t =
processCancelled (taskId t)
newTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a, Process m ())
newTaskUsingId pid p =
do r <- liftSimulation $ newRef Nothing
s <- liftSimulation newSignalSource
let t = Task { taskId = pid,
taskResultRef = r,
taskResultReceived = publishSignal s }
let m =
do v <- liftSimulation $ newRef TaskCancelled
finallyProcess
(catchProcess
(do a <- p
liftEvent $ writeRef v (TaskCompleted a))
(\e ->
liftEvent $ writeRef v (TaskError e)))
(liftEvent $
do x <- readRef v
writeRef r (Just x)
triggerSignal s x)
return (t, m)
runTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a)
runTaskUsingId pid p =
do (t, m) <- newTaskUsingId pid p
runProcessUsingId pid m
return t
runTask :: MonadDES m => Process m a -> Event m (Task m a)
runTask p =
do pid <- liftSimulation newProcessId
runTaskUsingId pid p
enqueueTaskUsingId :: MonadDES m => Double -> ProcessId m -> Process m a -> Event m (Task m a)
enqueueTaskUsingId time pid p =
do (t, m) <- newTaskUsingId pid p
enqueueProcessUsingId time pid m
return t
enqueueTask :: MonadDES m => Double -> Process m a -> Event m (Task m a)
enqueueTask time p =
do pid <- liftSimulation newProcessId
enqueueTaskUsingId time pid p
spawnTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Process m (Task m a)
spawnTaskUsingId = spawnTaskUsingIdWith CancelTogether
spawnTask :: MonadDES m => Process m a -> Process m (Task m a)
spawnTask = spawnTaskWith CancelTogether
spawnTaskUsingIdWith :: MonadDES m => ContCancellation -> ProcessId m -> Process m a -> Process m (Task m a)
spawnTaskUsingIdWith cancellation pid p =
do (t, m) <- liftEvent $ newTaskUsingId pid p
spawnProcessUsingIdWith cancellation pid m
return t
spawnTaskWith :: MonadDES m => ContCancellation -> Process m a -> Process m (Task m a)
spawnTaskWith cancellation p =
do pid <- liftSimulation newProcessId
spawnTaskUsingIdWith cancellation pid p
taskProcess :: MonadDES m => Task m a -> Process m a
taskProcess t =
do x <- finallyProcess
(taskResult t)
(do pid <- processId
liftEvent $
do cancelled <- processCancelled pid
when cancelled $
cancelTask t)
case x of
TaskCompleted a -> return a
TaskError e -> throwProcess e
TaskCancelled -> cancelProcess
taskParallelResult :: MonadDES m => Task m a -> Task m a -> Process m (TaskResult a, Task m a)
taskParallelResult t1 t2 =
do x1 <- liftEvent $ readRef (taskResultRef t1)
case x1 of
Just x1 -> return (x1, t2)
Nothing ->
do x2 <- liftEvent $ readRef (taskResultRef t2)
case x2 of
Just x2 -> return (x2, t1)
Nothing ->
do let s1 = fmap Left $ taskResultReceived t1
s2 = fmap Right $ taskResultReceived t2
x <- processAwait $ s1 <> s2
case x of
Left x1 -> return (x1, t2)
Right x2 -> return (x2, t1)
taskParallelProcess :: MonadDES m => Task m a -> Task m a -> Process m (a, Task m a)
taskParallelProcess t1 t2 =
do (x, t) <-
finallyProcess
(taskParallelResult t1 t2)
(do pid <- processId
liftEvent $
do cancelled <- processCancelled pid
when cancelled $
do cancelTask t1
cancelTask t2)
case x of
TaskCompleted a -> return (a, t)
TaskError e ->
do liftEvent $ cancelTask t
throwProcess e
TaskCancelled ->
do liftEvent $ cancelTask t
cancelProcess