ki-0.3.0: A lightweight structured concurrency library
Safe HaskellNone
LanguageHaskell2010

Ki

Description

ki is a lightweight structured concurrency library.

For a variant of this API generalized to MonadUnliftIO, see ki-unlifted.

Remember to link your program with -threaded to use the threaded runtime!

Synopsis

Introduction

Structured concurrency is a paradigm of concurrent programming in which a lexical scope delimits the lifetime of each thread. Threads form a "call tree" hierarchy in which no child can outlive its parent.

Exceptions are propagated promptly from child to parent and vice-versa:

  • If an exception is raised in a child thread, the child raises the same exception in its parent, then terminates.
  • If an exception is raised in a parent thread, the parent first raises an exception in all of its living children, waits for them to terminate, then re-raises the original exception.

All together, this library:

  • Guarantees the absence of "ghost threads" (i.e. threads that accidentally continue to run alongside the main thread long after the function that spawned them returns).
  • Performs prompt, bi-directional exception propagation when an exception is raised anywhere in the call tree.
  • Provides a safe and flexible API that can be used directly, or with which higher-level concurrency patterns can be built on top, such as worker queues, publish-subscribe, supervision trees, and so on.

For a longer introduction to structured concurrency, including an educative analogy to structured programming, please read Nathaniel J. Smith's blog post, "Notes on structured concurrency, or: Go statement considered harmful".

👉 Quick start examples

Expand
  • Perform two actions concurrently, and wait for both of them to complete.

    Note that this program only creates one additional thread, unlike the concurrently combinator from the async package, which creates two.

    concurrently :: IO a -> IO b -> IO (a, b)
    concurrently action1 action2 =
      Ki.scoped \scope -> do
        thread1 <- Ki.fork scope action1
        result2 <- action2
        result1 <- atomically (Ki.await thread1)
        pure (result1, result2)
    
  • Perform two actions concurrently, and when the first action terminates, stop executing the other.

    race :: IO a -> IO a -> IO a
    race action1 action2 =
      Ki.scoped \scope -> do
        resultVar <- newEmptyMVar
        _ <- Ki.fork scope (action1 >>= tryPutMVar resultVar)
        _ <- Ki.fork scope (action2 >>= tryPutMVar resultVar)
        takeMVar resultVar
    

Core API

data Scope Source #

A thread scope.

👉 Details

Expand
  • A thread scope delimits the lifetime of all threads created within it (see fork, forkTry).
  • A thread scope can only be created with scoped, is only valid during the provided callback.
  • The thread scope object explicitly represents the lexical scope induced by scoped:

    scoped \scope ->
      -- This indented region of the code is represented by the variable `scope`
    
  • The thread that creates a scope is considered the parent of all threads created within it.

data Thread a Source #

A child thread.

👉 Details

Expand
  • If an exception is raised in a child thread, the child propagates the exception to its parent (see fork).
  • An exception may be caught and returned as a value instead (see forkTry).
  • Asynchronous exceptions are always propagated.
  • A thread cannot be terminated explicitly ala killThread. However, all threads created within a scope are terminated when the scope closes.

Instances

Instances details
Functor Thread Source # 
Instance details

Defined in Ki.Internal.Thread

Methods

fmap :: (a -> b) -> Thread a -> Thread b #

(<$) :: a -> Thread b -> Thread a #

Eq (Thread a) Source # 
Instance details

Defined in Ki.Internal.Thread

Methods

(==) :: Thread a -> Thread a -> Bool #

(/=) :: Thread a -> Thread a -> Bool #

Ord (Thread a) Source # 
Instance details

Defined in Ki.Internal.Thread

Methods

compare :: Thread a -> Thread a -> Ordering #

(<) :: Thread a -> Thread a -> Bool #

(<=) :: Thread a -> Thread a -> Bool #

(>) :: Thread a -> Thread a -> Bool #

(>=) :: Thread a -> Thread a -> Bool #

max :: Thread a -> Thread a -> Thread a #

min :: Thread a -> Thread a -> Thread a #

scoped :: (Scope -> IO a) -> IO a Source #

Execute an action in a new scope.

👉 Details

Expand
  • Just before a call to scoped returns, whether with a value or an exception:

    • The parent thread raises an exception in all of its living children.
    • The parent thread blocks until those threads terminate.
  • Child threads created within a scope can be awaited individually (see await), or as a collection (see wait).

fork :: Scope -> IO a -> IO (Thread a) Source #

Create a child thread to execute an action within a scope.

Masking state

The child thread does not mask asynchronous exceptions, regardless of the parent thread's masking state.

To create a child thread with a different initial masking state, use forkWith.

forkTry :: forall e a. Exception e => Scope -> IO a -> IO (Thread (Either e a)) Source #

Like fork, but the child thread does not propagate exceptions that are both:

await :: Thread a -> STM a Source #

Wait for a thread to terminate.

wait :: Scope -> STM () Source #

Wait until all threads created within a scope terminate.

Extended API

fork_ :: Scope -> IO Void -> IO () Source #

Variant of fork for threads that never return.

forkWith :: Scope -> ThreadOptions -> IO a -> IO (Thread a) Source #

Variant of fork that takes an additional options argument.

forkWith_ :: Scope -> ThreadOptions -> IO Void -> IO () Source #

Variant of forkWith for threads that never return.

forkTryWith :: forall e a. Exception e => Scope -> ThreadOptions -> IO a -> IO (Thread (Either e a)) Source #

Variant of forkTry that takes an additional options argument.

Thread options

data ThreadOptions Source #

affinity

The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS thread.

Default: Unbound

allocationLimit

The maximum number of bytes a thread may allocate before it is delivered an AllocationLimitExceeded exception. If caught, the thread is allowed to allocate an additional 100kb (tunable with +RTS -xq) to perform any necessary cleanup actions; if exceeded, the thread is delivered another.

Default: Nothing (no limit)

label

The label of a thread, visible in the event log (+RTS -l).

Default: "" (no label)

maskingState

The masking state a thread is created in. To unmask, use unsafeUnmask.

Default: Unmasked

Instances

Instances details
Eq ThreadOptions Source # 
Instance details

Defined in Ki.Internal.Thread

Show ThreadOptions Source # 
Instance details

Defined in Ki.Internal.Thread

defaultThreadOptions :: ThreadOptions Source #

Default thread options.

ThreadOptions
  { affinity = Nothing
  , allocationLimit = Nothing
  , label = ""
  , maskingState = Unmasked
  }

data ThreadAffinity Source #

What, if anything, a thread is bound to.

Constructors

Unbound

Unbound.

Capability Int

Bound to a capability.

OsThread

Bound to an OS thread.

Instances

Instances details
Eq ThreadAffinity Source # 
Instance details

Defined in Ki.Internal.Thread

Show ThreadAffinity Source # 
Instance details

Defined in Ki.Internal.Thread

Byte count

data ByteCount Source #

A number of bytes.

Instances

Instances details
Eq ByteCount Source # 
Instance details

Defined in Ki.Internal.ByteCount

Ord ByteCount Source # 
Instance details

Defined in Ki.Internal.ByteCount

Show ByteCount Source # 
Instance details

Defined in Ki.Internal.ByteCount

kilobytes :: Natural -> ByteCount Source #

A number of kilobytes.

megabytes :: Natural -> ByteCount Source #

A number of megabytes.