module Simulation.Aivika.Task
(
Task,
TaskResult(..),
taskId,
tryGetTaskResult,
taskResult,
taskResultReceived,
taskProcess,
cancelTask,
taskCancelled,
runTask,
runTaskUsingId,
spawnTask,
spawnTaskUsingId,
spawnTaskWith,
spawnTaskUsingIdWith,
enqueueTask,
enqueueTaskUsingId,
taskParallelResult,
taskParallelProcess) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Exception
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Signal
data Task a =
Task { Task a -> ProcessId
taskId :: ProcessId,
Task a -> IORef (Maybe (TaskResult a))
taskResultRef :: IORef (Maybe (TaskResult a)),
Task a -> Signal (TaskResult a)
taskResultReceived :: Signal (TaskResult a)
}
data TaskResult a = TaskCompleted a
| TaskError IOException
| TaskCancelled
tryGetTaskResult :: Task a -> Event (Maybe (TaskResult a))
tryGetTaskResult :: Task a -> Event (Maybe (TaskResult a))
tryGetTaskResult Task a
t =
(Point -> IO (Maybe (TaskResult a)))
-> Event (Maybe (TaskResult a))
forall a. (Point -> IO a) -> Event a
Event ((Point -> IO (Maybe (TaskResult a)))
-> Event (Maybe (TaskResult a)))
-> (Point -> IO (Maybe (TaskResult a)))
-> Event (Maybe (TaskResult a))
forall a b. (a -> b) -> a -> b
$ \Point
p -> IORef (Maybe (TaskResult a)) -> IO (Maybe (TaskResult a))
forall a. IORef a -> IO a
readIORef (Task a -> IORef (Maybe (TaskResult a))
forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t)
taskResult :: Task a -> Process (TaskResult a)
taskResult :: Task a -> Process (TaskResult a)
taskResult Task a
t =
do Maybe (TaskResult a)
x <- IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a)))
-> IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (TaskResult a)) -> IO (Maybe (TaskResult a))
forall a. IORef a -> IO a
readIORef (Task a -> IORef (Maybe (TaskResult a))
forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t)
case Maybe (TaskResult a)
x of
Just TaskResult a
x -> TaskResult a -> Process (TaskResult a)
forall (m :: * -> *) a. Monad m => a -> m a
return TaskResult a
x
Maybe (TaskResult a)
Nothing -> Signal (TaskResult a) -> Process (TaskResult a)
forall a. Signal a -> Process a
processAwait (Task a -> Signal (TaskResult a)
forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t)
cancelTask :: Task a -> Event ()
cancelTask :: Task a -> Event ()
cancelTask Task a
t =
ProcessId -> Event ()
cancelProcessWithId (Task a -> ProcessId
forall a. Task a -> ProcessId
taskId Task a
t)
taskCancelled :: Task a -> Event Bool
taskCancelled :: Task a -> Event Bool
taskCancelled Task a
t =
ProcessId -> Event Bool
processCancelled (Task a -> ProcessId
forall a. Task a -> ProcessId
taskId Task a
t)
newTaskUsingId :: ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId :: ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p =
do IORef (Maybe (TaskResult a))
r <- IO (IORef (Maybe (TaskResult a)))
-> Event (IORef (Maybe (TaskResult a)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe (TaskResult a)))
-> Event (IORef (Maybe (TaskResult a))))
-> IO (IORef (Maybe (TaskResult a)))
-> Event (IORef (Maybe (TaskResult a)))
forall a b. (a -> b) -> a -> b
$ Maybe (TaskResult a) -> IO (IORef (Maybe (TaskResult a)))
forall a. a -> IO (IORef a)
newIORef Maybe (TaskResult a)
forall a. Maybe a
Nothing
SignalSource (TaskResult a)
s <- Simulation (SignalSource (TaskResult a))
-> Event (SignalSource (TaskResult a))
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation (SignalSource (TaskResult a))
forall a. Simulation (SignalSource a)
newSignalSource
let t :: Task a
t = Task :: forall a.
ProcessId
-> IORef (Maybe (TaskResult a)) -> Signal (TaskResult a) -> Task a
Task { taskId :: ProcessId
taskId = ProcessId
pid,
taskResultRef :: IORef (Maybe (TaskResult a))
taskResultRef = IORef (Maybe (TaskResult a))
r,
taskResultReceived :: Signal (TaskResult a)
taskResultReceived = SignalSource (TaskResult a) -> Signal (TaskResult a)
forall a. SignalSource a -> Signal a
publishSignal SignalSource (TaskResult a)
s }
let m :: Process ()
m =
do IORef (TaskResult a)
v <- IO (IORef (TaskResult a)) -> Process (IORef (TaskResult a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (TaskResult a)) -> Process (IORef (TaskResult a)))
-> IO (IORef (TaskResult a)) -> Process (IORef (TaskResult a))
forall a b. (a -> b) -> a -> b
$ TaskResult a -> IO (IORef (TaskResult a))
forall a. a -> IO (IORef a)
newIORef TaskResult a
forall a. TaskResult a
TaskCancelled
Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process a
finallyProcess
(Process () -> (IOException -> Process ()) -> Process ()
forall e a.
Exception e =>
Process a -> (e -> Process a) -> Process a
catchProcess
(do a
a <- Process a
p
IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (TaskResult a) -> TaskResult a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (TaskResult a)
v (a -> TaskResult a
forall a. a -> TaskResult a
TaskCompleted a
a))
(\IOException
e ->
IO () -> Process ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (TaskResult a) -> TaskResult a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (TaskResult a)
v (IOException -> TaskResult a
forall a. IOException -> TaskResult a
TaskError IOException
e)))
(Event () -> Process ()
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
do TaskResult a
x <- IO (TaskResult a) -> Event (TaskResult a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TaskResult a) -> Event (TaskResult a))
-> IO (TaskResult a) -> Event (TaskResult a)
forall a b. (a -> b) -> a -> b
$ IORef (TaskResult a) -> IO (TaskResult a)
forall a. IORef a -> IO a
readIORef IORef (TaskResult a)
v
IO () -> Event ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Event ()) -> IO () -> Event ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (TaskResult a)) -> Maybe (TaskResult a) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe (TaskResult a))
r (TaskResult a -> Maybe (TaskResult a)
forall a. a -> Maybe a
Just TaskResult a
x)
SignalSource (TaskResult a) -> TaskResult a -> Event ()
forall a. SignalSource a -> a -> Event ()
triggerSignal SignalSource (TaskResult a)
s TaskResult a
x)
(Task a, Process ()) -> Event (Task a, Process ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Task a
t, Process ()
m)
runTaskUsingId :: ProcessId -> Process a -> Event (Task a)
runTaskUsingId :: ProcessId -> Process a -> Event (Task a)
runTaskUsingId ProcessId
pid Process a
p =
do (Task a
t, Process ()
m) <- ProcessId -> Process a -> Event (Task a, Process ())
forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
ProcessId -> Process () -> Event ()
runProcessUsingId ProcessId
pid Process ()
m
Task a -> Event (Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t
runTask :: Process a -> Event (Task a)
runTask :: Process a -> Event (Task a)
runTask Process a
p =
do ProcessId
pid <- Simulation ProcessId -> Event ProcessId
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
ProcessId -> Process a -> Event (Task a)
forall a. ProcessId -> Process a -> Event (Task a)
runTaskUsingId ProcessId
pid Process a
p
enqueueTaskUsingId :: Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId :: Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId Double
time ProcessId
pid Process a
p =
do (Task a
t, Process ()
m) <- ProcessId -> Process a -> Event (Task a, Process ())
forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
Double -> ProcessId -> Process () -> Event ()
enqueueProcessUsingId Double
time ProcessId
pid Process ()
m
Task a -> Event (Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t
enqueueTask :: Double -> Process a -> Event (Task a)
enqueueTask :: Double -> Process a -> Event (Task a)
enqueueTask Double
time Process a
p =
do ProcessId
pid <- Simulation ProcessId -> Event ProcessId
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
Double -> ProcessId -> Process a -> Event (Task a)
forall a. Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId Double
time ProcessId
pid Process a
p
spawnTaskUsingId :: ProcessId -> Process a -> Process (Task a)
spawnTaskUsingId :: ProcessId -> Process a -> Process (Task a)
spawnTaskUsingId = ContCancellation -> ProcessId -> Process a -> Process (Task a)
forall a.
ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
CancelTogether
spawnTask :: Process a -> Process (Task a)
spawnTask :: Process a -> Process (Task a)
spawnTask = ContCancellation -> Process a -> Process (Task a)
forall a. ContCancellation -> Process a -> Process (Task a)
spawnTaskWith ContCancellation
CancelTogether
spawnTaskUsingIdWith :: ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith :: ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
cancellation ProcessId
pid Process a
p =
do (Task a
t, Process ()
m) <- Event (Task a, Process ()) -> Process (Task a, Process ())
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Task a, Process ()) -> Process (Task a, Process ()))
-> Event (Task a, Process ()) -> Process (Task a, Process ())
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process a -> Event (Task a, Process ())
forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
ContCancellation -> ProcessId -> Process () -> Process ()
spawnProcessUsingIdWith ContCancellation
cancellation ProcessId
pid Process ()
m
Task a -> Process (Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t
spawnTaskWith :: ContCancellation -> Process a -> Process (Task a)
spawnTaskWith :: ContCancellation -> Process a -> Process (Task a)
spawnTaskWith ContCancellation
cancellation Process a
p =
do ProcessId
pid <- Simulation ProcessId -> Process ProcessId
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
ContCancellation -> ProcessId -> Process a -> Process (Task a)
forall a.
ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
cancellation ProcessId
pid Process a
p
taskProcess :: Task a -> Process a
taskProcess :: Task a -> Process a
taskProcess Task a
t =
do TaskResult a
x <- Process (TaskResult a) -> Process () -> Process (TaskResult a)
forall a b. Process a -> Process b -> Process a
finallyProcess
(Task a -> Process (TaskResult a)
forall a. Task a -> Process (TaskResult a)
taskResult Task a
t)
(do ProcessId
pid <- Process ProcessId
processId
Event () -> Process ()
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
Bool -> Event () -> Event ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled (Event () -> Event ()) -> Event () -> Event ()
forall a b. (a -> b) -> a -> b
$
Task a -> Event ()
forall a. Task a -> Event ()
cancelTask Task a
t)
case TaskResult a
x of
TaskCompleted a
a -> a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
TaskError IOException
e -> IOException -> Process a
forall e a. Exception e => e -> Process a
throwProcess IOException
e
TaskResult a
TaskCancelled -> Process a
forall a. Process a
cancelProcess
taskParallelResult :: Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult :: Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult Task a
t1 Task a
t2 =
do Maybe (TaskResult a)
x1 <- IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a)))
-> IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (TaskResult a)) -> IO (Maybe (TaskResult a))
forall a. IORef a -> IO a
readIORef (Task a -> IORef (Maybe (TaskResult a))
forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t1)
case Maybe (TaskResult a)
x1 of
Just TaskResult a
x1 -> (TaskResult a, Task a) -> Process (TaskResult a, Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x1, Task a
t2)
Maybe (TaskResult a)
Nothing ->
do Maybe (TaskResult a)
x2 <- IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a)))
-> IO (Maybe (TaskResult a)) -> Process (Maybe (TaskResult a))
forall a b. (a -> b) -> a -> b
$ IORef (Maybe (TaskResult a)) -> IO (Maybe (TaskResult a))
forall a. IORef a -> IO a
readIORef (Task a -> IORef (Maybe (TaskResult a))
forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t2)
case Maybe (TaskResult a)
x2 of
Just TaskResult a
x2 -> (TaskResult a, Task a) -> Process (TaskResult a, Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x2, Task a
t1)
Maybe (TaskResult a)
Nothing ->
do let s1 :: Signal (Either (TaskResult a) b)
s1 = (TaskResult a -> Either (TaskResult a) b)
-> Signal (TaskResult a) -> Signal (Either (TaskResult a) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap TaskResult a -> Either (TaskResult a) b
forall a b. a -> Either a b
Left (Signal (TaskResult a) -> Signal (Either (TaskResult a) b))
-> Signal (TaskResult a) -> Signal (Either (TaskResult a) b)
forall a b. (a -> b) -> a -> b
$ Task a -> Signal (TaskResult a)
forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t1
s2 :: Signal (Either a (TaskResult a))
s2 = (TaskResult a -> Either a (TaskResult a))
-> Signal (TaskResult a) -> Signal (Either a (TaskResult a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap TaskResult a -> Either a (TaskResult a)
forall a b. b -> Either a b
Right (Signal (TaskResult a) -> Signal (Either a (TaskResult a)))
-> Signal (TaskResult a) -> Signal (Either a (TaskResult a))
forall a b. (a -> b) -> a -> b
$ Task a -> Signal (TaskResult a)
forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t2
Either (TaskResult a) (TaskResult a)
x <- Signal (Either (TaskResult a) (TaskResult a))
-> Process (Either (TaskResult a) (TaskResult a))
forall a. Signal a -> Process a
processAwait (Signal (Either (TaskResult a) (TaskResult a))
-> Process (Either (TaskResult a) (TaskResult a)))
-> Signal (Either (TaskResult a) (TaskResult a))
-> Process (Either (TaskResult a) (TaskResult a))
forall a b. (a -> b) -> a -> b
$ Signal (Either (TaskResult a) (TaskResult a))
forall b. Signal (Either (TaskResult a) b)
s1 Signal (Either (TaskResult a) (TaskResult a))
-> Signal (Either (TaskResult a) (TaskResult a))
-> Signal (Either (TaskResult a) (TaskResult a))
forall a. Semigroup a => a -> a -> a
<> Signal (Either (TaskResult a) (TaskResult a))
forall a. Signal (Either a (TaskResult a))
s2
case Either (TaskResult a) (TaskResult a)
x of
Left TaskResult a
x1 -> (TaskResult a, Task a) -> Process (TaskResult a, Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x1, Task a
t2)
Right TaskResult a
x2 -> (TaskResult a, Task a) -> Process (TaskResult a, Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x2, Task a
t1)
taskParallelProcess :: Task a -> Task a -> Process (a, Task a)
taskParallelProcess :: Task a -> Task a -> Process (a, Task a)
taskParallelProcess Task a
t1 Task a
t2 =
do (TaskResult a
x, Task a
t) <-
Process (TaskResult a, Task a)
-> Process () -> Process (TaskResult a, Task a)
forall a b. Process a -> Process b -> Process a
finallyProcess
(Task a -> Task a -> Process (TaskResult a, Task a)
forall a. Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult Task a
t1 Task a
t2)
(do ProcessId
pid <- Process ProcessId
processId
Event () -> Process ()
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
Bool -> Event () -> Event ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled (Event () -> Event ()) -> Event () -> Event ()
forall a b. (a -> b) -> a -> b
$
do Task a -> Event ()
forall a. Task a -> Event ()
cancelTask Task a
t1
Task a -> Event ()
forall a. Task a -> Event ()
cancelTask Task a
t2)
case TaskResult a
x of
TaskCompleted a
a -> (a, Task a) -> Process (a, Task a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Task a
t)
TaskError IOException
e ->
do Event () -> Process ()
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$ Task a -> Event ()
forall a. Task a -> Event ()
cancelTask Task a
t
IOException -> Process (a, Task a)
forall e a. Exception e => e -> Process a
throwProcess IOException
e
TaskResult a
TaskCancelled ->
do Event () -> Process ()
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$ Task a -> Event ()
forall a. Task a -> Event ()
cancelTask Task a
t
Process (a, Task a)
forall a. Process a
cancelProcess