effectful-2.5.1.0: An easy to use, performant extensible effects library.
Safe HaskellSafe-Inferred
LanguageHaskell2010

Effectful.Concurrent.STM

Description

Synopsis

Effect

data Concurrent :: Effect Source #

Provide the ability to run Eff computations concurrently in multiple threads and communicate between them.

Warning: unless you stick to high level functions from the withAsync family, the Concurrent effect makes it possible to escape the scope of any scoped effect operation. Consider the following:

>>> import Effectful.Reader.Static qualified as R
>>> printAsk msg = liftIO . putStrLn . (msg ++) . (": " ++) =<< R.ask
>>> :{
  runEff . R.runReader "GLOBAL" . runConcurrent $ do
    a <- R.local (const "LOCAL") $ do
      a <- async $ do
        printAsk "child (first)"
        threadDelay 20000
        printAsk "child (second)"
      threadDelay 10000
      printAsk "parent (inside)"
      pure a
    printAsk "parent (outside)"
    wait a
:}
child (first): LOCAL
parent (inside): LOCAL
parent (outside): GLOBAL
child (second): LOCAL

Note that the asynchronous computation doesn't respect the scope of local, i.e. the child thread still behaves like it's inside the local block, even though the parent thread already got out of it.

This is because the value provided by the Reader effect is thread local, i.e. each thread manages its own version of it. For the Reader it is the only reasonable behavior, it wouldn't be very useful if its "read only" value was affected by calls to local from its parent or child threads.

However, the cut isn't so clear if it comes to effects that provide access to a mutable state. That's why statically dispatched State and Writer effects come in two flavors, local and shared:

>>> import Effectful.State.Static.Local qualified as SL
>>> :{
  runEff . SL.execState "Hi" . runConcurrent $ do
    replicateConcurrently_ 3 $ SL.modify (++ "!")
:}
"Hi"
>>> import Effectful.State.Static.Shared qualified as SS
>>> :{
  runEff . SS.execState "Hi" . runConcurrent $ do
    replicateConcurrently_ 3 $ SS.modify (++ "!")
:}
"Hi!!!"

In the first example state updates made concurrently are not reflected in the parent thread because the value is thread local, but in the second example they are, because the value is shared.

Instances

Instances details
type DispatchOf Concurrent Source # 
Instance details

Defined in Effectful.Concurrent.Effect

data StaticRep Concurrent Source # 
Instance details

Defined in Effectful.Concurrent.Effect

Handlers

runConcurrent :: (HasCallStack, IOE :> es) => Eff (Concurrent : es) a -> Eff es a Source #

Run the Concurrent effect.

Core

data STM a #

A monad supporting atomic memory transactions.

Instances

Instances details
Alternative STM

Takes the first non-retrying STM action.

Since: base-4.8.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

empty :: STM a #

(<|>) :: STM a -> STM a -> STM a #

some :: STM a -> STM [a] #

many :: STM a -> STM [a] #

Applicative STM

Since: base-4.8.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

pure :: a -> STM a #

(<*>) :: STM (a -> b) -> STM a -> STM b #

liftA2 :: (a -> b -> c) -> STM a -> STM b -> STM c #

(*>) :: STM a -> STM b -> STM b #

(<*) :: STM a -> STM b -> STM a #

Functor STM

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

fmap :: (a -> b) -> STM a -> STM b #

(<$) :: a -> STM b -> STM a #

Monad STM

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

(>>=) :: STM a -> (a -> STM b) -> STM b #

(>>) :: STM a -> STM b -> STM b #

return :: a -> STM a #

MonadPlus STM

Takes the first non-retrying STM action.

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

mzero :: STM a #

mplus :: STM a -> STM a -> STM a #

MonadBaseControl STM STM 
Instance details

Defined in Control.Monad.Trans.Control

Associated Types

type StM STM a #

Methods

liftBaseWith :: (RunInBase STM STM -> STM a) -> STM a #

restoreM :: StM STM a -> STM a #

Monoid a => Monoid (STM a)

Since: base-4.17.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

mempty :: STM a #

mappend :: STM a -> STM a -> STM a #

mconcat :: [STM a] -> STM a #

Semigroup a => Semigroup (STM a)

Since: base-4.17.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

(<>) :: STM a -> STM a -> STM a #

sconcat :: NonEmpty (STM a) -> STM a #

stimes :: Integral b => b -> STM a -> STM a #

type StM STM a 
Instance details

Defined in Control.Monad.Trans.Control

type StM STM a = a

atomically :: Concurrent :> es => STM a -> Eff es a Source #

Lifted atomically.

retry :: STM a #

Retry execution of the current memory transaction because it has seen values in TVars which mean that it should not continue (e.g. the TVars represent a shared buffer that is now empty). The implementation may block the thread until one of the TVars that it has read from has been updated. (GHC only)

orElse :: STM a -> STM a -> STM a #

Compose two alternative STM actions (GHC only).

If the first action completes without retrying then it forms the result of the orElse. Otherwise, if the first action retries, then the second action is tried in its place. If both actions retry then the orElse as a whole retries.

check :: Bool -> STM () #

Check that the boolean condition is true and, if not, retry.

In other words, check b = unless b retry.

Since: stm-2.1.1

throwSTM :: Exception e => e -> STM a #

A variant of throw that can only be used within the STM monad.

Throwing an exception in STM aborts the transaction and propagates the exception. If the exception is caught via catchSTM, only the changes enclosed by the catch are rolled back; changes made outside of catchSTM persist.

If the exception is not caught inside of the STM, it is re-thrown by atomically, and the entire STM is rolled back.

Although throwSTM has a type that is an instance of the type of throw, the two functions are subtly different:

throw e    `seq` x  ===> throw e
throwSTM e `seq` x  ===> x

The first example will cause the exception e to be raised, whereas the second one won't. In fact, throwSTM will only cause an exception to be raised when it is used within the STM monad. The throwSTM variant should be used in preference to throw to raise an exception within the STM monad because it guarantees ordering with respect to other STM operations, whereas throw does not.

catchSTM :: Exception e => STM a -> (e -> STM a) -> STM a #

Exception handling within STM actions.

catchSTM m f catches any exception thrown by m using throwSTM, using the function f to handle the exception. If an exception is thrown, any changes made by m are rolled back, but changes prior to m persist.

TVar

data TVar a #

Shared memory locations that support atomic memory transactions.

Instances

Instances details
Eq (TVar a)

Since: base-4.8.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

(==) :: TVar a -> TVar a -> Bool #

(/=) :: TVar a -> TVar a -> Bool #

newTVarIO :: Concurrent :> es => a -> Eff es (TVar a) Source #

Lifted newTVarIO.

readTVarIO :: Concurrent :> es => TVar a -> Eff es a Source #

Lifted readTVarIO.

newTVar :: a -> STM (TVar a) #

Create a new TVar holding a value supplied

readTVar :: TVar a -> STM a #

Return the current value stored in a TVar.

writeTVar :: TVar a -> a -> STM () #

Write the supplied value into a TVar.

modifyTVar :: TVar a -> (a -> a) -> STM () #

Mutate the contents of a TVar. N.B., this version is non-strict.

Since: stm-2.3

modifyTVar' :: TVar a -> (a -> a) -> STM () #

Strict version of modifyTVar.

Since: stm-2.3

swapTVar :: TVar a -> a -> STM a #

Swap the contents of a TVar for a new value.

Since: stm-2.3

mkWeakTVar :: (HasCallStack, Concurrent :> es) => TVar a -> Eff es () -> Eff es (Weak (TVar a)) Source #

Lifted mkWeakTVar.

Note: the finalizer will run a cloned environment, so any changes it makes to thread local data will not be visible outside of it.

TMVar

data TMVar a #

A TMVar is a synchronising variable, used for communication between concurrent threads. It can be thought of as a box, which may be empty or full.

Instances

Instances details
Eq (TMVar a) 
Instance details

Defined in Control.Concurrent.STM.TMVar

Methods

(==) :: TMVar a -> TMVar a -> Bool #

(/=) :: TMVar a -> TMVar a -> Bool #

newTMVar :: a -> STM (TMVar a) #

Create a TMVar which contains the supplied value.

newEmptyTMVar :: STM (TMVar a) #

Create a TMVar which is initially empty.

newTMVarIO :: Concurrent :> es => a -> Eff es (TMVar a) Source #

Lifted newTMVarIO.

takeTMVar :: TMVar a -> STM a #

Return the contents of the TMVar. If the TMVar is currently empty, the transaction will retry. After a takeTMVar, the TMVar is left empty.

putTMVar :: TMVar a -> a -> STM () #

Put a value into a TMVar. If the TMVar is currently full, putTMVar will retry.

readTMVar :: TMVar a -> STM a #

This is a combination of takeTMVar and putTMVar; ie. it takes the value from the TMVar, puts it back, and also returns it.

tryReadTMVar :: TMVar a -> STM (Maybe a) #

A version of readTMVar which does not retry. Instead it returns Nothing if no value is available.

Since: stm-2.3

swapTMVar :: TMVar a -> a -> STM a #

Swap the contents of a TMVar for a new value.

tryTakeTMVar :: TMVar a -> STM (Maybe a) #

A version of takeTMVar that does not retry. The tryTakeTMVar function returns Nothing if the TMVar was empty, or Just a if the TMVar was full with contents a. After tryTakeTMVar, the TMVar is left empty.

tryPutTMVar :: TMVar a -> a -> STM Bool #

A version of putTMVar that does not retry. The tryPutTMVar function attempts to put the value a into the TMVar, returning True if it was successful, or False otherwise.

isEmptyTMVar :: TMVar a -> STM Bool #

Check whether a given TMVar is empty.

mkWeakTMVar :: (HasCallStack, Concurrent :> es) => TMVar a -> Eff es () -> Eff es (Weak (TMVar a)) Source #

Lifted mkWeakTMVar.

Note: the finalizer will run a cloned environment, so any changes it makes to thread local data will not be visible outside of it.

TChan

data TChan a #

TChan is an abstract type representing an unbounded FIFO channel.

Instances

Instances details
Eq (TChan a) 
Instance details

Defined in Control.Concurrent.STM.TChan

Methods

(==) :: TChan a -> TChan a -> Bool #

(/=) :: TChan a -> TChan a -> Bool #

newTChan :: STM (TChan a) #

Build and return a new instance of TChan

newBroadcastTChan :: STM (TChan a) #

Create a write-only TChan. More precisely, readTChan will retry even after items have been written to the channel. The only way to read a broadcast channel is to duplicate it with dupTChan.

Consider a server that broadcasts messages to clients:

serve :: TChan Message -> Client -> IO loop
serve broadcastChan client = do
    myChan <- dupTChan broadcastChan
    forever $ do
        message <- readTChan myChan
        send client message

The problem with using newTChan to create the broadcast channel is that if it is only written to and never read, items will pile up in memory. By using newBroadcastTChan to create the broadcast channel, items can be garbage collected after clients have seen them.

Since: stm-2.4

dupTChan :: TChan a -> STM (TChan a) #

Duplicate a TChan: the duplicate channel begins empty, but data written to either channel from then on will be available from both. Hence this creates a kind of broadcast channel, where data written by anyone is seen by everyone else.

cloneTChan :: TChan a -> STM (TChan a) #

Clone a TChan: similar to dupTChan, but the cloned channel starts with the same content available as the original channel.

Since: stm-2.4

readTChan :: TChan a -> STM a #

Read the next value from the TChan.

tryReadTChan :: TChan a -> STM (Maybe a) #

A version of readTChan which does not retry. Instead it returns Nothing if no value is available.

Since: stm-2.3

peekTChan :: TChan a -> STM a #

Get the next value from the TChan without removing it, retrying if the channel is empty.

Since: stm-2.3

tryPeekTChan :: TChan a -> STM (Maybe a) #

A version of peekTChan which does not retry. Instead it returns Nothing if no value is available.

Since: stm-2.3

writeTChan :: TChan a -> a -> STM () #

Write a value to a TChan.

unGetTChan :: TChan a -> a -> STM () #

Put a data item back onto a channel, where it will be the next item read.

isEmptyTChan :: TChan a -> STM Bool #

Returns True if the supplied TChan is empty.

TQueue

data TQueue a #

TQueue is an abstract type representing an unbounded FIFO channel.

Since: stm-2.4

Instances

Instances details
Eq (TQueue a) 
Instance details

Defined in Control.Concurrent.STM.TQueue

Methods

(==) :: TQueue a -> TQueue a -> Bool #

(/=) :: TQueue a -> TQueue a -> Bool #

newTQueue :: STM (TQueue a) #

Build and returns a new instance of TQueue

readTQueue :: TQueue a -> STM a #

Read the next value from the TQueue.

tryReadTQueue :: TQueue a -> STM (Maybe a) #

A version of readTQueue which does not retry. Instead it returns Nothing if no value is available.

peekTQueue :: TQueue a -> STM a #

Get the next value from the TQueue without removing it, retrying if the channel is empty.

tryPeekTQueue :: TQueue a -> STM (Maybe a) #

A version of peekTQueue which does not retry. Instead it returns Nothing if no value is available.

flushTQueue :: TQueue a -> STM [a] #

Efficiently read the entire contents of a TQueue into a list. This function never retries.

Since: stm-2.4.5

writeTQueue :: TQueue a -> a -> STM () #

Write a value to a TQueue.

unGetTQueue :: TQueue a -> a -> STM () #

Put a data item back onto a channel, where it will be the next item read.

isEmptyTQueue :: TQueue a -> STM Bool #

Returns True if the supplied TQueue is empty.

TBQueue

data TBQueue a #

TBQueue is an abstract type representing a bounded FIFO channel.

Since: stm-2.4

Instances

Instances details
Eq (TBQueue a) 
Instance details

Defined in Control.Concurrent.STM.TBQueue

Methods

(==) :: TBQueue a -> TBQueue a -> Bool #

(/=) :: TBQueue a -> TBQueue a -> Bool #

newTBQueue #

Arguments

:: Natural

maximum number of elements the queue can hold

-> STM (TBQueue a) 

Builds and returns a new instance of TBQueue.

readTBQueue :: TBQueue a -> STM a #

Read the next value from the TBQueue.

tryReadTBQueue :: TBQueue a -> STM (Maybe a) #

A version of readTBQueue which does not retry. Instead it returns Nothing if no value is available.

peekTBQueue :: TBQueue a -> STM a #

Get the next value from the TBQueue without removing it, retrying if the channel is empty.

tryPeekTBQueue :: TBQueue a -> STM (Maybe a) #

A version of peekTBQueue which does not retry. Instead it returns Nothing if no value is available.

flushTBQueue :: TBQueue a -> STM [a] #

Efficiently read the entire contents of a TBQueue into a list. This function never retries.

Since: stm-2.4.5

writeTBQueue :: TBQueue a -> a -> STM () #

Write a value to a TBQueue; blocks if the queue is full.

unGetTBQueue :: TBQueue a -> a -> STM () #

Put a data item back onto a channel, where it will be the next item read. Blocks if the queue is full.

lengthTBQueue :: TBQueue a -> STM Natural #

Return the length of a TBQueue.

Since: stm-2.5.0.0

isEmptyTBQueue :: TBQueue a -> STM Bool #

Returns True if the supplied TBQueue is empty.

isFullTBQueue :: TBQueue a -> STM Bool #

Returns True if the supplied TBQueue is full.

Since: stm-2.4.3