async-pool-0.9.1: A modified version of async that supports worker groups and many-to-many task dependencies

Copyright: (c) Simon Marlow 2012, John Wiegley 2014
License: BSD3 (see the file LICENSE)
Maintainer: John Wiegley <johnw@newartisans.com>
Stability: provisional
Portability: non-portable (requires concurrency)
Safe Haskell: None
Language: Haskell98

Control.Concurrent.Async.Pool

Description

This module provides a set of operations for running IO operations asynchronously and waiting for their results. It is a thin layer over the basic concurrency operations provided by Control.Concurrent. The main additional functionality it provides is the ability to wait for the return value of a thread, plus functions for managing task pools, work groups, and many-to-many dependencies between tasks. The interface also provides some additional safety and robustness over using threads and MVar directly.

The basic type is Async a, which represents an asynchronous IO action that will return a value of type a, or die with an exception. An Async corresponds to either a thread, or a Handle to an action waiting to be spawned. This makes it possible to submit very large numbers of tasks, with only N threads active at one time.

For example, to fetch two web pages at the same time, we could do this (assuming a suitable getURL function):

   withTaskGroup 4 $ \g -> do
      a1 <- async g (getURL url1)
      a2 <- async g (getURL url2)
      page1 <- wait a1
      page2 <- wait a2
      ...

Here, async submits the operation to the task group, which runs it in a separate thread, and wait waits for and returns the result. The number 4 is the maximum number of threads that may run at one time. If the operation throws an exception, that exception is re-thrown by wait. This is one of the ways in which this library provides some additional safety: it is harder to accidentally forget about exceptions thrown in child threads.

A slight improvement over the previous example is this:

   withTaskGroup 4 $ \g ->
      withAsync g (getURL url1) $ \a1 ->
         withAsync g (getURL url2) $ \a2 -> do
            page1 <- wait a1
            page2 <- wait a2
            ...

withAsync is like async, except that the Async is automatically killed (or unscheduled, using cancel) if the enclosing IO operation returns before it has completed. Consider the case when the first wait throws an exception; then the second Async will be automatically killed rather than being left to run in the background, possibly indefinitely. This is the second way that the library provides additional safety: using withAsync means we can avoid accidentally leaving threads running. Furthermore, withAsync allows a tree of threads to be built, such that children are automatically killed if their parents die for any reason.

The pattern of performing two IO actions concurrently and waiting for their results is packaged up in a combinator concurrently, so we can further shorten the above example to:

   withTaskGroup 4 $ \g -> do
      (page1, page2) <- concurrently g (getURL url1) (getURL url2)
      ...

The Functor instance can be used to change the result of an Async. For example:

ghci> a <- async g (return 3)
ghci> wait a
3
ghci> wait (fmap (+1) a)
4
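
The pieces described above can be combined into one small program. The following is a minimal sketch rather than part of the library's own documentation: getURL is a hypothetical stand-in that only builds a string instead of performing a network request, and the URL strings are placeholders.

```haskell
import Control.Concurrent.Async.Pool

-- Hypothetical stand-in for a real HTTP fetch: it only builds a string.
getURL :: String -> IO String
getURL url = return ("<contents of " ++ url ++ ">")

main :: IO ()
main = withTaskGroup 4 $ \g -> do
  -- Submit two tasks to the group, then wait for both results.
  a1 <- async g (getURL "url1")
  a2 <- async g (getURL "url2")
  page1 <- wait a1
  page2 <- wait a2
  putStrLn page1
  putStrLn page2

  -- The same pattern, packaged up by concurrently.
  (p1, p2) <- concurrently g (getURL "url1") (getURL "url2")
  print (p1 == page1 && p2 == page2)

  -- The Functor instance transforms the eventual result.
  a3 <- async g (getURL "url3")
  n <- wait (fmap length a3)
  print n
```

Everything runs inside a single withTaskGroup block, so at most four of these tasks can be active at once.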

Asynchronous actions

data Async a Source #

An asynchronous action spawned by async or withAsync. Asynchronous actions are executed in a separate thread, and operations are provided for waiting for asynchronous actions to complete and obtaining their results (see e.g. wait).

Instances
Functor Async Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Async

Methods

fmap :: (a -> b) -> Async a -> Async b #

(<$) :: a -> Async b -> Async a #

Eq (Async a) Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Async

Methods

(==) :: Async a -> Async a -> Bool #

(/=) :: Async a -> Async a -> Bool #

Ord (Async a) Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Async

Methods

compare :: Async a -> Async a -> Ordering #

(<) :: Async a -> Async a -> Bool #

(<=) :: Async a -> Async a -> Bool #

(>) :: Async a -> Async a -> Bool #

(>=) :: Async a -> Async a -> Bool #

max :: Async a -> Async a -> Async a #

min :: Async a -> Async a -> Async a #

Task pools and groups

withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b Source #

Create both a pool, and a task group with a given number of execution slots.

withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b Source #

Create a task group within the given pool having a specified number of execution slots, but with a bounded lifetime. Leaving the block cancels every task still executing in the group.

data Pool Source #

A Pool manages a collection of possibly interdependent tasks, such that tasks await execution until the tasks they depend on have finished (and tasks may depend on an arbitrary number of other tasks), while independent tasks execute concurrently up to the number of available resource slots in the pool.

The result of each task is retained until the task is polled or waited on. Consequently, never polling or waiting on a task results in a memory leak.

Tasks may be cancelled, in which case all dependent tasks are unscheduled.

createPool :: IO Pool Source #

Create a task pool for managing many-to-many acyclic dependencies among tasks.
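As an illustration, a pool can be created explicitly and a task group scoped to it with withTaskGroupIn. This is a minimal sketch using only the signatures listed in this section; the slot count and the trivial tasks are arbitrary choices for the example.

```haskell
import Control.Concurrent.Async.Pool

main :: IO ()
main = do
  pool <- createPool
  -- A group with two execution slots whose lifetime is the block below.
  total <- withTaskGroupIn pool 2 $ \g -> do
    a <- async g (return (1 :: Int))
    b <- async g (return 2)
    (+) <$> wait a <*> wait b
  print total
```

Keeping the Pool value in scope is what later allows functions such as makeDependent, which take a Pool argument, to be used with tasks from this group.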

createTaskGroup :: Pool -> Int -> IO TaskGroup Source #

Create a task group for executing interdependent tasks concurrently. The number of available slots governs how many tasks may run at one time.

runTaskGroup :: TaskGroup -> IO () Source #

Execute tasks in a given task group. The number of slots determines how many threads may execute concurrently.

Spawning tasks

async :: TaskGroup -> IO a -> IO (Async a) Source #

Spawn an asynchronous action in a separate thread.

asyncBound :: TaskGroup -> IO a -> IO (Async a) Source #

Like async but using forkOS internally.

asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a) Source #

Like async but using forkOn internally.

asyncWithUnmask :: TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) Source #

Like async but using forkIOWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a) Source #

Like asyncOn but using forkOnWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

asyncSTM :: TaskGroup -> IO a -> STM (Async a) Source #

Equivalent to async, but acts in STM so that makeDependent may be called after the task is created, but before it begins executing.

Dependent tasks

taskHandle :: Async a -> Handle Source #

asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a) Source #

Submit a task that begins execution only after its parent has completed. This is equivalent to submitting a new task with asyncSTM and linking it to its parent using makeDependent.

asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a) Source #

Submit a task which begins execution after all its parents have completed. This is equivalent to submitting a new task with asyncSTM and linking it to its parents using 'mapM makeDependent'.

makeDependent Source #

Arguments

   :: Pool
   -> Handle   -- Handle of task doing the waiting
   -> Handle   -- Handle of task we must wait on (the parent)
   -> STM ()

Given parent and child tasks, link them so the child cannot execute until the parent has finished.
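The following sketch shows one way to link two tasks by hand. It assumes the pool is created explicitly with createPool so that it can be passed to makeDependent, and it creates and links both tasks inside a single STM transaction so that neither can begin before the dependency is recorded. The IORef is only a device for making the ordering observable.

```haskell
import Control.Concurrent.Async.Pool
import Control.Concurrent.STM (atomically)
import Data.IORef (newIORef, readIORef, writeIORef)

main :: IO ()
main = do
  ref  <- newIORef (0 :: Int)
  pool <- createPool
  seen <- withTaskGroupIn pool 2 $ \g -> do
    -- Create both tasks and link them in one transaction.
    (parent, child) <- atomically $ do
      p <- asyncSTM g (writeIORef ref 42)
      c <- asyncSTM g (readIORef ref)
      -- First handle: the task that waits; second handle: its parent.
      makeDependent pool (taskHandle c) (taskHandle p)
      return (p, c)
    v <- wait child
    wait parent
    return v
  print seen
```

Because the child may not start until the parent has finished, the value read by the child should be the one written by the parent, even though the group has two free slots.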

unsafeMakeDependent Source #

Arguments

   :: Pool
   -> Handle   -- Handle of task doing the waiting
   -> Handle   -- Handle of task we must wait on (the parent)
   -> STM ()

Given parent and child tasks, link them so the child cannot execute until the parent has finished. This function does not check for introduction of cycles into the dependency graph, which would prevent the child from ever running.

Spawning with automatic cancelation

withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b Source #

Spawn an asynchronous action in a separate thread, and pass its Async handle to the supplied function. When the function returns or throws an exception, cancel is called on the Async.

withAsync g action inner = bracket (async g action) cancel inner

This is a useful variant of async that ensures an Async is never left running unintentionally.

Since cancel may block, withAsync may also block; see cancel for details.

withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b Source #

Like withAsync but uses forkOS internally.

withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b Source #

Like withAsync but uses forkOn internally.

withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b Source #

Like withAsync but uses forkIOWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b Source #

Like withAsyncOn but uses forkOnWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

Querying Asyncs

wait :: Async a -> IO a Source #

Wait for an asynchronous action to complete, and return its value. If the asynchronous action threw an exception, then the exception is re-thrown by wait.

wait = atomically . waitSTM

poll :: Async a -> IO (Maybe (Either SomeException a)) Source #

Check whether an Async has completed yet. If it has not completed yet, then the result is Nothing, otherwise the result is Just e where e is Left x if the Async raised an exception x, or Right a if it returned a value a.

poll = atomically . pollSTM

waitCatch :: Async a -> IO (Either SomeException a) Source #

Wait for an asynchronous action to complete, and return either Left e if the action raised an exception e, or Right a if it returned a value a.

waitCatch = atomically . waitCatchSTM
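
To illustrate the difference between wait and waitCatch, the sketch below submits one task that succeeds and one that throws, and inspects both outcomes as Either values. The describe helper and the "boom" error are hypothetical, introduced only for this example.

```haskell
import Control.Concurrent.Async.Pool
import Control.Exception (SomeException, throwIO)

-- Hypothetical helper: render the outcome of a task as text.
describe :: Either SomeException Int -> String
describe (Left e)  = "failed: " ++ show e
describe (Right v) = "succeeded: " ++ show v

main :: IO ()
main = withTaskGroup 2 $ \g -> do
  ok  <- async g (return (21 * 2 :: Int))
  bad <- async g (throwIO (userError "boom") :: IO Int)
  -- waitCatch returns the exception as a value instead of re-throwing it.
  r1 <- waitCatch ok
  r2 <- waitCatch bad
  putStrLn (describe r1)
  putStrLn (describe r2)
```

Calling wait on the failing task instead would re-throw the exception in the calling thread.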

cancel :: Async a -> IO () Source #

Cancel an asynchronous action by throwing the ThreadKilled exception to it. Has no effect if the Async has already completed.

cancel a = throwTo (asyncThreadId a) ThreadKilled

Note that cancel is synchronous in the same sense as throwTo. It does not return until the exception has been thrown in the target thread, or the target thread has completed. In particular, if the target thread is making a foreign call, the exception will not be thrown until the foreign call returns, and in this case cancel may block indefinitely. An asynchronous cancel can of course be obtained by wrapping cancel itself in async.

cancelWith :: Exception e => Async a -> e -> IO () Source #

STM operations

waitSTM :: Async a -> STM a Source #

A version of wait that can be used inside an STM transaction.

pollSTM :: Async a -> STM (Maybe (Either SomeException a)) Source #

A version of poll that can be used inside an STM transaction.

waitCatchSTM :: Async a -> STM (Either SomeException a) Source #

A version of waitCatch that can be used inside an STM transaction.

Waiting for multiple Asyncs

waitAny :: [Async a] -> IO (Async a, a) Source #

Wait for any of the supplied Asyncs to complete. If the first to complete throws an exception, then that exception is re-thrown by waitAny.

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.

waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a) Source #

Wait for any of the supplied asynchronous operations to complete. The value returned is a pair of the Async that completed, and the result that would be returned by wait on that Async.

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.

waitAnyCancel :: [Async a] -> IO (Async a, a) Source #

Like waitAny, but also cancels the other asynchronous operations as soon as one has completed.

waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a) Source #

Like waitAnyCatch, but also cancels the other asynchronous operations as soon as one has completed.

waitEither :: Async a -> Async b -> IO (Either a b) Source #

Wait for the first of two Asyncs to finish. If the Async that finished first raised an exception, then the exception is re-thrown by waitEither.

waitEitherCatch :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) Source #

Wait for the first of two Asyncs to finish.

waitEitherCancel :: Async a -> Async b -> IO (Either a b) Source #

Like waitEither, but also cancels both Asyncs before returning.

waitEitherCatchCancel :: Async a -> Async b -> IO (Either (Either SomeException a) (Either SomeException b)) Source #

Like waitEitherCatch, but also cancels both Asyncs before returning.

waitEither_ :: Async a -> Async b -> IO () Source #

Like waitEither, but the result is ignored.

waitBoth :: Async a -> Async b -> IO (a, b) Source #

Waits for both Asyncs to finish, but if either of them throws an exception before they have both finished, then the exception is re-thrown by waitBoth.

Linking

link :: Async a -> IO () Source #

Link the given Async to the current thread, such that if the Async raises an exception, that exception will be re-thrown in the current thread.

link2 :: Async a -> Async b -> IO () Source #

Link two Asyncs together, such that if either raises an exception, the same exception is re-thrown in the other Async.

Lists of actions

mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a) Source #

Execute a group of tasks within the given task group. The order of execution is random, but the results are returned in the order of the input.
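
A small sketch of mapTasks over a list of actions follows. The squaring tasks and the slot count of 2 are arbitrary choices for illustration.

```haskell
import Control.Concurrent.Async.Pool

main :: IO ()
main = do
  -- Five tasks, at most two running at a time.
  squares <- withTaskGroup 2 $ \g ->
    mapTasks g [return (n * n) | n <- [1 .. 5 :: Int]]
  -- Results come back in the order of the input list.
  print squares
```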

mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO () Source #

Execute a group of tasks within the given task group, ignoring results.

mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a)) Source #

Execute a group of tasks within the given task group, returning each result as an Either so that exceptions from individual actions are captured rather than re-thrown. The order of execution is random, but the results are returned in the order of the input.

mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException)) Source #

Execute a group of tasks within the given task group, ignoring results, but returning for each task the exception it raised, if any.

mapRace :: Foldable t => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a) Source #

Execute a group of tasks, but return the first result or failure and cancel the remaining tasks.

mapReduce Source #

Arguments

   :: (Foldable t, Monoid a)
   => TaskGroup          -- Task group to execute the tasks within
   -> t (IO a)           -- Set of Monoid-yielding IO actions
   -> STM (Async a)      -- Returns the final result task

Given a list of actions yielding Monoid results, execute the actions concurrently (up to N at a time, based on available slots), and mappend each pair of results concurrently as they become ready. The immediate result of this function is an Async representing the final value.

This is similar to the following: mconcat <$> mapTasks g actions, except that intermediate results can be garbage collected as soon as they've been merged. Also, the value returned from this function is an Async which may be polled for the final result.

Lastly, if an Exception occurs in any subtask, the final result will also yield an exception -- but not necessarily the first or last that was caught.
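
As a sketch of how mapReduce might be called: since it acts in STM, it is run with atomically, and the resulting Async is then waited on. The Sum monoid is chosen here because it is commutative, so the order in which results are merged does not affect the total.

```haskell
import Control.Concurrent.Async.Pool
import Control.Concurrent.STM (atomically)
import Data.Monoid (Sum (..))

main :: IO ()
main = do
  total <- withTaskGroup 4 $ \g -> do
    -- mapReduce hands back an Async representing the final merged value.
    result <- atomically (mapReduce g [return (Sum n) | n <- [1 .. 5 :: Int]])
    wait result
  print (getSum total)
```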

scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m) => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b Source #

Execute a group of tasks concurrently (using up to N active threads, depending on the task group), and feed results to a continuation as soon as they become available, in random order. The continuation function may return a monoid value which is accumulated to yield a final result. If no such value is needed, simply provide `()`.
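
The sketch below uses scatterFoldMapM in plain IO to count how many tasks succeeded. The tally function and the sample actions are hypothetical; it assumes the MonadBaseControl IO IO instance from the monad-control package is available.

```haskell
import Control.Concurrent.Async.Pool
import Control.Exception (SomeException)
import Data.Monoid (Sum (..))

-- Count a success as 1 and a failure as 0.
tally :: Either SomeException a -> IO (Sum Int)
tally (Left _)  = return (Sum 0)
tally (Right _) = return (Sum 1)

main :: IO ()
main = do
  successes <- withTaskGroup 2 $ \g ->
    scatterFoldMapM g
      [ return 'a'
      , ioError (userError "boom")  -- this task fails
      , return 'c'
      ]
      tally
  print (getSum successes)
```

Because results arrive in random order, a commutative monoid such as Sum keeps the accumulated value independent of arrival order.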

The Task Monad and Applicative

data Task a Source #

The Task Applicative and Monad allow for task dependencies to be built using Applicative and do notation. Monadic evaluation is sequenced, while applicative evaluation is concurrent for each argument. In this way, mixing the two builds a dependency graph via ordinary Haskell code.

Instances
Monad Task Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Internal

Methods

(>>=) :: Task a -> (a -> Task b) -> Task b #

(>>) :: Task a -> Task b -> Task b #

return :: a -> Task a #

fail :: String -> Task a #

Functor Task Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Internal

Methods

fmap :: (a -> b) -> Task a -> Task b #

(<$) :: a -> Task b -> Task a #

Applicative Task Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Internal

Methods

pure :: a -> Task a #

(<*>) :: Task (a -> b) -> Task a -> Task b #

liftA2 :: (a -> b -> c) -> Task a -> Task b -> Task c #

(*>) :: Task a -> Task b -> Task b #

(<*) :: Task a -> Task b -> Task a #

MonadIO Task Source # 
Instance details

Defined in Control.Concurrent.Async.Pool.Internal

Methods

liftIO :: IO a -> Task a #

runTask :: TaskGroup -> Task a -> IO a Source #

Run a value in the Task monad and block until the final result is computed.

task :: IO a -> Task a Source #

Lift any IO action into a Task. This is a synonym for liftIO.
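
A brief sketch of mixing the two styles: the first two steps are combined applicatively, so they may run concurrently, and the final step is sequenced after them with do notation. The pipeline name and the arithmetic are illustrative only.

```haskell
import Control.Concurrent.Async.Pool

-- Two independent steps combined applicatively, followed by a step that
-- depends on both results.
pipeline :: Task Int
pipeline = do
  (x, y) <- (,) <$> task (return 2) <*> task (return 3)
  task (return (x * y))

main :: IO ()
main = do
  result <- withTaskGroup 2 $ \g -> runTask g pipeline
  print result
```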

Other utilities

race :: TaskGroup -> IO a -> IO b -> IO (Either a b) Source #

Run two IO actions concurrently, and return the first to finish. The loser of the race is cancelled.

race g left right =
  withAsync g left $ \a ->
  withAsync g right $ \b ->
  waitEither a b

race_ :: TaskGroup -> IO a -> IO b -> IO () Source #

Like race, but the result is ignored.
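
A short sketch of race between a slow and a fast action follows. The two-second delay is an arbitrary value chosen to make the fast action the likely winner; which side wins is ultimately a matter of timing.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Pool

main :: IO ()
main = do
  winner <- withTaskGroup 2 $ \g ->
    race g
      (threadDelay 2000000 >> return "slow")  -- sleeps for two seconds
      (return (42 :: Int))                    -- finishes immediately
  -- Left holds the first action's result, Right the second's.
  print winner
```

The group needs at least two slots here so that both actions can run at the same time.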

concurrently :: TaskGroup -> IO a -> IO b -> IO (a, b) Source #

Run two IO actions concurrently, and return both results. If either action throws an exception at any time, then the other action is cancelled, and the exception is re-thrown by concurrently.

concurrently g left right =
  withAsync g left $ \a ->
  withAsync g right $ \b ->
  waitBoth a b

mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b) Source #

Maps an IO-performing function over any Traversable data type, performing all the IO actions concurrently, and returning the original data structure with the arguments replaced by the results.

For example, mapConcurrently works with lists:

pages <- mapConcurrently g getURL ["url1", "url2", "url3"]

newtype Concurrently a Source #

A value of type Concurrently a is an IO operation that can be composed with other Concurrently values, using the Applicative and Alternative instances.

Calling runConcurrently on a value of type Concurrently a will execute the IO operations it contains concurrently, before delivering the result of type a.

For example:

(page1, page2, page3)
    <- runConcurrently $ (,,)
    <$> Concurrently (getURL "url1")
    <*> Concurrently (getURL "url2")
    <*> Concurrently (getURL "url3")

Constructors

Concurrently 

Fields