-- |
-- Module     : Simulation.Aivika.Task
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The 'Task' value represents a process that was already started in background.
-- We can check the completion of the task, receive notifications about changing
-- its state and even suspend an outer process awaiting the final result of the task.
-- It complements the 'Process' monad as it allows immediately continuing the main
-- computation without suspension.
--
module Simulation.Aivika.Task
       (-- * Task
        Task,
        TaskResult(..),
        taskId,
        tryGetTaskResult,
        taskResult,
        taskResultReceived,
        taskProcess,
        cancelTask,
        taskCancelled,
        -- * Running Task
        runTask,
        runTaskUsingId,
        -- * Spawning Tasks
        spawnTask,
        spawnTaskUsingId,
        spawnTaskWith,
        spawnTaskUsingIdWith,
        -- * Enqueueing Task
        enqueueTask,
        enqueueTaskUsingId,
        -- * Parallel Tasks
        taskParallelResult,
        taskParallelProcess) where

import Data.IORef
import Data.Monoid

import Control.Monad
import Control.Monad.Trans
import Control.Exception

import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Cont
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Signal

-- | The task represents a process that was already started in background.
data Task a =
  Task { forall a. Task a -> ProcessId
taskId :: ProcessId,
         -- ^ Return an identifier for the process that was launched
         -- in background for this task.
         forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef :: IORef (Maybe (TaskResult a)),
         -- ^ It contains the result of the computation.
         forall a. Task a -> Signal (TaskResult a)
taskResultReceived :: Signal (TaskResult a)
         -- ^ Return a signal that notifies about receiving
         -- the result of the task.
       }

-- | Represents the result of the task.
data TaskResult a = TaskCompleted a
                    -- ^ the task was successfully completed and
                    -- it returned the specified result
                  | TaskError IOException
                    -- ^ the specified exception was raised when performing the task.
                  | TaskCancelled
                    -- ^ the task was cancelled

-- | Try to get the task result immediately without suspension.
tryGetTaskResult :: Task a -> Event (Maybe (TaskResult a))
tryGetTaskResult :: forall a. Task a -> Event (Maybe (TaskResult a))
tryGetTaskResult Task a
t =
  forall a. (Point -> IO a) -> Event a
Event forall a b. (a -> b) -> a -> b
$ \Point
p -> forall a. IORef a -> IO a
readIORef (forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t)

-- | Return the task result suspending the outer process if required.
taskResult :: Task a -> Process (TaskResult a)
taskResult :: forall a. Task a -> Process (TaskResult a)
taskResult Task a
t =
  do Maybe (TaskResult a)
x <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t)
     case Maybe (TaskResult a)
x of
       Just TaskResult a
x -> forall (m :: * -> *) a. Monad m => a -> m a
return TaskResult a
x
       Maybe (TaskResult a)
Nothing -> forall a. Signal a -> Process a
processAwait (forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t)

-- | Cancel the task.
cancelTask :: Task a -> Event ()
cancelTask :: forall a. Task a -> Event ()
cancelTask Task a
t =
  ProcessId -> Event ()
cancelProcessWithId (forall a. Task a -> ProcessId
taskId Task a
t)

-- | Test whether the task was cancelled.
taskCancelled :: Task a -> Event Bool
taskCancelled :: forall a. Task a -> Event Bool
taskCancelled Task a
t =
  ProcessId -> Event Bool
processCancelled (forall a. Task a -> ProcessId
taskId Task a
t)

-- | Create a task by the specified process and its identifier.
newTaskUsingId :: ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId :: forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p =
  do IORef (Maybe (TaskResult a))
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
     SignalSource (TaskResult a)
s <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a. Simulation (SignalSource a)
newSignalSource
     let t :: Task a
t = Task { taskId :: ProcessId
taskId = ProcessId
pid,
                    taskResultRef :: IORef (Maybe (TaskResult a))
taskResultRef = IORef (Maybe (TaskResult a))
r,
                    taskResultReceived :: Signal (TaskResult a)
taskResultReceived = forall a. SignalSource a -> Signal a
publishSignal SignalSource (TaskResult a)
s }
     let m :: Process ()
m =
           do IORef (TaskResult a)
v <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. TaskResult a
TaskCancelled
              forall a b. Process a -> Process b -> Process a
finallyProcess
                (forall e a.
Exception e =>
Process a -> (e -> Process a) -> Process a
catchProcess
                 (do a
a <- Process a
p
                     forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (TaskResult a)
v (forall a. a -> TaskResult a
TaskCompleted a
a))
                 (\IOException
e ->
                   forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (TaskResult a)
v (forall a. IOException -> TaskResult a
TaskError IOException
e)))
                (forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                 do TaskResult a
x <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (TaskResult a)
v
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe (TaskResult a))
r (forall a. a -> Maybe a
Just TaskResult a
x)
                    forall a. SignalSource a -> a -> Event ()
triggerSignal SignalSource (TaskResult a)
s TaskResult a
x)
     forall (m :: * -> *) a. Monad m => a -> m a
return (Task a
t, Process ()
m)

-- | Run the process with the specified identifier in background and
-- return the corresponding task immediately.
runTaskUsingId :: ProcessId -> Process a -> Event (Task a)
runTaskUsingId :: forall a. ProcessId -> Process a -> Event (Task a)
runTaskUsingId ProcessId
pid Process a
p =
  do (Task a
t, Process ()
m) <- forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
     ProcessId -> Process () -> Event ()
runProcessUsingId ProcessId
pid Process ()
m
     forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t

-- | Run the process in background and return the corresponding task immediately.
runTask :: Process a -> Event (Task a)
runTask :: forall a. Process a -> Event (Task a)
runTask Process a
p =
  do ProcessId
pid <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
     forall a. ProcessId -> Process a -> Event (Task a)
runTaskUsingId ProcessId
pid Process a
p

-- | Enqueue the process that will be started at the specified time with the given
-- identifier from the event queue. It returns the corresponding task immediately.
enqueueTaskUsingId :: Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId :: forall a. Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId Double
time ProcessId
pid Process a
p =
  do (Task a
t, Process ()
m) <- forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
     Double -> ProcessId -> Process () -> Event ()
enqueueProcessUsingId Double
time ProcessId
pid Process ()
m
     forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t

-- | Enqueue the process that will be started at the specified time from the event queue.
-- It returns the corresponding task immediately.
enqueueTask :: Double -> Process a -> Event (Task a)
enqueueTask :: forall a. Double -> Process a -> Event (Task a)
enqueueTask Double
time Process a
p =
  do ProcessId
pid <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
     forall a. Double -> ProcessId -> Process a -> Event (Task a)
enqueueTaskUsingId Double
time ProcessId
pid Process a
p

-- | Run using the specified identifier a child process in background and return
-- immediately the corresponding task.
spawnTaskUsingId :: ProcessId -> Process a -> Process (Task a)
spawnTaskUsingId :: forall a. ProcessId -> Process a -> Process (Task a)
spawnTaskUsingId = forall a.
ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
CancelTogether

-- | Run a child process in background and return immediately the corresponding task.
spawnTask :: Process a -> Process (Task a)
spawnTask :: forall a. Process a -> Process (Task a)
spawnTask = forall a. ContCancellation -> Process a -> Process (Task a)
spawnTaskWith ContCancellation
CancelTogether

-- | Run using the specified identifier a child process in background and return
-- immediately the corresponding task.
spawnTaskUsingIdWith :: ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith :: forall a.
ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
cancellation ProcessId
pid Process a
p =
  do (Task a
t, Process ()
m) <- forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall a. ProcessId -> Process a -> Event (Task a, Process ())
newTaskUsingId ProcessId
pid Process a
p
     ContCancellation -> ProcessId -> Process () -> Process ()
spawnProcessUsingIdWith ContCancellation
cancellation ProcessId
pid Process ()
m
     forall (m :: * -> *) a. Monad m => a -> m a
return Task a
t

-- | Run a child process in background and return immediately the corresponding task.
spawnTaskWith :: ContCancellation -> Process a -> Process (Task a)
spawnTaskWith :: forall a. ContCancellation -> Process a -> Process (Task a)
spawnTaskWith ContCancellation
cancellation Process a
p =
  do ProcessId
pid <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
     forall a.
ContCancellation -> ProcessId -> Process a -> Process (Task a)
spawnTaskUsingIdWith ContCancellation
cancellation ProcessId
pid Process a
p

-- | Return an outer process that behaves like the task itself, for example,
-- when the task is cancelled if the outer process is cancelled. 
taskProcess :: Task a -> Process a
taskProcess :: forall a. Task a -> Process a
taskProcess Task a
t =
  do TaskResult a
x <- forall a b. Process a -> Process b -> Process a
finallyProcess
          (forall a. Task a -> Process (TaskResult a)
taskResult Task a
t)
          (do ProcessId
pid <- Process ProcessId
processId
              forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                do Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
                   forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled forall a b. (a -> b) -> a -> b
$
                     forall a. Task a -> Event ()
cancelTask Task a
t)
     case TaskResult a
x of
       TaskCompleted a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return a
a
       TaskError IOException
e -> forall e a. Exception e => e -> Process a
throwProcess IOException
e
       TaskResult a
TaskCancelled -> forall a. Process a
cancelProcess

-- | Return the result of two parallel tasks.
taskParallelResult :: Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult :: forall a. Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult Task a
t1 Task a
t2 =
  do Maybe (TaskResult a)
x1 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t1)
     case Maybe (TaskResult a)
x1 of
       Just TaskResult a
x1 -> forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x1, Task a
t2)
       Maybe (TaskResult a)
Nothing ->
         do Maybe (TaskResult a)
x2 <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall a. Task a -> IORef (Maybe (TaskResult a))
taskResultRef Task a
t2)
            case Maybe (TaskResult a)
x2 of
              Just TaskResult a
x2 -> forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x2, Task a
t1)
              Maybe (TaskResult a)
Nothing ->
                do let s1 :: Signal (Either (TaskResult a) b)
s1 = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t1
                       s2 :: Signal (Either a (TaskResult a))
s2 = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. b -> Either a b
Right forall a b. (a -> b) -> a -> b
$ forall a. Task a -> Signal (TaskResult a)
taskResultReceived Task a
t2
                   Either (TaskResult a) (TaskResult a)
x <- forall a. Signal a -> Process a
processAwait forall a b. (a -> b) -> a -> b
$ forall {b}. Signal (Either (TaskResult a) b)
s1 forall a. Semigroup a => a -> a -> a
<> forall {a}. Signal (Either a (TaskResult a))
s2
                   case Either (TaskResult a) (TaskResult a)
x of
                     Left TaskResult a
x1  -> forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x1, Task a
t2)
                     Right TaskResult a
x2 -> forall (m :: * -> *) a. Monad m => a -> m a
return (TaskResult a
x2, Task a
t1) 

-- | Return an outer process for two parallel tasks returning the result of
-- the first finished task and the rest task in pair. 
taskParallelProcess :: Task a -> Task a -> Process (a, Task a)
taskParallelProcess :: forall a. Task a -> Task a -> Process (a, Task a)
taskParallelProcess Task a
t1 Task a
t2 =
  do (TaskResult a
x, Task a
t) <-
       forall a b. Process a -> Process b -> Process a
finallyProcess
       (forall a. Task a -> Task a -> Process (TaskResult a, Task a)
taskParallelResult Task a
t1 Task a
t2)
       (do ProcessId
pid <- Process ProcessId
processId
           forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
             do Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled forall a b. (a -> b) -> a -> b
$
                  do forall a. Task a -> Event ()
cancelTask Task a
t1
                     forall a. Task a -> Event ()
cancelTask Task a
t2)
     case TaskResult a
x of
       TaskCompleted a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Task a
t)
       TaskError IOException
e ->
         do forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall a. Task a -> Event ()
cancelTask Task a
t
            forall e a. Exception e => e -> Process a
throwProcess IOException
e
       TaskResult a
TaskCancelled ->
         do forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall a. Task a -> Event ()
cancelTask Task a
t
            forall a. Process a
cancelProcess