This library provides an assortment of tools for concurrency-related tasks:


data Queue d :: Effect Source #

Abstracts queues like TBQueue.

For documentation on the constructors, see the module Polysemy.Conc.Data.Queue.

import Polysemy.Conc (Queue, QueueResult)
import Polysemy.Conc.Effect.Queue as Queue

prog :: Member (Queue Int) r => Sem r (QueueResult Int)
prog = do
  Queue.write 5
  Queue.write 10
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r


Instances details
type DefiningModule Queue Source # 
Instance details

Defined in Polysemy.Conc.Effect.Queue

type DefiningModule Queue = "Polysemy.Conc.Effect.Queue"

data QueueResult d Source #

Encodes failure reasons for queues.

For documentation on the constructors, see the module Polysemy.Conc.Data.QueueResult.

import qualified Polysemy.Conc.Data.QueueResult as QueueResult


Instances details
Functor QueueResult Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult


fmap :: (a -> b) -> QueueResult a -> QueueResult b #

(<$) :: a -> QueueResult b -> QueueResult a #

Eq d => Eq (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Ord d => Ord (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Show d => Show (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Generic (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Associated Types

type Rep (QueueResult d) :: Type -> Type #


from :: QueueResult d -> Rep (QueueResult d) x #

to :: Rep (QueueResult d) x -> QueueResult d #

Semigroup d => Semigroup (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Monoid d => Monoid (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) = D1 ('MetaData "QueueResult" "Polysemy.Conc.Data.QueueResult" "polysemy-conc-" 'False) (C1 ('MetaCons "Success" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 d)) :+: (C1 ('MetaCons "NotAvailable" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Closed" 'PrefixI 'False) (U1 :: Type -> Type)))


interpretQueueTBM Source #


:: forall d r. Members [Resource, Race, Embed IO] r 
=> Int

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBMQueue.

interpretQueueTB Source #


:: forall d r. Members [Race, Embed IO] r 
=> Natural

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBQueue.

interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as AtomicState with a list that cannot be written to. Useful for testing.

interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as State with a list that cannot be written to. Useful for testing.


loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse.

loopOr :: Member (Queue d) r => Sem r Bool -> (d -> Sem r Bool) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse if it returns True. When no element is available, evaluate na and recurse if it returns True.


An MVar is abstracted as Sync since it can be used to synchronize threads.

data Sync d :: Effect Source #

Abstracts an MVar.

For documentation on the constructors, see the module Polysemy.Conc.Effect.Sync.

import Polysemy.Conc (Sync)
import qualified Polysemy.Conc.Effect.Sync as Sync

prog :: Member (Sync Int) r => Sem r Int
prog = do
  Sync.putTry 5
  Sync.takeBlock


Instances details
type DefiningModule Sync Source # 
Instance details

Defined in Polysemy.Conc.Effect.Sync

type DefiningModule Sync = "Polysemy.Conc.Effect.Sync"

type ScopedSync res a = Scoped (SyncResources res) (Sync a) Source #

Convenience alias.


interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r Source #

Interpret Sync with an empty MVar.

interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r Source #

Interpret Sync with an MVar containing the specified value.

withSync :: forall d res r. Member (ScopedSync res d) r => InterpreterFor (Sync d) r Source #

Run an action with a locally scoped Sync variable.

lock :: forall l r a. Members [Sync l, Resource] r => l -> Sem r a -> Sem r a Source #

Run the action ma with an exclusive lock (mutex). When multiple threads call the action concurrently, only one is allowed to execute it at a time. The value l is used to disambiguate the Sync from other uses of the combinator. You can pass in something like Proxy @"db-write".

Note: The Sync must be interpreted with an initially full MVar, e.g. using interpretSyncAs.

interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #

Interpret Sync for locally scoped use with an empty MVar.

interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #

Interpret Sync for locally scoped use with an MVar containing the specified value.


Racing works like this:

prog =
 Polysemy.Conc.race (httpRequest "hackage.haskell.org") (readFile "/path/to/file") >>= \case
   Left _ -> putStrLn "hackage was faster"
   Right _ -> putStrLn "file was faster"

When the first thunk finishes, the other will be killed.

data Race :: Effect Source #

Abstract the concept of running two programs concurrently, aborting the other when one terminates. Timeout is a simpler variant, where one thread just sleeps for a given interval.


Instances details
type DefiningModule Race Source # 
Instance details

Defined in Polysemy.Conc.Effect.Race

type DefiningModule Race = "Polysemy.Conc.Effect.Race"

race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b) Source #

Run both programs concurrently, returning the result of the faster one.

race_ :: Member Race r => Sem r a -> Sem r a -> Sem r a Source #

Specialization of race for the case where both thunks return the same type, obviating the need for Either.

timeout :: forall a b u r. TimeUnit u => Member Race r => Sem r a -> u -> Sem r b -> Sem r (Either a b) Source #

Run the fallback action if the given program doesn't finish within the specified interval.

timeout_ :: TimeUnit u => Member Race r => Sem r a -> u -> Sem r a -> Sem r a Source #

Specialization of timeout for the case where the thunk returns the same type as the fallback, obviating the need for Either.

timeoutAs :: TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b) Source #

Version of timeout that takes a pure fallback value.

timeoutAs_ :: TimeUnit u => Member Race r => a -> u -> Sem r a -> Sem r a Source #

Specialization of timeoutAs for the case where the thunk returns the same type as the fallback, obviating the need for Either.

timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r () Source #

Specialization of timeout for unit actions.

timeoutMaybe :: TimeUnit u => Member Race r => u -> Sem r a -> Sem r (Maybe a) Source #

Variant of timeout that returns Maybe.

retrying Source #


:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe a) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

retryingWithError Source #


:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d, Embed IO] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe (Either e a)) 

Run an action repeatedly until it returns Right or the timeout has been exceeded.

If the action failed at least once, the last error will be returned in case of timeout.


interpretRace :: Member (Final IO) r => InterpreterFor Race r Source #

Interpret Race in terms of race and timeout. Since this has to pass higher-order thunks as IO arguments, it is interpreted in terms of Final IO.

Signal Handling

data Interrupt :: Effect Source #

The interrupt handler effect allows three kinds of interaction for interrupt signals:

  • Execute a callback when a signal is received
  • Block a thread until a signal is received
  • Kill a thread when a signal is received

For documentation on the constructors, see the module Polysemy.Conc.Effect.Interrupt.

import qualified Polysemy.Conc.Effect.Interrupt as Interrupt

prog = do
  Interrupt.register "task 1" (putStrLn "interrupted")
  Interrupt.killOnQuit $ forever do
    doSomeWork


Instances details
type DefiningModule Interrupt Source # 
Instance details

Defined in Polysemy.Conc.Effect.Interrupt

type DefiningModule Interrupt = "Polysemy.Conc.Effect.Interrupt"


interpretInterrupt :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #

Interpret Interrupt by installing a signal handler.

Catches repeat invocations of SIGINT.

interpretInterruptOnce :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #

Interpret Interrupt by installing a signal handler.

Catches only the first invocation of SIGINT.

interpretInterruptNull :: InterpreterFor Interrupt r Source #

Eliminate Interrupt without interpreting.

Event Channels

data Events (resource :: Type) (e :: Type) :: Effect Source #

An event publisher that can be consumed from multiple threads.


Instances details
type DefiningModule Events Source # 
Instance details

Defined in Polysemy.Conc.Effect.Events

type DefiningModule Events = "Polysemy.Conc.Effect.Events"

publish :: forall e resource r. Member (Events resource e) r => e -> Sem r () Source #

Publish one event.

consume :: forall e r. Member (Consume e) r => Sem r e Source #

Consume one event emitted by Events.

subscribe :: forall e resource r. Member (Scoped (EventResource resource) (Consume e)) r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream. To be used with interpretEventsChan.

subscribeWhile :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

subscribeLoop :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

data EventResource resource Source #

Marker for the Scoped resource for Events.


Instances details
Eq resource => Eq (EventResource resource) Source # 
Instance details

Defined in Polysemy.Conc.Effect.Events


(==) :: EventResource resource -> EventResource resource -> Bool #

(/=) :: EventResource resource -> EventResource resource -> Bool #

Show resource => Show (EventResource resource) Source # 
Instance details

Defined in Polysemy.Conc.Effect.Events


showsPrec :: Int -> EventResource resource -> ShowS #

show :: EventResource resource -> String #

showList :: [EventResource resource] -> ShowS #

Generic (EventResource resource) Source # 
Instance details

Defined in Polysemy.Conc.Effect.Events

Associated Types

type Rep (EventResource resource) :: Type -> Type #


from :: EventResource resource -> Rep (EventResource resource) x #

to :: Rep (EventResource resource) x -> EventResource resource #

type Rep (EventResource resource) Source # 
Instance details

Defined in Polysemy.Conc.Effect.Events

type Rep (EventResource resource) = D1 ('MetaData "EventResource" "Polysemy.Conc.Effect.Events" "polysemy-conc-" 'True) (C1 ('MetaCons "EventResource" 'PrefixI 'True) (S1 ('MetaSel ('Just "unEventToken") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 resource)))

type EventChan e = EventResource (OutChan e) Source #

Convenience alias for the default EventResource that uses an OutChan.

type ChanEvents e = Events (OutChan e) e Source #

Convenience alias for the default Events that uses an OutChan.

type EventConsumer token e = Scoped (EventResource token) (Consume e) Source #

Convenience alias for the consumer effect.

type ChanConsumer e = Scoped (EventChan e) (Consume e) Source #

Convenience alias for the consumer effect using the default implementation.


interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events (OutChan e) e, ChanConsumer e] r Source #

Interpret Events and Consume together by connecting them to the two ends of an unagi channel. Consume is only interpreted in a Scoped manner, ensuring that a new duplicate of the channel is created so that all consumers see all events (from the moment they are connected).

This should be used in conjunction with subscribe:

interpretEventsChan do
  async $ subscribe do
    putStrLn =<< consume
  publish "hello"

Whenever subscribe creates a new scope, this interpreter calls dupChan and passes the duplicate to interpretConsumeChan.


data Critical :: Effect Source #

An effect that catches exceptions.

Provides the exact functionality of fromExceptionSem, but pushes the dependency on Final IO to the interpreter, and makes it optional.


Instances details
type DefiningModule Critical Source # 
Instance details

Defined in Polysemy.Conc.Effect.Critical

type DefiningModule Critical = "Polysemy.Conc.Effect.Critical"



type Mask resource = Scoped (MaskResource resource) RestoreMask Source #

The scoped masking effect.

type UninterruptipleMask resource = Scoped (UninterruptipleMaskResource resource) RestoreMask Source #

The scoped uninterruptible masking effect.

mask :: forall resource r. Member (Mask resource) r => InterpreterFor RestoreMask r Source #

Mark a region as masked. Uses the Scoped pattern.

uninterruptibleMask :: forall resource r. Member (UninterruptipleMask resource) r => InterpreterFor RestoreMask r Source #

Mark a region as uninterruptibly masked. Uses the Scoped pattern.

restore :: forall r a. Member RestoreMask r => Sem r a -> Sem r a Source #

Restore the previous masking state. Can only be called inside of an action passed to mask or uninterruptibleMask.


Scoped Effects

data Scoped (resource :: Type) (effect :: Effect) :: Effect Source #

Scoped transforms a program so that effect is associated with a resource within that program. This requires the interpreter for effect to be parameterized by resource and constructed for every program using Scoped separately.

An application for this is Events, in which each program using the effect Consume is interpreted with its own copy of the event channel; or a database transaction, in which a transaction handle is created for the wrapped program and passed to the interpreter for the database effect.

Resource creation is performed by the function passed to runScoped.

The constructors are not intended to be used directly; the smart constructor scoped is used like a local interpreter for effect.

scoped :: forall resource effect r. Member (Scoped resource effect) r => InterpreterFor effect r Source #

Constructor for Scoped, taking a nested program and transforming all instances of effect to Scoped resource effect.


runScoped :: forall resource effect r. (forall x. (resource -> Sem r x) -> Sem r x) -> (resource -> InterpreterFor effect r) -> InterpreterFor (Scoped resource effect) r Source #

Interpreter for Scoped, taking a resource allocation function and a parameterized interpreter for the plain effect.

withResource is a callback function, allowing the user to acquire the resource for each program from other effects.

scopedInterpreter is a regular interpreter that is called with the resource argument produced by withResource. Note: This function will be called for each action in the program, so if the interpreter allocates any resources, they will be scoped to a single action. Move them to withResource instead.

runScopedAs :: forall resource effect r. Sem r resource -> (resource -> InterpreterFor effect r) -> InterpreterFor (Scoped resource effect) r Source #

Variant of runScoped in which the resource allocator is a plain action.

interpretScoped :: forall resource effect r. (forall x. (resource -> Sem r x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem r x) -> InterpreterFor (Scoped resource effect) r Source #

Variant of runScoped that takes a handler instead of an interpreter.

interpretScopedH :: forall resource effect r. (forall x. (resource -> Sem r x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical effect (Sem r0) r x) -> InterpreterFor (Scoped resource effect) r Source #

Variant of runScoped that takes a higher-order handler instead of an interpreter.

interpretScopedAs :: forall resource effect r. Sem r resource -> (forall r0 x. resource -> effect (Sem r0) x -> Sem r x) -> InterpreterFor (Scoped resource effect) r Source #

Variant of interpretScoped in which the resource allocator is a plain action.


data Monitor (action :: Type) :: Effect Source #

Mark a region as being subject to intervention by a monitoring program. This can mean that a thread is repeatedly checking a condition and cancelling this region when it is unmet. A use case could be checking whether a remote service is available, or whether the system was suspended and resumed. This should be used in a Scoped context, like withMonitor.


Instances details
type DefiningModule Monitor Source # 
Instance details

Defined in Polysemy.Conc.Effect.Monitor

type DefiningModule Monitor = "Polysemy.Conc.Effect.Monitor"

monitor :: forall action r a. Member (Monitor action) r => Sem r a -> Sem r a Source #

Mark a region as being subject to intervention by a monitoring program.

withMonitor :: forall resource action r. Member (ScopedMonitor resource action) r => InterpreterFor (Monitor action) r Source #

Start a region that can contain monitor-intervention regions.

restart :: forall resource r. Member (ScopedMonitor resource Restart) r => InterpreterFor (Monitor Restart) r Source #

Variant of withMonitor that uses the Restart strategy.

data Restart Source #

Marker type for the restarting action for Monitor.


Instances details
Eq Restart Source # 
Instance details

Defined in Polysemy.Conc.Effect.Monitor


(==) :: Restart -> Restart -> Bool #

(/=) :: Restart -> Restart -> Bool #

Show Restart Source # 
Instance details

Defined in Polysemy.Conc.Effect.Monitor

type RestartingMonitor (resource :: Type) = ScopedMonitor resource Restart Source #

Monitor specialized to the Restart action.

newtype MonitorResource a Source #

Marker type for a Scoped Monitor.


MonitorResource a 

type ScopedMonitor (resource :: Type) (action :: Type) = Scoped (MonitorResource resource) (Monitor action) Source #

Convenience alias for a Scoped Monitor.


interpretMonitorRestart :: forall t d r. Members [Time t d, Resource, Async, Race, Final IO] r => MonitorCheck r -> InterpreterFor (RestartingMonitor CancelResource) r Source #

Interpret Scoped Monitor with the Restart strategy. This takes a check action that may put an MVar when the scoped region should be restarted. The check is executed in a loop, with an interval given in MonitorCheck.

monitorClockSkew :: forall t d diff r. Torsor t diff => TimeUnit diff => Members [AtomicState (Maybe t), Time t d, Embed IO] r => ClockSkewConfig -> MonitorCheck r Source #

Check for Monitor that checks every interval whether the difference between the current time and the time at the last check is larger than interval + tolerance. Can be used to detect that the operating system suspended and resumed.

clockSkewConfig :: TimeUnit u1 => TimeUnit u2 => u1 -> u2 -> ClockSkewConfig Source #

Smart constructor for ClockSkewConfig that takes arbitrary TimeUnits.

Other Combinators

type ConcStack = [UninterruptipleMask Restoration, Mask Restoration, Race, Async, Resource, Embed IO, Final IO] Source #

A default basic stack with Final for polysemy-conc.

runConc :: Sem ConcStack a -> IO a Source #

Interprets UninterruptipleMask, Mask and Race in terms of Final IO and runs the entire rest of the stack.

interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r Source #

Convenience wrapper around runAtomicStateTVar that creates a new TVar.

withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait indefinitely for the thread to be gone.

withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait for 500ms for the thread to be gone.

withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Discards the handle, expecting the async action to either terminate or be cancelled.

When cancelling, this variant will wait for 500ms for the thread to be gone.

scheduleAsync :: forall res b r a. Members [ScopedSync res (), Async, Race] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () ': r) () -> Sem (Sync () ': r) a) -> Sem r a Source #

Run an action with async, but don't start it right away, so the thread handle can be processed before the action executes.

Takes a callback function that is invoked after spawning the thread. The callback receives the Async handle and a unit action that starts the computation.

This is helpful if the Async has to be stored in state and the same state is written when the action finishes. Without the delayed start, a race condition could cause the handle to be written over the finished state.

makeRequest = put Nothing

main = scheduleAsync makeRequest \ handle start -> do
  put (Just handle)
  start -- now makeRequest is executed

scheduleAsyncIO :: forall b r a. Members [Resource, Async, Race, Embed IO] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () ': r) () -> Sem (Sync () ': r) a) -> Sem r a Source #

Variant of scheduleAsync that directly interprets the MVar used for signalling.