polysemy-conc-0.12.0.0: Polysemy effects for concurrency
Safe HaskellSafe-Inferred
LanguageHaskell2010

Polysemy.Conc

Description

 
Synopsis

Introduction

This library provides an assortment of tools for concurrency-related tasks:

Queues

data Queue d :: Effect Source #

Abstracts queues like TBQueue.

For documentation on the constructors, see the module Polysemy.Conc.Data.Queue.

import Polysemy.Conc (Queue, QueueResult)
import Polysemy.Conc.Effect.Queue as Queue

prog :: Member (Queue Int) r => Sem r (QueueResult Int)
prog = do
  Queue.write 5
  Queue.write 10
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r

data QueueResult d Source #

Encodes failure reasons for queues.

For documentation on the constructors, see the module Polysemy.Conc.Data.QueueResult.

import qualified Polysemy.Conc.Data.QueueResult as QueueResult

Instances

Instances details
Functor QueueResult Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Methods

fmap :: (a -> b) -> QueueResult a -> QueueResult b #

(<$) :: a -> QueueResult b -> QueueResult a #

Monoid d => Monoid (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Semigroup d => Semigroup (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Generic (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Associated Types

type Rep (QueueResult d) :: Type -> Type #

Methods

from :: QueueResult d -> Rep (QueueResult d) x #

to :: Rep (QueueResult d) x -> QueueResult d #

Show d => Show (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Eq d => Eq (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

Ord d => Ord (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) Source # 
Instance details

Defined in Polysemy.Conc.Data.QueueResult

type Rep (QueueResult d) = D1 ('MetaData "QueueResult" "Polysemy.Conc.Data.QueueResult" "polysemy-conc-0.12.0.0-4eKdhUwnJjzCLqIbLhs0OO" 'False) (C1 ('MetaCons "Success" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 d)) :+: (C1 ('MetaCons "NotAvailable" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Closed" 'PrefixI 'False) (U1 :: Type -> Type)))

Interpreters

interpretQueueTBM Source #

Arguments

:: forall d r. Members [Resource, Race, Embed IO] r 
=> Int

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBMQueue.

interpretQueueTB Source #

Arguments

:: forall d r. Members [Race, Embed IO] r 
=> Natural

Buffer size

-> InterpreterFor (Queue d) r 

Interpret Queue with a TBQueue.

interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as AtomicState with a list that cannot be written to. Useful for testing.

interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r Source #

Reinterpret Queue as State with a list that cannot be written to. Useful for testing.

Combinators

loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse.

loopOr :: Member (Queue d) r => Sem r Bool -> (d -> Sem r Bool) -> Sem r () Source #

Read from a Queue repeatedly until it is closed.

When an element is received, call action and recurse if it returns True. When no element is available, evaluate na and recurse if it returns True.

MVars

An MVar is abstracted as Sync since it can be used to synchronize threads.

data Sync d :: Effect Source #

Abstracts an MVar.

For documentation on the constructors, see the module Polysemy.Conc.Effect.Sync.

import Polysemy.Conc (Sync)
import qualified Polysemy.Conc.Effect.Sync as Sync

prog :: Member (Sync Int) r => Sem r Int
prog = do
  Sync.putTry 5
  Sync.takeBlock

data SyncRead (d :: Type) :: Effect Source #

An interface to a shared variable (MVar) that can only be read.

type ScopedSync a = Scoped_ (Sync a) Source #

Convenience alias.

Interpreters

interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r Source #

Interpret Sync with an empty MVar.

interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r Source #

Interpret Sync with an MVar containing the specified value.

withSync :: forall d r. Member (ScopedSync d) r => InterpreterFor (Sync d) r Source #

Run an action with a locally scoped Sync variable.

This avoids a dependency on Embed IO in application logic while still allowing the variable to be scoped.

interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped_ (Sync d)) r Source #

Interpret Sync for locally scoped use with an empty MVar.

interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped_ (Sync d)) r Source #

Interpret Sync for locally scoped use with an MVar containing the specified value.

syncRead :: forall d r. Member (Sync d) r => InterpreterFor (SyncRead d) r Source #

Run SyncRead in terms of Sync.

Lock

data Lock :: Effect Source #

An exclusive lock or mutex, protecting a region from concurrent access.

lock :: forall r a. Member Lock r => Sem r a -> Sem r a Source #

Run an action if the lock is available, block otherwise.

lockOr :: forall r a. Member Lock r => Sem r a -> Sem r a -> Sem r a Source #

Run an action if the lock is available, block otherwise.

lockOrSkip :: forall r a. Member Lock r => Sem r a -> Sem r (Maybe a) Source #

Run an action if the lock is available, skip and return Nothing otherwise.

lockOrSkip_ :: forall r a. Member Lock r => Sem r a -> Sem r () Source #

Run an action if the lock is available, skip otherwise. Return ().

Interpreters

interpretLockReentrant :: Members [Resource, Race, Mask, Embed IO] r => InterpreterFor Lock r Source #

Interpret Lock as a reentrant lock, allowing nested calls to lock unless called from a different thread (as in, async was called in a higher-order action passed to lock.)

interpretLockPermissive :: InterpreterFor Lock r Source #

Interpret Lock by executing all actions unconditionally.

Semaphores

data Semaphore :: Effect Source #

This effect abstracts over the concept of a quantity semaphore, a concurrency primitive that contains a number of slots that can be acquired and released.

Interpreters

Gate

data Gate :: Effect Source #

A single-use synchronization point that blocks all consumers who called gate until signal is called.

The constructors are exported from Polysemy.Conc.Gate.

type Gates = Scoped_ Gate Source #

Convenience alias for scoped Gate.

Interpreters

interpretGate :: forall r. Member (Embed IO) r => InterpreterFor Gate r Source #

Interpret Gate with an MVar.

Racing

Racing works like this:

prog =
 Polysemy.Conc.race (httpRequest "hackage.haskell.org") (readFile "/path/to/file") >>= \case
   Left _ -> putStrLn "hackage was faster"
   Right _ -> putStrLn "file was faster"

When the first thunk finishes, the other will be killed.

data Race :: Effect Source #

Abstract the concept of running two programs concurrently, aborting the other when one terminates. Timeout is a simpler variant, where one thread just sleeps for a given interval.

race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b) Source #

Run both programs concurrently, returning the result of the faster one.

race_ :: Member Race r => Sem r a -> Sem r a -> Sem r a Source #

Specialization of race for the case where both actions return the same type, obviating the need for Either.

timeout :: forall a b u r. TimeUnit u => Member Race r => Sem r a -> u -> Sem r b -> Sem r (Either a b) Source #

Run the fallback action if the given program doesn't finish within the specified interval.

timeout_ :: TimeUnit u => Member Race r => Sem r a -> u -> Sem r a -> Sem r a Source #

Specialization of timeout for the case where the main action returns the same type as the fallback, obviating the need for Either.

timeoutAs :: TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b) Source #

Version of timeout that takes a pure fallback value.

timeoutAs_ :: TimeUnit u => Member Race r => a -> u -> Sem r a -> Sem r a Source #

Specialization of timeoutAs for the case where the main action return the same type as the fallback, obviating the need for Either.

timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r () Source #

Specialization of timeout for unit actions.

timeoutMaybe :: TimeUnit u => Member Race r => u -> Sem r a -> Sem r (Maybe a) Source #

Variant of timeout that returns Maybe.

timeoutStop :: TimeUnit u => Members [Race, Stop err] r => err -> u -> Sem r a -> Sem r a Source #

Variant of timeout that calls Stop with the supplied error when the action times out.

retrying Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe a) 

Run an action repeatedly until it returns Right or the timout has been exceeded.

retryingWithError Source #

Arguments

:: forall e w u t d r a. TimeUnit w 
=> TimeUnit u 
=> Members [Race, Time t d, Embed IO] r 
=> w

The timeout after which the attempt is abandoned.

-> u

The waiting interval between two tries.

-> Sem r (Either e a) 
-> Sem r (Maybe (Either e a)) 

Run an action repeatedly until it returns Right or the timout has been exceeded.

If the action failed at least once, the last error will be returned in case of timeout.

Interpreters

interpretRace :: Member (Final IO) r => InterpreterFor Race r Source #

Interpret Race in terms of race and timeout. Since this has to pass higher-order thunks as IO arguments, it is interpreted in terms of 'Final IO'.

Signal Handling

data Interrupt :: Effect Source #

The interrupt handler effect allows three kinds of interaction for interrupt signals:

  • Execute a callback when a signal is received
  • Block a thread until a signal is received
  • Kill a thread when a signal is received

For documentation on the constructors, see the module Polysemy.Conc.Effect.Interrupt.

import qualified Polysemy.Conc.Effect.Interrupt as Interrupt

prog = do
  Interrupt.register "task 1" (putStrLn "interrupted")
  Interrupt.killOnQuit $ forever do
   doSomeWork

Interpreters

interpretInterrupt :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #

Interpret Interrupt by installing a signal handler.

Catches repeat invocations of SIGINT.

interpretInterruptOnce :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #

Interpret Interrupt by installing a signal handler.

Catches only the first invocation of SIGINT.

interpretInterruptNull :: InterpreterFor Interrupt r Source #

Eliminate Interrupt without interpreting.

Event Channels

data Events (e :: Type) :: Effect Source #

An event publisher that can be consumed from multiple threads.

data Consume (e :: Type) :: Effect Source #

Consume events emitted by Events.

publish :: forall e r. Member (Events e) r => e -> Sem r () Source #

Publish one event.

consume :: forall e r. Member (Consume e) r => Sem r e Source #

Consume one event emitted by Events.

subscribe :: forall e r. Member (Scoped_ (Consume e)) r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream. To be used with interpretEventsChan.

subscribeGated :: forall e r. Members [EventConsumer e, Gate] r => InterpreterFor (Consume e) r Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream.

Calls signal before running the argument to ensure that subscribe has finished creating a channel, for use with asynchronous execution.

subscribeAsync :: forall e r a. Members [EventConsumer e, Scoped_ Gate, Resource, Race, Async] r => Sem (Consume e ': r) () -> Sem r a -> Sem r a Source #

Create a new scope for Events, causing the nested program to get its own copy of the event stream.

Executes in a new thread, ensuring that the main thread blocks until subscribe has finished creating a channel.

subscribeWhile :: forall e r. Member (EventConsumer e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

subscribeWhileGated :: forall e r. Members [EventConsumer e, Gate] r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback. Stop when the action returns False.

Signals the caller that the channel was successfully subscribed to using the Gate effect.

subscribeWhileAsync :: forall e r a. Members [EventConsumer e, Gates, Resource, Race, Async] r => (e -> Sem (Consume e ': r) Bool) -> Sem r a -> Sem r a Source #

Start a new thread that pulls repeatedly from the Events channel, passing the event to the supplied callback and stops when the action returns False.

subscribeLoop :: forall e r. Member (EventConsumer e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

subscribeLoopGated :: forall e r. Members [EventConsumer e, Gate] r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from the Events channel, passing the event to the supplied callback.

Signals the caller that the channel was successfully subscribed to using the Gate effect.

subscribeLoopAsync :: forall e r a. Members [EventConsumer e, Gates, Resource, Race, Async] r => (e -> Sem (Consume e ': r) ()) -> Sem r a -> Sem r a Source #

Start a new thread that pulls repeatedly from the Events channel, passing the event to the supplied callback.

subscribeFind :: forall e r. Member (EventConsumer e) r => (e -> Sem (Consume e ': r) Bool) -> Sem r e Source #

Block until a value matching the predicate has been published to the Events channel.

subscribeFirstJust :: forall e a r. Member (EventConsumer e) r => (e -> Sem (Consume e ': r) (Maybe a)) -> Sem r a Source #

Return the first value published to the Events channel for which the function produces Just.

subscribeElem :: forall e r. Eq e => Member (EventConsumer e) r => e -> Sem r () Source #

Block until the specified value has been published to the Events channel.

consumeWhile :: Member (Consume e) r => (e -> Sem r Bool) -> Sem r () Source #

Pull repeatedly from Consume, passing the event to the supplied callback. Stop when the action returns False.

consumeLoop :: Member (Consume e) r => (e -> Sem r ()) -> Sem r () Source #

Pull repeatedly from Consume, passing the event to the supplied callback.

consumeFind :: forall e r. Member (Consume e) r => (e -> Sem r Bool) -> Sem r e Source #

Block until a value matching the predicate has been returned by Consume.

consumeFirstJust :: forall e a r. Member (Consume e) r => (e -> Sem r (Maybe a)) -> Sem r a Source #

Return the first value returned by Consume for which the function produces Just.

consumeElem :: forall e r. Eq e => Member (Consume e) r => e -> Sem r () Source #

Block until the specified value has been returned by Consume.

type EventConsumer e = Scoped_ (Consume e) Source #

Convenience alias for the consumer effect.

Interpreters

interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events e, EventConsumer e] r Source #

Interpret Events and Consume together by connecting them to the two ends of an unagi channel. Consume is only interpreted in a Scoped manner, ensuring that a new duplicate of the channel is created so that all consumers see all events (from the moment they are connected).

This should be used in conjunction with subscribe:

interpretEventsChan do
  async $ subscribe do
    putStrLn =<< consume
  publish "hello"

Whenever subscribe creates a new scope, this interpreter calls dupChan and passes the duplicate to interpretConsumeChan.

Exceptions

data Critical :: Effect Source #

An effect that catches exceptions.

Provides the exact functionality of fromExceptionSem, but pushes the dependency on Final IO to the interpreter, and makes it optional.

Interpreters

Masking

type Mask = Scoped_ RestoreMask Source #

The scoped masking effect.

type UninterruptibleMask = Scoped_ RestoreMask Source #

The scoped uninterruptible masking effect.

mask :: Member Mask r => InterpreterFor RestoreMask r Source #

Mark a region as masked. Uses the Scoped_ pattern.

uninterruptibleMask :: Member UninterruptibleMask r => InterpreterFor RestoreMask r Source #

Mark a region as uninterruptibly masked. Uses the Scoped_ pattern.

restore :: forall r a. Member RestoreMask r => Sem r a -> Sem r a Source #

Restore the previous masking state. Can only be called inside of an action passed to mask or uninterruptibleMask.

data Restoration Source #

Resource type for the scoped Mask effect, wrapping the restore callback passed in by mask.

Interpreters

interpretMaskPure :: InterpreterFor Mask r Source #

Interpret Mask by sequencing the action without masking.

interpretUninterruptibleMaskPure :: InterpreterFor UninterruptibleMask r Source #

Interpret UninterruptibleMask by sequencing the action without masking.

Scoped Effects

data Scoped (param :: Type) (effect :: Effect) :: Effect Source #

Deprecated: Scoped has been moved to Polysemy.Scoped

Scoped transforms a program so that an interpreter for effect may perform arbitrary actions, like resource management, before and after the computation wrapped by a call to scoped is executed.

Note: This effect has been merged to Polysemy and will be released there soon.

An application for this is Polysemy.Conc.Events from https://hackage.haskell.org/package/polysemy-conc, in which each program using the effect Polysemy.Conc.Consume is interpreted with its own copy of the event channel; or a database transaction, in which a transaction handle is created for the wrapped program and passed to the interpreter for the database effect.

For a longer exposition, see https://www.tweag.io/blog/2022-01-05-polysemy-scoped/. Note that the interface has changed since the blog post was published: The resource parameter no longer exists.

Resource allocation is performed by a function passed to interpretScoped.

The constructors are not intended to be used directly; the smart constructor scoped is used like a local interpreter for effect. scoped takes an argument of type param, which will be passed through to the interpreter, to be used by the resource allocation function.

As an example, imagine an effect for writing lines to a file:

data Write :: Effect where
  Write :: Text -> Write m ()
makeSem ''Write

If we now have the following requirements:

  1. The file should be opened and closed right before and after the part of the program in which we write lines
  2. The file name should be specifiable at the point in the program where writing begins
  3. We don't want to commit to IO, lines should be stored in memory when running tests

Then we can take advantage of Scoped to write this program:

prog :: Member (Scoped FilePath Write) r => Sem r ()
prog = do
  scoped "file1.txt" do
    write "line 1"
    write "line 2"
  scoped "file2.txt" do
    write "line 1"
    write "line 2"

Here scoped creates a prompt for an interpreter to start allocating a resource for "file1.txt" and handling Write actions using that resource. When the scoped block ends, the resource should be freed.

The interpreter may look like this:

interpretWriteFile :: Members '[Resource, Embed IO] => InterpreterFor (Scoped FilePath Write) r
interpretWriteFile =
  interpretScoped allocator handler
  where
    allocator name use = bracket (openFile name WriteMode) hClose use
    handler fileHandle (Write line) = embed (Text.hPutStrLn fileHandle line)

Essentially, the bracket is executed at the point where scoped was called, wrapping the following block. When the second scoped is executed, another call to bracket is performed.

The effect of this is that the operation that uses Embed IO was moved from the call site to the interpreter, while the interpreter may be executed at the outermost layer of the app.

This makes it possible to use a pure interpreter for testing:

interpretWriteOutput :: Member (Output (FilePath, Text)) r => InterpreterFor (Scoped FilePath Write) r
interpretWriteOutput =
  interpretScoped (\ name use -> use name) \ name -> \case
    Write line -> output (name, line)

Here we simply pass the name to the interpreter in the resource allocation function.

Now imagine that we drop requirement 2 from the initial list – we still want the file to be opened and closed as late/early as possible, but the file name is globally fixed. For this case, the param type is unused, and the API provides some convenience aliases to make your code more concise:

prog :: Member (Scoped_ Write) r => Sem r ()
prog = do
  scoped_ do
    write "line 1"
    write "line 2"
  scoped_ do
    write "line 1"
    write "line 2"

The type Scoped_ and the constructor scoped_ simply fix param to ().

type Scoped_ effect = Scoped () effect Source #

A convenience alias for a scope without parameters.

scoped :: forall param effect r. Member (Scoped param effect) r => param -> InterpreterFor effect r Source #

Constructor for Scoped, taking a nested program and transforming all instances of effect to Scoped param effect.

Please consult the documentation of Scoped for details and examples.

scoped_ :: forall effect r. Member (Scoped_ effect) r => InterpreterFor effect r Source #

Constructor for Scoped_, taking a nested program and transforming all instances of effect to Scoped_ effect.

Please consult the documentation of Scoped for details and examples.

rescope :: forall param0 param1 effect r. Member (Scoped param1 effect) r => (param0 -> param1) -> InterpreterFor (Scoped param0 effect) r Source #

Transform the parameters of a Scoped program.

This allows incremental additions to the data passed to the interpreter, for example to create an API that permits different ways of running an effect with some fundamental parameters being supplied at scope creation and some optional or specific parameters being selected by the user downstream.

Interpreters

interpretScoped :: forall resource param effect r. (forall x. param -> (resource -> Sem r x) -> Sem r x) -> (forall m x. resource -> effect m x -> Sem r x) -> InterpreterFor (Scoped param effect) r Source #

First-order variant of interpretScopedH.

interpretScopedH Source #

Arguments

:: forall resource param effect r. (forall x. param -> (resource -> Sem r x) -> Sem r x)

A callback function that allows the user to acquire a resource for each computation wrapped by scoped using other effects, with an additional argument that contains the call site parameter passed to scoped.

-> (forall r0 x. resource -> effect (Sem r0) x -> Tactical effect (Sem r0) r x)

A handler like the one expected by interpretH with an additional parameter that contains the resource allocated by the first argument.

-> InterpreterFor (Scoped param effect) r 

Construct an interpreter for a higher-order effect wrapped in a Scoped, given a resource allocation function and a parameterized handler for the plain effect.

This combinator is analogous to interpretH in that it allows the handler to use the Tactical environment and transforms the effect into other effects on the stack.

interpretScopedH' :: forall resource param effect r. (forall e r0 x. param -> (resource -> Tactical e (Sem r0) r x) -> Tactical e (Sem r0) r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical (Scoped param effect) (Sem r0) r x) -> InterpreterFor (Scoped param effect) r Source #

Variant of interpretScopedH that allows the resource acquisition function to use Tactical.

interpretScopedAs :: forall resource param effect r. (param -> Sem r resource) -> (forall m x. resource -> effect m x -> Sem r x) -> InterpreterFor (Scoped param effect) r Source #

Variant of interpretScoped in which the resource allocator is a plain action.

interpretScopedWith :: forall extra param resource effect r r1. r1 ~ (extra ++ r) => KnownList extra => (forall x. param -> (resource -> Sem r1 x) -> Sem r x) -> (forall m x. resource -> effect m x -> Sem r1 x) -> InterpreterFor (Scoped param effect) r Source #

First-order variant of interpretScopedWithH.

Note: It is necessary to specify the list of local interpreters with a type application; GHC won't be able to figure them out from the type of withResource:

data SomeAction :: Effect where
  SomeAction :: SomeAction m ()

foo :: InterpreterFor (Scoped () SomeAction) r
foo =
  interpretScopedWith @[Reader Int, State Bool] localEffects \ () -> \case
    SomeAction -> put . (> 0) =<< ask @Int
  where
    localEffects () use = evalState False (runReader 5 (use ()))

interpretScopedWithH :: forall extra resource param effect r r1. KnownList extra => r1 ~ (extra ++ r) => (forall x. param -> (resource -> Sem r1 x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical effect (Sem r0) r1 x) -> InterpreterFor (Scoped param effect) r Source #

Higher-order interpreter for Scoped that allows the handler to use additional effects that are interpreted by the resource allocator.

Note: It is necessary to specify the list of local interpreters with a type application; GHC won't be able to figure them out from the type of withResource.

As an example for a higher order effect, consider a mutexed concurrent state effect, where an effectful function may lock write access to the state while making it still possible to read it:

data MState s :: Effect where
  MState :: (s -> m (s, a)) -> MState s m a
  MRead :: MState s m s

makeSem ''MState

We can now use an AtomicState to store the current value and lock write access with an MVar. Since the state callback is effectful, we need a higher order interpreter:

withResource ::
  Member (Embed IO) r =>
  s ->
  (MVar () -> Sem (AtomicState s : r) a) ->
  Sem r a
withResource initial use = do
  tv <- embed (newTVarIO initial)
  lock <- embed (newMVar ())
  runAtomicStateTVar tv $ use lock

interpretMState ::
  ∀ s r .
  Members [Resource, Embed IO] r =>
  InterpreterFor (Scoped s (MState s)) r
interpretMState =
  interpretScopedWithH @'[AtomicState s] withResource \ lock -> \case
    MState f ->
      bracket_ (embed (takeMVar lock)) (embed (tryPutMVar lock ())) do
        s0 <- atomicGet
        res <- runTSimple (f s0)
        Inspector ins <- getInspectorT
        for_ (ins res) \ (s, _) -> atomicPut s
        pure (snd <$> res)
    MRead ->
      liftT atomicGet

interpretScopedWith_ :: forall extra param effect r r1. r1 ~ (extra ++ r) => KnownList extra => (forall x. param -> Sem r1 x -> Sem r x) -> (forall m x. effect m x -> Sem r1 x) -> InterpreterFor (Scoped param effect) r Source #

Variant of interpretScopedWith in which no resource is used and the resource allocator is a plain interpreter. This is useful for scopes that only need local effects, but no resources in the handler.

See the Note on interpretScopedWithH.

runScoped :: forall resource param effect r. (forall x. param -> (resource -> Sem r x) -> Sem r x) -> (resource -> InterpreterFor effect r) -> InterpreterFor (Scoped param effect) r Source #

Variant of interpretScoped that uses another interpreter instead of a handler.

This is mostly useful if you want to reuse an interpreter that you cannot easily rewrite (like from another library). If you have full control over the implementation, interpretScoped should be preferred.

Note: In previous versions of Polysemy, the wrapped interpreter was executed fully, including the initializing code surrounding its handler, for each action in the program. However, new and continuing discoveries regarding Scoped has allowed the improvement of having the interpreter be used only once per use of scoped, and have it cover the same scope of actions that withResource does.

This renders withResource practically redundant; for the moment, the API surrounding Scoped remains the same, but work is in progress to revamp the entire API of Scoped.

runScopedAs :: forall resource param effect r. (param -> Sem r resource) -> (resource -> InterpreterFor effect r) -> InterpreterFor (Scoped param effect) r Source #

Variant of runScoped in which the resource allocator returns the resource rather tnen calling a continuation.

interpretScopedResumable :: forall param resource effect err r. (forall x. param -> (resource -> Sem (Stop err ': r) x) -> Sem (Stop err ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop err ': r) x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined interpreter for Scoped and Resumable. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope.

interpretScopedResumableH :: forall param resource effect err r. (forall x. param -> (resource -> Sem (Stop err ': r) x) -> Sem (Stop err ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical effect (Sem r0) (Stop err ': r) x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined higher-order interpreter for Scoped and Resumable. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope.

interpretScopedResumable_ :: forall param resource effect err r. (param -> Sem (Stop err ': r) resource) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop err ': r) x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined interpreter for Scoped and Resumable. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope. In this variant, the resource allocator is a plain action.

interpretScopedResumableWith :: forall extra param resource effect err r r1. r1 ~ ((extra ++ '[Stop err]) ++ r) => KnownList (extra ++ '[Stop err]) => (forall x. param -> (resource -> Sem r1 x) -> Sem (Stop err ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem r1 x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope.

interpretScopedResumableWithH :: forall extra param resource effect err r r1 extraerr. extraerr ~ (extra ++ '[Stop err]) => r1 ~ (extraerr ++ r) => KnownList (extra ++ '[Stop err]) => (forall x. param -> (resource -> Sem r1 x) -> Sem (Stop err ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical effect (Sem r0) r1 x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined higher-order interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope.

interpretScopedResumableWith_ :: forall extra param effect err r r1. r1 ~ ((extra ++ '[Stop err]) ++ r) => KnownList (extra ++ '[Stop err]) => (forall x. param -> Sem r1 x -> Sem (Stop err ': r) x) -> (forall r0 x. effect (Sem r0) x -> Sem r1 x) -> InterpreterFor (Scoped param effect !! err) r Source #

Combined interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. This allows Stop to be sent from within the resource allocator so that the consumer receives it, terminating the entire scope. In this variant, no resource is used and the allocator is a plain interpreter.

interpretResumableScoped :: forall param resource effect err r. (forall x. param -> (resource -> Sem r x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop err ': r) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined interpreter for Resumable and Scoped. In this variant, only the handler may send Stop, but this allows resumption to happen on each action inside of the scope.

interpretResumableScopedH :: forall param resource effect err r. (forall x. param -> (resource -> Sem r x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical (effect !! err) (Sem r0) (Stop err ': r) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined higher-order interpreter for Resumable and Scoped. In this variant, only the handler may send Stop, but this allows resumption to happen on each action inside of the scope.

interpretResumableScoped_ :: forall param resource effect err r. (param -> Sem r resource) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop err ': r) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined interpreter for Resumable and Scoped. In this variant: - Only the handler may send Stop, but this allows resumption to happen on each action inside of the scope. - The resource allocator is a plain action.

interpretResumableScopedWith :: forall extra param resource effect err r r1. r1 ~ (extra ++ r) => KnownList extra => (forall x. param -> (resource -> Sem r1 x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop err ': r1) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined interpreter for Resumable and Scoped that allows the handler to use additional effects that are interpreted by the resource allocator. In this variant, only the handler may send Stop, but this allows resumption to happen on each action inside of the scope.

interpretResumableScopedWithH :: forall extra param resource effect err r r1. r1 ~ (extra ++ r) => KnownList extra => (forall x. param -> (resource -> Sem r1 x) -> Sem r x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical (effect !! err) (Sem r0) (Stop err ': r1) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined higher-order interpreter for Resumable and Scoped that allows the handler to use additional effects that are interpreted by the resource allocator. In this variant, only the handler may send Stop, but this allows resumption to happen on each action inside of the scope.

interpretResumableScopedWith_ :: forall extra param effect err r r1. r1 ~ (extra ++ r) => KnownList extra => (forall x. param -> Sem r1 x -> Sem r x) -> (forall r0 x. effect (Sem r0) x -> Sem (Stop err ': r1) x) -> InterpreterFor (Scoped param (effect !! err)) r Source #

Combined interpreter for Resumable and Scoped that allows the handler to use additional effects that are interpreted by the resource allocator. In this variant: - Only the handler may send Stop, but this allows resumption to happen on each action inside of the scope. - No resource is used and the allocator is a plain interpreter.

interpretScopedR :: forall param resource effect eo ei r. (forall x. param -> (resource -> Sem (Stop eo ': r) x) -> Sem (Stop eo ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop ei ': (Stop eo ': r)) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined interpreter for Scoped and Resumable. In this variant, both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption.

interpretScopedRH :: forall param resource effect eo ei r. (forall x. param -> (resource -> Sem (Stop eo ': r) x) -> Sem (Stop eo ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical (effect !! ei) (Sem r0) (Stop ei ': (Stop eo ': r)) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined higher-order interpreter for Resumable and Scoped. In this variant, both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption.

interpretScopedR_ :: forall param resource effect eo ei r. (param -> Sem (Stop eo ': r) resource) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop ei ': (Stop eo ': r)) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined interpreter for Scoped and Resumable. In this variant: - Both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption. - The resource allocator is a plain action.

interpretScopedRWith :: forall extra param resource effect eo ei r r1. r1 ~ (extra ++ (Stop eo ': r)) => r1 ~ ((extra ++ '[Stop eo]) ++ r) => KnownList (extra ++ '[Stop eo]) => (forall x. param -> (resource -> Sem r1 x) -> Sem (Stop eo ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Sem (Stop ei ': r1) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. In this variant, both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption.

interpretScopedRWithH :: forall extra param resource effect eo ei r r1 extraerr. extraerr ~ (extra ++ '[Stop eo]) => r1 ~ (extra ++ (Stop eo ': r)) => r1 ~ ((extra ++ '[Stop eo]) ++ r) => KnownList (extra ++ '[Stop eo]) => (forall x. param -> (resource -> Sem (extra ++ (Stop eo ': r)) x) -> Sem (Stop eo ': r) x) -> (forall r0 x. resource -> effect (Sem r0) x -> Tactical (effect !! ei) (Sem r0) (Stop ei ': r1) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined higher-order interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. In this variant, both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption.

interpretScopedRWith_ :: forall extra param effect eo ei r r1. r1 ~ (extra ++ (Stop eo ': r)) => r1 ~ ((extra ++ '[Stop eo]) ++ r) => KnownList (extra ++ '[Stop eo]) => (forall x. param -> Sem r1 x -> Sem (Stop eo ': r) x) -> (forall r0 x. effect (Sem r0) x -> Sem (Stop ei ': r1) x) -> InterpreterFor (Scoped param (effect !! ei) !! eo) r Source #

Combined interpreter for Scoped and Resumable that allows the handler to use additional effects that are interpreted by the resource allocator. - Both the handler and the scope may send different errors via Stop, encoding the concept that the resource allocation may fail to prevent the scope from being executed, and each individual scoped action may fail, continuing the scope execution on resumption. - The resource allocator is a plain action.

Monitoring

data Monitor (action :: Type) :: Effect Source #

Mark a region as being subject to intervention by a monitoring program. This can mean that a thread is repeatedly checking a condition and cancelling this region when it is unmet. A use case could be checking whether a remote service is available, or whether the system was suspended and resumed. This should be used in a Scoped_ context, like withMonitor.

monitor :: forall action r a. Member (Monitor action) r => Sem r a -> Sem r a Source #

Mark a region as being subject to intervention by a monitoring program.

withMonitor :: forall action r. Member (ScopedMonitor action) r => InterpreterFor (Monitor action) r Source #

Start a region that can contain monitor-intervention regions.

restart :: Member (ScopedMonitor Restart) r => InterpreterFor (Monitor Restart) r Source #

Variant of withMonitor that uses the Restart strategy.

data Restart Source #

Marker type for the restarting action for Monitor.

Instances

Instances details
Show Restart Source # 
Instance details

Defined in Polysemy.Conc.Effect.Monitor

Eq Restart Source # 
Instance details

Defined in Polysemy.Conc.Effect.Monitor

Methods

(==) :: Restart -> Restart -> Bool #

(/=) :: Restart -> Restart -> Bool #

type RestartingMonitor = ScopedMonitor Restart Source #

Monitor specialized to the Restart action.

type ScopedMonitor (action :: Type) = Scoped_ (Monitor action) Source #

Convenience alias for a Scoped_ Monitor.

Interpreters

interpretMonitorRestart :: forall t d r. Members [Time t d, Resource, Async, Race, Final IO] r => MonitorCheck r -> InterpreterFor RestartingMonitor r Source #

Interpret Scoped Monitor with the Restart strategy. This takes a check action that may put an MVar when the scoped region should be restarted. The check is executed in a loop, with an interval given in MonitorCheck.

monitorClockSkew :: forall t d diff r. Torsor t diff => TimeUnit diff => Members [AtomicState (Maybe t), Time t d, Embed IO] r => ClockSkewConfig -> MonitorCheck r Source #

Check for Monitor that checks every interval whether the difference between the current time and the time at the last check is larger than interval + tolerance. Can be used to detect that the operating system suspended and resumed.

clockSkewConfig :: TimeUnit u1 => TimeUnit u2 => u1 -> u2 -> ClockSkewConfig Source #

Smart constructor for ClockSkewConfig that takes arbitrary TimeUnits.

Other Combinators

type ConcStack = [UninterruptibleMask, Mask, Gates, Race, Async, Resource, Embed IO, Final IO] Source #

A default basic stack with Final for _polysemy-conc_.

runConc :: Sem ConcStack a -> IO a Source #

Interprets UninterruptibleMask, Mask and Race in terms of Final IO and runs the entire rest of the stack.

interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r Source #

Convenience wrapper around runAtomicStateTVar that creates a new TVar.

withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.

When cancelling, this variant will wait indefinitely for the thread to be gone.

withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the sync action to allow it to await the async action's result.

When cancelling, this variant will wait for 500ms for the thread to be gone.

withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action. Discards the handle, expecting the async action to either terminate or be cancelled.

When cancelling, this variant will wait for 500ms for the thread to be gone.

scheduleAsync :: forall b r a. Members [ScopedSync (), Async, Race] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () ': r) () -> Sem (Sync () ': r) a) -> Sem r a Source #

Run an action with async, but don't start it right away, so the thread handle can be processed before the action executes.

Takes a callback function that is invoked after spawning the thread. The callback receives the Async handle and a unit action that starts the computation.

This is helpful if the Async has to be stored in state and the same state is written when the action finishes. In that case, the race condition causes the handle to be written over the finished state.

makeRequest = put Nothing

main = scheduleAsync makeRequest  handle start -> do
  put (Just handle)
  start -- now makeRequest is executed

scheduleAsyncIO :: forall b r a. Members [Resource, Async, Race, Embed IO] r => Sem r b -> (Async (Maybe b) -> Sem (Sync () ': r) () -> Sem (Sync () ': r) a) -> Sem r a Source #

Variant of scheduleAsync that directly interprets the MVar used for signalling.

withAsyncGated :: forall b r a. Members [Scoped_ Gate, Resource, Race, Async] r => Sem (Gate ': r) b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action.

The second action will only start when the first action calls signal.

Passes the handle into the sync action to allow it to await the async action's result.

This can be used to ensure that the async action has acquired its resources before the main action starts.

withAsyncGated_ :: forall b r a. Members [Scoped_ Gate, Resource, Race, Async] r => Sem (Gate ': r) b -> Sem r a -> Sem r a Source #

Run the first action asynchronously while the second action executes, then cancel the first action.

The second action will only start when the first action calls signal.

This can be used to ensure that the async action has acquired its resources before the main action starts.