module Simulation.Aivika.Task
(
Task,
TaskResult(..),
taskId,
tryGetTaskResult,
taskResult,
taskResultReceived,
taskProcess,
cancelTask,
taskCancelled,
runTask,
runTaskUsingId,
spawnTask,
spawnTaskUsingId,
spawnTaskWith,
spawnTaskUsingIdWith,
enqueueTask,
enqueueTaskUsingId,
taskParallelResult,
taskParallelProcess) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Signal
-- | A handle to a process that was launched as a task, through which its
-- outcome can be queried or awaited.
data Task a = Task
  { taskId :: ProcessId
    -- ^ The identifier of the process associated with the task.
  , taskResultRef :: IORef (Maybe (TaskResult a))
    -- ^ Storage for the outcome: 'Nothing' until one has been recorded.
  , taskResultReceived :: Signal (TaskResult a)
    -- ^ A signal that carries the outcome when it is published.
  }
-- | The outcome of a task.
data TaskResult a
  = TaskCompleted a
    -- ^ The underlying process ran to completion and produced a value.
  | TaskError IOException
    -- ^ The underlying process failed with the given 'IOException'.
  | TaskCancelled
    -- ^ The task ended without completing and without a caught error;
    -- this is the outcome reported when the task was cancelled.
-- | Look at the task's outcome without waiting for it: the result is
-- 'Nothing' while no outcome has been stored yet.
tryGetTaskResult :: Task a -> Event (Maybe (TaskResult a))
tryGetTaskResult = liftIO . readIORef . taskResultRef
-- | Obtain the task's outcome. A stored outcome is returned immediately;
-- otherwise the calling process awaits the 'taskResultReceived' signal.
taskResult :: Task a -> Process (TaskResult a)
taskResult t =
  liftIO (readIORef (taskResultRef t)) >>=
  maybe (processAwait (taskResultReceived t)) return
-- | Cancel the process identified by the task's 'taskId'.
cancelTask :: Task a -> Event ()
cancelTask = cancelProcessWithId . taskId
-- | Query 'processCancelled' for the process identified by the task's
-- 'taskId'.
taskCancelled :: Task a -> Event Bool
taskCancelled = processCancelled . taskId
-- | Build a task handle for the given process together with the wrapped
-- process body that must be started under the same identifier.
--
-- The body runs the original process, records how it ended and, in a
-- finaliser, stores that outcome in the task and triggers the task's
-- signal with it.
newTaskUsingId :: ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId pid p =
  do resultRef <- liftIO $ newIORef Nothing
     source <- liftSimulation newSignalSource
     let task =
           Task { taskId = pid
                , taskResultRef = resultRef
                , taskResultReceived = publishSignal source }
         -- Copy the recorded outcome into the task and notify listeners.
         publish outcomeRef =
           liftEvent $
           do outcome <- liftIO $ readIORef outcomeRef
              liftIO $ writeIORef resultRef (Just outcome)
              triggerSignal source outcome
         body =
           do -- Starts as 'TaskCancelled'; overwritten only when the
              -- process completes or its error is caught.
              outcomeRef <- liftIO $ newIORef TaskCancelled
              finallyProcess
                (catchProcess
                   (p >>= liftIO . writeIORef outcomeRef . TaskCompleted)
                   (liftIO . writeIORef outcomeRef . TaskError))
                (publish outcomeRef)
     return (task, body)
-- | Create a task for the process and hand its wrapped body to
-- 'runProcessUsingId' under the specified identifier.
runTaskUsingId :: ProcessId -> Process a -> Event (Task a)
runTaskUsingId pid p =
  newTaskUsingId pid p >>= \(task, body) ->
  runProcessUsingId pid body >>
  return task
-- | Like 'runTaskUsingId', but with a freshly generated process
-- identifier.
runTask :: Process a -> Event (Task a)
runTask p =
  liftSimulation newProcessId >>= \pid ->
  runTaskUsingId pid p
-- | Create a task for the process and hand its wrapped body to
-- 'enqueueProcessUsingId' with the given time and identifier.
enqueueTaskUsingId :: Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId time pid p =
  newTaskUsingId pid p >>= \(task, body) ->
  enqueueProcessUsingId time pid body >>
  return task
-- | Like 'enqueueTaskUsingId', but with a freshly generated process
-- identifier.
enqueueTask :: Double -> Process a -> Event (Task a)
enqueueTask time p =
  liftSimulation newProcessId >>= \pid ->
  enqueueTaskUsingId time pid p
-- | Spawn a task under the specified identifier, using the
-- 'CancelTogether' cancellation mode.
spawnTaskUsingId :: ProcessId -> Process a -> Process (Task a)
spawnTaskUsingId pid p = spawnTaskUsingIdWith CancelTogether pid p
-- | Spawn a task with a fresh identifier, using the 'CancelTogether'
-- cancellation mode.
spawnTask :: Process a -> Process (Task a)
spawnTask p = spawnTaskWith CancelTogether p
-- | Create a task for the process and hand its wrapped body to
-- 'spawnProcessUsingIdWith' with the given cancellation mode and
-- identifier.
spawnTaskUsingIdWith :: ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith cancellation pid p =
  liftEvent (newTaskUsingId pid p) >>= \(task, body) ->
  spawnProcessUsingIdWith cancellation pid body >>
  return task
-- | Like 'spawnTaskUsingIdWith', but with a freshly generated process
-- identifier.
spawnTaskWith :: ContCancellation -> Process a -> Process (Task a)
spawnTaskWith cancellation p =
  liftSimulation newProcessId >>= \pid ->
  spawnTaskUsingIdWith cancellation pid p
-- | Turn a task back into a process: wait for the task's outcome, then
-- return its value, rethrow its error, or cancel the current process,
-- according to that outcome.
--
-- If the current process turns out to be cancelled when the wait is
-- finalised, the task is cancelled as well.
taskProcess :: Task a -> Process a
taskProcess t =
  finallyProcess (taskResult t) cancelTaskIfCancelled >>= unwrap
  where
    -- Propagate cancellation of the waiting process to the task.
    cancelTaskIfCancelled =
      do pid <- processId
         liftEvent $
           do cancelled <- processCancelled pid
              when cancelled $ cancelTask t

    unwrap (TaskCompleted a) = return a
    unwrap (TaskError e) = throwProcess e
    unwrap TaskCancelled = cancelProcess
-- | Obtain the outcome of whichever of the two tasks has one first,
-- paired with the other task.
--
-- A stored outcome of the first task takes precedence over a stored
-- outcome of the second. If neither has one yet, the calling process
-- awaits the merged result signals of both tasks.
taskParallelResult :: Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult t1 t2 =
  stored t1 >>= maybe checkSecond (\r -> return (r, t2))
  where
    stored t = liftIO (readIORef (taskResultRef t))

    checkSecond =
      stored t2 >>= maybe awaitEither (\r -> return (r, t1))

    -- Tag each signal so the receiver knows which task finished.
    awaitEither =
      do let fromFirst = fmap Left (taskResultReceived t1)
             fromSecond = fmap Right (taskResultReceived t2)
         winner <- processAwait (fromFirst <> fromSecond)
         either (\r -> return (r, t2)) (\r -> return (r, t1)) winner
-- | Wait for the first outcome among the two tasks and act on it.
--
-- On completion the value is returned together with the other task,
-- which is left untouched. On an error or a cancellation the other task
-- is cancelled first, and then the error is rethrown or the current
-- process is cancelled, respectively.
--
-- If the current process turns out to be cancelled when the wait is
-- finalised, both tasks are cancelled.
taskParallelProcess :: Task a -> Task a -> Process (a, Task a)
taskParallelProcess t1 t2 =
  do (outcome, other) <-
       finallyProcess (taskParallelResult t1 t2) cancelBothIfCancelled
     let abandonOther = liftEvent (cancelTask other)
     case outcome of
       TaskCompleted a -> return (a, other)
       TaskError e -> abandonOther >> throwProcess e
       TaskCancelled -> abandonOther >> cancelProcess
  where
    -- Propagate cancellation of the waiting process to both tasks.
    cancelBothIfCancelled =
      do pid <- processId
         liftEvent $
           do cancelled <- processCancelled pid
              when cancelled $
                cancelTask t1 >> cancelTask t2