Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Lifted Control.Concurrent.STM.
Synopsis
- data Concurrent :: Effect
- runConcurrent :: (HasCallStack, IOE :> es) => Eff (Concurrent : es) a -> Eff es a
- data STM a
- atomically :: Concurrent :> es => STM a -> Eff es a
- retry :: STM a
- orElse :: STM a -> STM a -> STM a
- check :: Bool -> STM ()
- throwSTM :: Exception e => e -> STM a
- catchSTM :: Exception e => STM a -> (e -> STM a) -> STM a
- data TVar a
- newTVarIO :: Concurrent :> es => a -> Eff es (TVar a)
- readTVarIO :: Concurrent :> es => TVar a -> Eff es a
- newTVar :: a -> STM (TVar a)
- readTVar :: TVar a -> STM a
- writeTVar :: TVar a -> a -> STM ()
- modifyTVar :: TVar a -> (a -> a) -> STM ()
- modifyTVar' :: TVar a -> (a -> a) -> STM ()
- swapTVar :: TVar a -> a -> STM a
- registerDelay :: Concurrent :> es => Int -> Eff es (TVar Bool)
- mkWeakTVar :: (HasCallStack, Concurrent :> es) => TVar a -> Eff es () -> Eff es (Weak (TVar a))
- data TMVar a
- newTMVar :: a -> STM (TMVar a)
- newEmptyTMVar :: STM (TMVar a)
- newTMVarIO :: Concurrent :> es => a -> Eff es (TMVar a)
- newEmptyTMVarIO :: Concurrent :> es => Eff es (TMVar a)
- takeTMVar :: TMVar a -> STM a
- putTMVar :: TMVar a -> a -> STM ()
- readTMVar :: TMVar a -> STM a
- tryReadTMVar :: TMVar a -> STM (Maybe a)
- swapTMVar :: TMVar a -> a -> STM a
- tryTakeTMVar :: TMVar a -> STM (Maybe a)
- tryPutTMVar :: TMVar a -> a -> STM Bool
- isEmptyTMVar :: TMVar a -> STM Bool
- mkWeakTMVar :: (HasCallStack, Concurrent :> es) => TMVar a -> Eff es () -> Eff es (Weak (TMVar a))
- data TChan a
- newTChan :: STM (TChan a)
- newTChanIO :: Concurrent :> es => Eff es (TChan a)
- newBroadcastTChan :: STM (TChan a)
- newBroadcastTChanIO :: Concurrent :> es => Eff es (TChan a)
- dupTChan :: TChan a -> STM (TChan a)
- cloneTChan :: TChan a -> STM (TChan a)
- readTChan :: TChan a -> STM a
- tryReadTChan :: TChan a -> STM (Maybe a)
- peekTChan :: TChan a -> STM a
- tryPeekTChan :: TChan a -> STM (Maybe a)
- writeTChan :: TChan a -> a -> STM ()
- unGetTChan :: TChan a -> a -> STM ()
- isEmptyTChan :: TChan a -> STM Bool
- data TQueue a
- newTQueue :: STM (TQueue a)
- newTQueueIO :: Concurrent :> es => Eff es (TQueue a)
- readTQueue :: TQueue a -> STM a
- tryReadTQueue :: TQueue a -> STM (Maybe a)
- peekTQueue :: TQueue a -> STM a
- tryPeekTQueue :: TQueue a -> STM (Maybe a)
- flushTQueue :: TQueue a -> STM [a]
- writeTQueue :: TQueue a -> a -> STM ()
- unGetTQueue :: TQueue a -> a -> STM ()
- isEmptyTQueue :: TQueue a -> STM Bool
- data TBQueue a
- newTBQueue :: Natural -> STM (TBQueue a)
- newTBQueueIO :: Concurrent :> es => Natural -> Eff es (TBQueue a)
- readTBQueue :: TBQueue a -> STM a
- tryReadTBQueue :: TBQueue a -> STM (Maybe a)
- peekTBQueue :: TBQueue a -> STM a
- tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
- flushTBQueue :: TBQueue a -> STM [a]
- writeTBQueue :: TBQueue a -> a -> STM ()
- unGetTBQueue :: TBQueue a -> a -> STM ()
- lengthTBQueue :: TBQueue a -> STM Natural
- isEmptyTBQueue :: TBQueue a -> STM Bool
- isFullTBQueue :: TBQueue a -> STM Bool
Effect
data Concurrent :: Effect Source #
Provide the ability to run Eff
computations concurrently in multiple
threads and communicate between them.
Warning: unless you stick to high level functions from the
withAsync
family, the Concurrent
effect makes
it possible to escape the scope of any scoped effect operation. Consider the
following:
>>>
import Effectful.Reader.Static qualified as R
>>>
printAsk msg = liftIO . putStrLn . (msg ++) . (": " ++) =<< R.ask
>>>
:{
runEff . R.runReader "GLOBAL" . runConcurrent $ do a <- R.local (const "LOCAL") $ do a <- async $ do printAsk "child (first)" threadDelay 20000 printAsk "child (second)" threadDelay 10000 printAsk "parent (inside)" pure a printAsk "parent (outside)" wait a :} child (first): LOCAL parent (inside): LOCAL parent (outside): GLOBAL child (second): LOCAL
Note that the asynchronous computation doesn't respect the scope of
local
, i.e. the child thread still behaves like
it's inside the local
block, even though the parent
thread already got out of it.
This is because the value provided by the Reader
effect is thread local, i.e. each thread manages its own version of it. For
the Reader
it is the only reasonable behavior, it
wouldn't be very useful if its "read only" value was affected by calls to
local
from its parent or child threads.
However, the cut isn't so clear if it comes to effects that provide access to
a mutable state. That's why statically dispatched State
and Writer
effects come in two flavors, local and shared:
>>>
import Effectful.State.Static.Local qualified as SL
>>>
:{
runEff . SL.execState "Hi" . runConcurrent $ do replicateConcurrently_ 3 $ SL.modify (++ "!") :} "Hi"
>>>
import Effectful.State.Static.Shared qualified as SS
>>>
:{
runEff . SS.execState "Hi" . runConcurrent $ do replicateConcurrently_ 3 $ SS.modify (++ "!") :} "Hi!!!"
In the first example state updates made concurrently are not reflected in the parent thread because the value is thread local, but in the second example they are, because the value is shared.
Instances
type DispatchOf Concurrent Source # | |
Defined in Effectful.Concurrent.Effect | |
data StaticRep Concurrent Source # | |
Defined in Effectful.Concurrent.Effect |
Handlers
runConcurrent :: (HasCallStack, IOE :> es) => Eff (Concurrent : es) a -> Eff es a Source #
Run the Concurrent
effect.
Core
A monad supporting atomic memory transactions.
Instances
Alternative STM | Takes the first non- Since: base-4.8.0.0 |
Applicative STM | Since: base-4.8.0.0 |
Functor STM | Since: base-4.3.0.0 |
Monad STM | Since: base-4.3.0.0 |
MonadPlus STM | Takes the first non- Since: base-4.3.0.0 |
MonadBaseControl STM STM | |
Monoid a => Monoid (STM a) | Since: base-4.17.0.0 |
Semigroup a => Semigroup (STM a) | Since: base-4.17.0.0 |
type StM STM a | |
Defined in Control.Monad.Trans.Control |
atomically :: Concurrent :> es => STM a -> Eff es a Source #
Lifted atomically
.
Retry execution of the current memory transaction because it has seen
values in TVar
s which mean that it should not continue (e.g. the TVar
s
represent a shared buffer that is now empty). The implementation may
block the thread until one of the TVar
s that it has read from has been
updated. (GHC only)
Check that the boolean condition is true and, if not, retry
.
In other words, check b = unless b retry
.
Since: stm-2.1.1
throwSTM :: Exception e => e -> STM a #
A variant of throw
that can only be used within the STM
monad.
Throwing an exception in STM
aborts the transaction and propagates the
exception. If the exception is caught via catchSTM
, only the changes
enclosed by the catch are rolled back; changes made outside of catchSTM
persist.
If the exception is not caught inside of the STM
, it is re-thrown by
atomically
, and the entire STM
is rolled back.
Although throwSTM
has a type that is an instance of the type of throw
, the
two functions are subtly different:
throw e `seq` x ===> throw e throwSTM e `seq` x ===> x
The first example will cause the exception e
to be raised,
whereas the second one won't. In fact, throwSTM
will only cause
an exception to be raised when it is used within the STM
monad.
The throwSTM
variant should be used in preference to throw
to
raise an exception within the STM
monad because it guarantees
ordering with respect to other STM
operations, whereas throw
does not.
TVar
Shared memory locations that support atomic memory transactions.
readTVarIO :: Concurrent :> es => TVar a -> Eff es a Source #
Lifted readTVarIO
.
modifyTVar :: TVar a -> (a -> a) -> STM () #
Mutate the contents of a TVar
. N.B., this version is
non-strict.
Since: stm-2.3
modifyTVar' :: TVar a -> (a -> a) -> STM () #
Strict version of modifyTVar
.
Since: stm-2.3
registerDelay :: Concurrent :> es => Int -> Eff es (TVar Bool) Source #
Lifted registerDelay
.
mkWeakTVar :: (HasCallStack, Concurrent :> es) => TVar a -> Eff es () -> Eff es (Weak (TVar a)) Source #
Lifted mkWeakTVar
.
Note: the finalizer will run a cloned environment, so any changes it makes to thread local data will not be visible outside of it.
TMVar
A TMVar
is a synchronising variable, used
for communication between concurrent threads. It can be thought of
as a box, which may be empty or full.
newEmptyTMVar :: STM (TMVar a) #
Create a TMVar
which is initially empty.
newTMVarIO :: Concurrent :> es => a -> Eff es (TMVar a) Source #
Lifted newTMVarIO
.
newEmptyTMVarIO :: Concurrent :> es => Eff es (TMVar a) Source #
Lifted newEmptyTMVarIO
.
tryReadTMVar :: TMVar a -> STM (Maybe a) #
A version of readTMVar
which does not retry. Instead it
returns Nothing
if no value is available.
Since: stm-2.3
tryTakeTMVar :: TMVar a -> STM (Maybe a) #
A version of takeTMVar
that does not retry
. The tryTakeTMVar
function returns Nothing
if the TMVar
was empty, or
if
the Just
aTMVar
was full with contents a
. After tryTakeTMVar
, the
TMVar
is left empty.
tryPutTMVar :: TMVar a -> a -> STM Bool #
mkWeakTMVar :: (HasCallStack, Concurrent :> es) => TMVar a -> Eff es () -> Eff es (Weak (TMVar a)) Source #
Lifted mkWeakTMVar
.
Note: the finalizer will run a cloned environment, so any changes it makes to thread local data will not be visible outside of it.
TChan
TChan
is an abstract type representing an unbounded FIFO channel.
newTChanIO :: Concurrent :> es => Eff es (TChan a) Source #
Lifted newTChanIO
.
newBroadcastTChan :: STM (TChan a) #
Create a write-only TChan
. More precisely, readTChan
will retry
even after items have been written to the channel. The only way to read
a broadcast channel is to duplicate it with dupTChan
.
Consider a server that broadcasts messages to clients:
serve :: TChan Message -> Client -> IO loop serve broadcastChan client = do myChan <- dupTChan broadcastChan forever $ do message <- readTChan myChan send client message
The problem with using newTChan
to create the broadcast channel is that if
it is only written to and never read, items will pile up in memory. By
using newBroadcastTChan
to create the broadcast channel, items can be
garbage collected after clients have seen them.
Since: stm-2.4
newBroadcastTChanIO :: Concurrent :> es => Eff es (TChan a) Source #
Lifted newBroadcastTChanIO
.
dupTChan :: TChan a -> STM (TChan a) #
Duplicate a TChan
: the duplicate channel begins empty, but data written to
either channel from then on will be available from both. Hence this creates
a kind of broadcast channel, where data written by anyone is seen by
everyone else.
cloneTChan :: TChan a -> STM (TChan a) #
Clone a TChan
: similar to dupTChan, but the cloned channel starts with the
same content available as the original channel.
Since: stm-2.4
tryReadTChan :: TChan a -> STM (Maybe a) #
A version of readTChan
which does not retry. Instead it
returns Nothing
if no value is available.
Since: stm-2.3
peekTChan :: TChan a -> STM a #
Get the next value from the TChan
without removing it,
retrying if the channel is empty.
Since: stm-2.3
tryPeekTChan :: TChan a -> STM (Maybe a) #
A version of peekTChan
which does not retry. Instead it
returns Nothing
if no value is available.
Since: stm-2.3
writeTChan :: TChan a -> a -> STM () #
Write a value to a TChan
.
unGetTChan :: TChan a -> a -> STM () #
Put a data item back onto a channel, where it will be the next item read.
TQueue
TQueue
is an abstract type representing an unbounded FIFO channel.
Since: stm-2.4
newTQueueIO :: Concurrent :> es => Eff es (TQueue a) Source #
Lifted newTQueueIO
.
readTQueue :: TQueue a -> STM a #
Read the next value from the TQueue
.
tryReadTQueue :: TQueue a -> STM (Maybe a) #
A version of readTQueue
which does not retry. Instead it
returns Nothing
if no value is available.
peekTQueue :: TQueue a -> STM a #
Get the next value from the TQueue
without removing it,
retrying if the channel is empty.
tryPeekTQueue :: TQueue a -> STM (Maybe a) #
A version of peekTQueue
which does not retry. Instead it
returns Nothing
if no value is available.
flushTQueue :: TQueue a -> STM [a] #
Efficiently read the entire contents of a TQueue
into a list. This
function never retries.
Since: stm-2.4.5
writeTQueue :: TQueue a -> a -> STM () #
Write a value to a TQueue
.
unGetTQueue :: TQueue a -> a -> STM () #
Put a data item back onto a channel, where it will be the next item read.
TBQueue
TBQueue
is an abstract type representing a bounded FIFO channel.
Since: stm-2.4
Builds and returns a new instance of TBQueue
.
newTBQueueIO :: Concurrent :> es => Natural -> Eff es (TBQueue a) Source #
Lifted newTBQueueIO
.
readTBQueue :: TBQueue a -> STM a #
Read the next value from the TBQueue
.
tryReadTBQueue :: TBQueue a -> STM (Maybe a) #
A version of readTBQueue
which does not retry. Instead it
returns Nothing
if no value is available.
peekTBQueue :: TBQueue a -> STM a #
Get the next value from the TBQueue
without removing it,
retrying if the channel is empty.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a) #
A version of peekTBQueue
which does not retry. Instead it
returns Nothing
if no value is available.
flushTBQueue :: TBQueue a -> STM [a] #
Efficiently read the entire contents of a TBQueue
into a list. This
function never retries.
Since: stm-2.4.5
writeTBQueue :: TBQueue a -> a -> STM () #
Write a value to a TBQueue
; blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM () #
Put a data item back onto a channel, where it will be the next item read. Blocks if the queue is full.