ki-1.0.0: A lightweight structured concurrency library
Safe Haskell: Safe-Inferred
Language: Haskell2010

Ki

Description

ki is a lightweight structured concurrency library.

For a variant of this API generalized to MonadUnliftIO, see ki-unlifted.

Remember to link your program with -threaded to use the threaded runtime!

Introduction

Structured concurrency is a paradigm of concurrent programming in which a lexical scope delimits the lifetime of each thread. Threads therefore form a "call tree" hierarchy in which no child can outlive its parent.

Exceptions are propagated promptly from child to parent and vice-versa:

  • If an exception is raised in a child thread, the child raises the same exception in its parent, then terminates.
  • If an exception is raised in a parent thread, the parent first raises an exception in all of its living children, waits for them to terminate, then re-raises the original exception.
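The child-to-parent direction can be observed with a small experiment. The following is a minimal sketch, not part of the library: the name childFailure is hypothetical, and the choice of ErrorCall as the exception is arbitrary. It uses `$` instead of BlockArguments syntax so that no language extension is needed.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (ErrorCall (..), throwIO, try)
import qualified Ki

-- | Hypothetical helper: fork a child that fails immediately, and report
-- whether the child's exception surfaced in the parent.
childFailure :: IO (Either ErrorCall ())
childFailure =
  try $ Ki.scoped $ \scope -> do
    -- The child throws as soon as it starts running.
    _ <- Ki.fork scope (throwIO (ErrorCall "child failed"))
    -- The parent is interrupted during this delay by the child's exception.
    threadDelay 1000000
```

Under the propagation rules described above, the result should be a Left carrying the child's exception rather than a Right.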

All together, this library:

  • Guarantees the absence of "ghost threads" (i.e. threads that accidentally continue to run alongside the main thread after the function that spawned them returns).
  • Performs prompt, bidirectional exception propagation when an exception is raised anywhere in the call tree.
  • Provides a safe and flexible API that can be used directly, or as a foundation for higher-level concurrency patterns such as worker queues, pub-sub pipelines, and supervision trees.

For a longer introduction to structured concurrency, including an educative analogy to structured programming, please read Nathaniel J. Smith's blog post, "Notes on structured concurrency, or: Go statement considered harmful".

👉 Quick start examples

  • Perform two actions concurrently, and wait for both of them to complete.

    concurrently :: IO a -> IO b -> IO (a, b)
    concurrently action1 action2 =
      Ki.scoped \scope -> do
        thread1 <- Ki.fork scope action1
        result2 <- action2
        result1 <- atomically (Ki.await thread1)
        pure (result1, result2)
    
  • Perform two actions concurrently, and when the first action terminates, stop executing the other.

    race :: IO a -> IO a -> IO a
    race action1 action2 =
      Ki.scoped \scope -> do
        resultVar <- newEmptyMVar
        _ <- Ki.fork scope (action1 >>= tryPutMVar resultVar)
        _ <- Ki.fork scope (action2 >>= tryPutMVar resultVar)
        takeMVar resultVar
    

Core API

data Scope

A scope.

👉 Details

  • A scope delimits the lifetime of all threads created within it.
  • A scope is only valid during the callback provided to scoped.
  • The thread that creates a scope is considered the parent of all threads created within it.
  • All threads created within a scope can be awaited together (see awaitAll).
  • All threads created within a scope are terminated when the scope closes.

data Thread a

A thread.

👉 Details

  • A thread's lifetime is delimited by the scope in which it was created.
  • The thread that creates a scope is considered the parent of all threads created within it.
  • If an exception is raised in a child thread, the child either propagates the exception to its parent (see fork), or returns the exception as a value (see forkTry).
  • All threads created within a scope are terminated when the scope closes.

Instances

Functor Thread

Defined in Ki.Internal.Thread

Methods

fmap :: (a -> b) -> Thread a -> Thread b

(<$) :: a -> Thread b -> Thread a

Eq (Thread a)

Defined in Ki.Internal.Thread

Methods

(==) :: Thread a -> Thread a -> Bool

(/=) :: Thread a -> Thread a -> Bool

Ord (Thread a)

Defined in Ki.Internal.Thread

Methods

compare :: Thread a -> Thread a -> Ordering

(<) :: Thread a -> Thread a -> Bool

(<=) :: Thread a -> Thread a -> Bool

(>) :: Thread a -> Thread a -> Bool

(>=) :: Thread a -> Thread a -> Bool

max :: Thread a -> Thread a -> Thread a

min :: Thread a -> Thread a -> Thread a

scoped :: (Scope -> IO a) -> IO a

Open a scope, perform an IO action with it, then close the scope.

👉 Details

  • The thread that creates a scope is considered the parent of all threads created within it.
  • A scope is only valid during the callback provided to scoped.
  • When a scope closes (i.e. just before scoped returns):

    • The parent thread raises an exception in all of its living children.
    • The parent thread blocks until those threads terminate.
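The closing behaviour can be sketched as follows. This is an illustrative example only: the name backgroundStopsWithScope is hypothetical, and the delays are arbitrary values chosen so that the child has started before the scope closes.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (finally)
import Control.Monad (forever)
import Data.IORef (atomicWriteIORef, newIORef, readIORef)
import qualified Ki

-- | Hypothetical helper: returns whether the background thread's cleanup
-- handler had run by the time 'Ki.scoped' returned.
backgroundStopsWithScope :: IO Bool
backgroundStopsWithScope = do
  stopped <- newIORef False
  Ki.scoped $ \scope -> do
    -- A child that would loop forever if nothing terminated it.
    _ <- Ki.fork scope (forever (threadDelay 10000) `finally` atomicWriteIORef stopped True)
    -- Give the child time to start, then let the callback return.
    threadDelay 50000
  -- 'Ki.scoped' has returned, so the child has been terminated and awaited.
  readIORef stopped
```

Because the parent blocks until its children terminate, the child's cleanup handler has finished by the time the flag is read.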

fork :: Scope -> IO a -> IO (Thread a)

Create a child thread to execute an action within a scope.

Note: The child thread does not mask asynchronous exceptions, regardless of the parent thread's masking state. To create a child thread with a different initial masking state, use forkWith.
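As an illustration, fork and await can be combined to run a list of actions concurrently and collect their results. This is a sketch under stated assumptions: concurrentlyAll is a hypothetical name, not a function exported by this library, and atomically is imported from GHC.Conc in base (Control.Concurrent.STM from the stm package exports the same function).

```haskell
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical helper: run every action in its own child thread and
-- return the results in the original order.
concurrentlyAll :: [IO a] -> IO [a]
concurrentlyAll actions =
  Ki.scoped $ \scope -> do
    -- Create one child thread per action.
    threads <- traverse (Ki.fork scope) actions
    -- A single transaction that retries until every thread has terminated.
    atomically (traverse Ki.await threads)
```

If any action throws, the exception propagates to the calling thread, and the remaining children are terminated when the scope closes.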

forkTry :: forall e a. Exception e => Scope -> IO a -> IO (Thread (Either e a))

Like fork, but the child thread does not propagate exceptions that are both:

  • Synchronous (i.e. not an instance of SomeAsyncException).
  • An instance of e.
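A minimal sketch of forkTry returning an exception as a value instead of propagating it. The name safeDivide is hypothetical; the exception type e is fixed to ArithException by the result type, so no type application is needed.

```haskell
import Control.Exception (ArithException (..), throwIO)
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical helper: divide in a child thread, returning an
-- 'ArithException' as a value rather than propagating it to the parent.
safeDivide :: Int -> Int -> IO (Either ArithException Int)
safeDivide x y =
  Ki.scoped $ \scope -> do
    thread <-
      Ki.forkTry scope $
        if y == 0
          then throwIO DivideByZero
          else pure (x `div` y)
    -- The awaited value is Left for a caught exception, Right otherwise.
    atomically (Ki.await thread)
```

An exception of any other type thrown by the child would still be propagated to the parent, as with fork.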

await :: Thread a -> STM a

Wait for a thread to terminate.

awaitAll :: Scope -> STM ()

Wait until all threads created within a scope terminate.
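For example, awaitAll is useful when the results of the children are not needed individually, only their completion. The sketch below uses a hypothetical helper, sumConcurrently, in which each child adds one number to a shared counter.

```haskell
import Data.Foldable (for_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical helper: add each number to a shared counter from its own
-- child thread, then wait for all of the children before reading the total.
sumConcurrently :: [Int] -> IO Int
sumConcurrently xs = do
  total <- newIORef 0
  Ki.scoped $ \scope -> do
    for_ xs $ \x ->
      Ki.fork scope (atomicModifyIORef' total (\n -> (n + x, ())))
    -- Without this line, children still running when the callback returns
    -- would be terminated as the scope closes.
    atomically (Ki.awaitAll scope)
  readIORef total
```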

Extended API

fork_ :: Scope -> IO Void -> IO ()

Variant of fork for threads that never return.
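A typical use is a background loop built with forever, whose result type unifies with Void. The following sketch uses a hypothetical helper, countTicks, with arbitrary delay values.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Ki

-- | Hypothetical helper: run a ticker in the background for roughly 50ms
-- and return how many times it ticked.
countTicks :: IO Int
countTicks = do
  ticks <- newIORef (0 :: Int)
  Ki.scoped $ \scope -> do
    -- 'forever' never returns, so this action can be given type 'IO Void'.
    Ki.fork_ scope $ forever $ do
      atomicModifyIORef' ticks (\n -> (n + 1, ()))
      threadDelay 1000
    threadDelay 50000
  -- The ticker was terminated when the scope closed.
  readIORef ticks
```

There is no Thread value to await here: the only ways for the ticker to end are an exception or the scope closing.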

forkWith :: Scope -> ThreadOptions -> IO a -> IO (Thread a)

Variant of fork that takes an additional options argument.

forkWith_ :: Scope -> ThreadOptions -> IO Void -> IO ()

Variant of forkWith for threads that never return.

forkTryWith :: forall e a. Exception e => Scope -> ThreadOptions -> IO a -> IO (Thread (Either e a))

Variant of forkTry that takes an additional options argument.

Thread options

data ThreadOptions

affinity

The affinity of a thread. A thread can be unbound, bound to a specific capability, or bound to a specific OS thread.

Default: Unbound

allocationLimit

The maximum number of bytes a thread may allocate before it is delivered an AllocationLimitExceeded exception. If the thread catches the exception, it is allowed to allocate an additional 100kb (tunable with +RTS -xq) to perform any necessary cleanup actions; if it exceeds that allowance as well, it is delivered another AllocationLimitExceeded exception.

Default: Nothing (no limit)

label

The label of a thread, visible in the event log (+RTS -l).

Default: "" (no label)

maskingState

The masking state a thread is created in. To unmask, use unsafeUnmask.

Default: Unmasked

Instances

Show ThreadOptions

Defined in Ki.Internal.Thread

Eq ThreadOptions

Defined in Ki.Internal.Thread

defaultThreadOptions :: ThreadOptions

Default thread options.

ThreadOptions
  { affinity = Unbound
  , allocationLimit = Nothing
  , label = ""
  , maskingState = Unmasked
  }
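Individual options are usually overridden with record update syntax on defaultThreadOptions. The sketch below assumes, as the documented defaults suggest, that label is a String and allocationLimit is a Maybe ByteCount; the helper name labeledSum, the label text, and the 16 megabyte limit are all arbitrary choices for illustration.

```haskell
import GHC.Conc (atomically)
import qualified Ki

-- | Hypothetical helper: sum a list in a labeled child thread that has an
-- allocation limit.
labeledSum :: [Int] -> IO Int
labeledSum xs =
  Ki.scoped $ \scope -> do
    let options =
          Ki.defaultThreadOptions
            { Ki.label = "sum-worker" -- visible in the event log
            , Ki.allocationLimit = Just (Ki.megabytes 16)
            }
    -- '$!' forces the sum in the child thread rather than in the parent.
    thread <- Ki.forkWith scope options (pure $! sum xs)
    atomically (Ki.await thread)
```

Fields that are not mentioned in the update, such as affinity and maskingState, keep their default values.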

data ThreadAffinity

What, if anything, a thread is bound to.

Constructors

Unbound

Unbound.

Capability Int

Bound to a capability.

OsThread

Bound to an OS thread.

Instances

Show ThreadAffinity

Defined in Ki.Internal.Thread

Eq ThreadAffinity

Defined in Ki.Internal.Thread

Byte count

data ByteCount

A number of bytes.

Instances

Show ByteCount

Defined in Ki.Internal.ByteCount

Eq ByteCount

Defined in Ki.Internal.ByteCount

Ord ByteCount

Defined in Ki.Internal.ByteCount

kilobytes :: Natural -> ByteCount

A number of kilobytes.

megabytes :: Natural -> ByteCount

A number of megabytes.