-- |
-- Module     : Simulation.Aivika.Trans.Task
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The 'Task' value represents a process that was already started in background.
-- We can check the completion of the task, receive notifications about changing
-- its state and even suspend an outer process awaiting the final result of the task.
-- It complements the 'Process' monad as it allows immediately continuing the main
-- computation without suspension.
--
module Simulation.Aivika.Trans.Task
       (-- * Task
        Task,
        TaskResult(..),
        taskId,
        tryGetTaskResult,
        taskResult,
        taskResultReceived,
        taskProcess,
        cancelTask,
        taskCancelled,
        -- * Running Task
        runTask,
        runTaskUsingId,
        -- * Spawning Tasks
        spawnTask,
        spawnTaskUsingId,
        spawnTaskWith,
        spawnTaskUsingIdWith,
        -- * Enqueueing Task
        enqueueTask,
        enqueueTaskUsingId,
        -- * Parallel Tasks
        taskParallelResult,
        taskParallelProcess) where

import Data.Monoid

import Control.Monad
import Control.Monad.Trans
import Control.Exception

import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Parameter
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Dynamics
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Trans.Signal

-- | The task represents a process that was already started in background.
data Task m a =
  Task { taskId :: ProcessId m,
         -- ^ Return an identifier for the process that was launched
         -- in background for this task.
         taskResultRef :: Ref m (Maybe (TaskResult a)),
         -- ^ It contains the result of the computation.
         -- The reference holds 'Nothing' until a result is written.
         taskResultReceived :: Signal m (TaskResult a)
         -- ^ Return a signal that notifies about receiving
         -- the result of the task.
       }

-- | Represents the result of the task.
data TaskResult a = TaskCompleted a
                    -- ^ the task was successfully completed and
                    -- it returned the specified result
                  | TaskError SomeException
                    -- ^ the specified exception was raised when performing the task.
                  | TaskCancelled
                    -- ^ the task was cancelled

-- | Try to get the task result immediately without suspension.
--
-- Reads the task's result reference; the value is 'Nothing' while no
-- result has been stored yet.
tryGetTaskResult :: MonadDES m => Task m a -> Event m (Maybe (TaskResult a))
{-# INLINABLE tryGetTaskResult #-}
tryGetTaskResult t = readRef (taskResultRef t)

-- | Return the task result suspending the outer process if required.
--
-- If the result reference already holds a value, that value is returned
-- directly; otherwise the process awaits the 'taskResultReceived' signal.
taskResult :: MonadDES m => Task m a -> Process m (TaskResult a)
{-# INLINABLE taskResult #-}
taskResult t =
  liftEvent (readRef (taskResultRef t)) >>=
  maybe (processAwait (taskResultReceived t)) return

-- | Cancel the task.
--
-- Delegates to 'cancelProcessWithId' applied to the task's process identifier.
cancelTask :: MonadDES m => Task m a -> Event m ()
{-# INLINABLE cancelTask #-}
cancelTask t = cancelProcessWithId (taskId t)

-- | Test whether the task was cancelled.
--
-- Delegates to 'processCancelled' applied to the task's process identifier.
taskCancelled :: MonadDES m => Task m a -> Event m Bool
{-# INLINABLE taskCancelled #-}
taskCancelled t = processCancelled (taskId t)

-- | Create a task by the specified process and its identifier.
--
-- Returns the new task together with a wrapper process. The wrapper runs
-- the given process, records the outcome and, in its finalising part,
-- stores the outcome in the task's result reference and triggers the
-- result signal. The wrapper is only constructed here, not started.
newTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a, Process m ())
{-# INLINABLE newTaskUsingId #-}
newTaskUsingId pid p =
  do r <- liftSimulation $ newRef Nothing
     s <- liftSimulation newSignalSource
     let t = Task { taskId = pid,
                    taskResultRef = r,
                    taskResultReceived = publishSignal s }
     let m =
           do -- The outcome starts as 'TaskCancelled'; it is overwritten only
              -- when the process completes or a caught exception is recorded.
              v <- liftSimulation $ newRef TaskCancelled
              finallyProcess
                (catchProcess
                 (do a <- p
                     liftEvent $ writeRef v (TaskCompleted a))
                 (\e ->
                   liftEvent $ writeRef v (TaskError e)))
                -- Publish whatever outcome was recorded: first store it,
                -- then notify the listeners of the result signal.
                (liftEvent $
                 do x <- readRef v
                    writeRef r (Just x)
                    triggerSignal s x)
     return (t, m)

-- | Run the process with the specified identifier in background and
-- return the corresponding task immediately.
runTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Event m (Task m a)
{-# INLINABLE runTaskUsingId #-}
runTaskUsingId pid p =
  do -- Build the task and its wrapper process, then start the wrapper
     -- under the supplied identifier.
     (t, m) <- newTaskUsingId pid p
     runProcessUsingId pid m
     return t

-- | Run the process in background and return the corresponding task immediately.
--
-- A fresh process identifier is created with 'newProcessId' and passed
-- to 'runTaskUsingId'.
runTask :: MonadDES m => Process m a -> Event m (Task m a)
{-# INLINABLE runTask #-}
runTask p =
  do pid <- liftSimulation newProcessId
     runTaskUsingId pid p

-- | Enqueue the process that will be started at the specified time with the given
-- identifier from the event queue. It returns the corresponding task immediately.
enqueueTaskUsingId :: MonadDES m => Double -> ProcessId m -> Process m a -> Event m (Task m a)
{-# INLINABLE enqueueTaskUsingId #-}
enqueueTaskUsingId time pid p =
  do -- Build the task and its wrapper process, then enqueue the wrapper
     -- for the given time under the supplied identifier.
     (t, m) <- newTaskUsingId pid p
     enqueueProcessUsingId time pid m
     return t

-- | Enqueue the process that will be started at the specified time from the event queue.
-- It returns the corresponding task immediately.
--
-- A fresh process identifier is created with 'newProcessId' and passed
-- to 'enqueueTaskUsingId'.
enqueueTask :: MonadDES m => Double -> Process m a -> Event m (Task m a)
{-# INLINABLE enqueueTask #-}
enqueueTask time p =
  do pid <- liftSimulation newProcessId
     enqueueTaskUsingId time pid p

-- | Run using the specified identifier a child process in background and return
-- immediately the corresponding task.
spawnTaskUsingId :: MonadDES m => ProcessId m -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskUsingId #-}
spawnTaskUsingId = spawnTaskUsingIdWith CancelTogether

-- | Run a child process in background and return immediately the corresponding task.
--
-- Equivalent to 'spawnTaskWith' applied to 'CancelTogether'.
spawnTask :: MonadDES m => Process m a -> Process m (Task m a)
{-# INLINABLE spawnTask #-}
spawnTask = spawnTaskWith CancelTogether

-- | Run using the specified identifier a child process in background and return
-- immediately the corresponding task.
--
-- The 'ContCancellation' argument is passed unchanged to
-- 'spawnProcessUsingIdWith'.
spawnTaskUsingIdWith :: MonadDES m => ContCancellation -> ProcessId m -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskUsingIdWith #-}
spawnTaskUsingIdWith cancellation pid p =
  do -- Build the task and its wrapper process, then spawn the wrapper
     -- as a child under the supplied identifier.
     (t, m) <- liftEvent $ newTaskUsingId pid p
     spawnProcessUsingIdWith cancellation pid m
     return t

-- | Run a child process in background and return immediately the corresponding task.
--
-- A fresh process identifier is created with 'newProcessId' and passed
-- to 'spawnTaskUsingIdWith'.
spawnTaskWith :: MonadDES m => ContCancellation -> Process m a -> Process m (Task m a)
{-# INLINABLE spawnTaskWith #-}
spawnTaskWith cancellation p =
  do pid <- liftSimulation newProcessId
     spawnTaskUsingIdWith cancellation pid p

-- | Return an outer process that behaves like the task itself, for example,
-- when the task is cancelled if the outer process is cancelled.
--
-- A 'TaskCompleted' result is returned, a 'TaskError' is re-thrown with
-- 'throwProcess', and 'TaskCancelled' leads to 'cancelProcess'.
taskProcess :: MonadDES m => Task m a -> Process m a
{-# INLINABLE taskProcess #-}
taskProcess t =
  do x <- finallyProcess
          (taskResult t)
          -- Finalising part: if the current (outer) process is reported
          -- as cancelled, cancel the task as well.
          (do pid <- processId
              liftEvent $
                do cancelled <- processCancelled pid
                   when cancelled $
                     cancelTask t)
     case x of
       TaskCompleted a -> return a
       TaskError e     -> throwProcess e
       TaskCancelled   -> cancelProcess

-- | Return the result of two parallel tasks.
--
-- The returned pair holds the result of the task that is found finished
-- first and the /other/ task. The first task is checked before the second;
-- if neither has a result yet, the process awaits the merged result signals
-- of both tasks.
taskParallelResult :: MonadDES m => Task m a -> Task m a -> Process m (TaskResult a, Task m a)
{-# INLINABLE taskParallelResult #-}
taskParallelResult t1 t2 =
  do r1 <- liftEvent $ readRef (taskResultRef t1)
     case r1 of
       Just x1 -> return (x1, t2)
       Nothing ->
         do r2 <- liftEvent $ readRef (taskResultRef t2)
            case r2 of
              Just x2 -> return (x2, t1)
              Nothing ->
                do -- Tag each signal so that we can tell which task fired.
                   let s1 = fmap Left $ taskResultReceived t1
                       s2 = fmap Right $ taskResultReceived t2
                   x <- processAwait $ s1 <> s2
                   case x of
                     Left x1  -> return (x1, t2)
                     Right x2 -> return (x2, t1)

-- | Return an outer process for two parallel tasks returning the result of
-- the first finished task and the rest task in pair.
taskParallelProcess :: MonadDES m => Task m a -> Task m a -> Process m (a, Task m a)
{-# INLINABLE taskParallelProcess #-}
taskParallelProcess t1 t2 =
  do (x, t) <-
       finallyProcess
       (taskParallelResult t1 t2)
       -- Finalising part: if the current (outer) process is reported
       -- as cancelled, cancel both tasks.
       (do pid <- processId
           liftEvent $
             do cancelled <- processCancelled pid
                when cancelled $
                  do cancelTask t1
                     cancelTask t2)
     -- Here @t@ is the task whose result was NOT the one received.
     case x of
       TaskCompleted a -> return (a, t)
       TaskError e ->
         -- The first result was an error: cancel the remaining task
         -- and re-throw the exception in the outer process.
         do liftEvent $ cancelTask t
            throwProcess e
       TaskCancelled ->
         -- The first result was a cancellation: cancel the remaining task
         -- and cancel the outer process too.
         do liftEvent $ cancelTask t
            cancelProcess