Copyright | (c) 2022 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
For upgrading to streamly-0.9.0+ please read the Streamly-0.9.0 upgrade guide. Also, see the Streamly.Data.Stream.MkType module for direct replacement of stream types that have been removed in 0.9.0.
All Stream related combinators including the streamly-core Streamly.Data.Stream module, concurrency, time and lifted exception operations. For more pre-release operations also see Streamly.Internal.Data.Stream.Prelude module.
Synopsis
- module Streamly.Data.Stream
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data Config
- maxThreads :: Int -> Config -> Config
- maxBuffer :: Int -> Config -> Config
- data Rate = Rate {}
- rate :: Maybe Rate -> Config -> Config
- avgRate :: Double -> Config -> Config
- minRate :: Double -> Config -> Config
- maxRate :: Double -> Config -> Config
- constRate :: Double -> Config -> Config
- data StopWhen
- stopWhen :: StopWhen -> Config -> Config
- eager :: Bool -> Config -> Config
- ordered :: Bool -> Config -> Config
- interleaved :: Bool -> Config -> Config
- inspect :: Bool -> Config -> Config
- parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
- parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
- parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
- parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
- parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a
- parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
- parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
- parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a
- parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
- parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
- parTapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
- interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a
- takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b
- sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- after :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a
- bracket :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
- finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
- tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
Streamly.Data.Stream
All Streamly.Data.Stream combinators are re-exported via this module. For more pre-release combinators also see Streamly.Internal.Data.Stream module.
module Streamly.Data.Stream
Concurrent Operations
Channels
At a lower level, concurrency is implemented using channels that support concurrent evaluation of streams. We create a channel, and add one or more streams to it. The channel evaluates multiple streams concurrently and then generates a single output stream from the results. How the streams are combined depends on the configuration of the channel.
Primitives
There are only a few fundamental abstractions for concurrency, parEval
,
parConcatMap
, and parConcatIterate
, all concurrency combinators can be
expressed in terms of these.
parEval
evaluates a stream as a whole asynchronously with respect to
the consumer of the stream. A worker thread evaluates multiple elements of
the stream ahead of time and buffers the results; the consumer of the stream
runs in another thread consuming the elements from the buffer, thus
decoupling the production and consumption of the stream. parEval
can be
used to run different stages of a pipeline concurrently.
parConcatMap
is used to evaluate multiple actions in a stream concurrently
with respect to each other or to evaluate multiple streams concurrently and
combine the results. A stream generator function is mapped to the input
stream and all the generated streams are then evaluated concurrently, and
the results are combined.
parConcatIterate
is like parConcatMap
but iterates a stream generator
function recursively over the stream. This can be used to traverse trees or
graphs.
Configuration
Concurrent combinators take a Config
argument which controls the
concurrent behavior. For example, maximum number of threads to be used
(maxThreads
) or the maxmimum size of the buffer (maxBuffer
), or how the
streams are scheduled with respect to each other (interleaved
), or how the
results are consumed (ordered
).
Configuration is specified as Config -> Config
modifier functions that can
be composed together using function composition. For example, to specify the
maximum threads we can use parConcatMap (maxThreads 10)
if we also want to
specify the maximum buffer we can compose the two options parConcatMap
(maxThreads 10 . maxBuffer 100)
. To use default configuration use id
as
the config modifier e.g. parConcatMap id
.
See the Configuration
section and individual configuration options'
documentation for the default behavior and default values of configuration
parameters.
Scheduling
The most important configuration option is to control whether the output of
the concurrent execution is consumed in the same order as the corresponding
actions in the input stream or as soon as they arrive. The default is the
latter, however, we can enforce the original order by using the ordered
option.
Another important option controls whether the number of worker threads are
automatically increased and decreased based on the consumption rate or
threads are started as aggresively as possible until the maxThreads
or
maxBuffer
limits are hit. The default is the former. However, the eager
option can be enabled to use the latter behavior. When eager
is on, even
if the stream consumer thread blocks it does not make any impact on the
scheduling of the available tasks.
Combinators
Using the few fundamental concurrency primitives we can implement all the
usual streaming combinators with concurrent behavior. Combinators like
unfoldrM
, iterateM
that are inherently serial can be evaluated
concurrently with respect to the consumer pipeline using parEval
.
Combinators like zipWithM
, mergeByM
can also use parEval
on the input
streams to evaluate them concurrently before combining.
Combinators like repeatM
, replicateM
, fromListM
, Traversable
, Traversable
in
which all actions are independent of each other can be made concurrent using
the parConcatMap
operation.
A concurrent repeatM
repeats an action using multiple concurrent
executions of the action. Similarly, a concurrent Traversable
performs the mapped
action in independent threads.
Some common concurrent combinators are provided in this module.
Types
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Configuration
An abstract type for specifying the configuration parameters of a
Channel
. Use Config -> Config
modifier functions to modify the default
configuration. See the individual modifier documentation for default values.
Limits
maxThreads :: Int -> Config -> Config Source #
Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer
value (i.e. a negative value)
coupled with an unbounded maxThreads
value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when pure
is used in ZipAsyncM
streams as pure
in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
Rate Control
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
rate :: Maybe Rate -> Config -> Config Source #
Specify the stream evaluation rate of a channel.
A Nothing
value means there is no smart rate control, concurrent execution
blocks only if maxThreads
or maxBuffer
is reached, or there are no more
concurrent tasks to execute. This is the default.
When rate (throughput) is specified, concurrent production may be ramped
up or down automatically to achieve the specified stream throughput. The
specific behavior for different styles of Rate
specifications is
documented under Rate
. The effective maximum production rate achieved by
a channel is governed by:
- The
maxThreads
limit - The
maxBuffer
limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Maximum production rate is given by:
\(rate = \frac{maxThreads}{latency}\)
If we know the average latency of the tasks we can set maxThreads
accordingly.
avgRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz
). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
minRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.
maxRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
constRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Stop behavior
Specify when the Channel
should stop.
FirstStops | Stop when the first stream ends. |
AllStop | Stop when all the streams end. |
AnyStops | Stop when any one stream ends. |
Scheduling behavior
eager :: Bool -> Config -> Config Source #
By default, processing of output from the worker threads is given priority
over dispatching new workers. More workers are dispatched only when there is
no output to process. When eager
is set to True
, workers are dispatched
aggresively as long as there is more work to do irrespective of whether
there is output pending to be processed by the stream consumer. However,
dispatching may stop if maxThreads
or maxBuffer
is reached.
Note: This option has no effect when rate has been specified.
Note: Not supported with interleaved
.
ordered :: Bool -> Config -> Config Source #
When enabled the streams may be evaluated cocnurrently but the results are produced in the same sequence as a serial evaluation would produce.
Note: Not supported with interleaved
.
interleaved :: Bool -> Config -> Config Source #
Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over limited number of threads.
Note: Can only be used on finite number of streams.
Note: Not supported with ordered
.
Diagnostics
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel
when the stream ends.
Combinators
Stream combinators using a concurrent channel.
Evaluate
Evaluate a stream as a whole concurrently with respect to the consumer of the stream.
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a Source #
parEval
evaluates a stream as a whole asynchronously with respect to
the consumer of the stream. A worker thread evaluates multiple elements of
the stream ahead of time and buffers the results; the consumer of the stream
runs in another thread consuming the elements from the buffer, thus
decoupling the production and consumption of the stream. parEval
can be
used to run different stages of a pipeline concurrently.
It is important to note that parEval
does not evaluate individual actions
in the stream concurrently with respect to each other, it merely evaluates
the stream serially but in a different thread than the consumer thread,
thus the consumer and producer can run concurrently. See parMapM
and
parSequence
to evaluate actions in the stream concurrently.
The evaluation requires only one thread as only one stream needs to be
evaluated. Therefore, the concurrency options that are relevant to multiple
streams do not apply here e.g. maxThreads, eager, interleaved, ordered,
stopWhen options do not have any effect on parEval
.
Useful idioms:
>>>
parUnfoldrM step = Stream.parEval id . Stream.unfoldrM step
>>>
parIterateM step = Stream.parEval id . Stream.iterateM step
Generate
Generate a stream by evaluating multiple actions concurrently.
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a Source #
Definition:
>>>
parRepeatM cfg = Stream.parSequence cfg . Stream.repeat
Generate a stream by repeatedly executing a monadic action forever.
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a Source #
Generate a stream by concurrently performing a monadic action n
times.
Definition:
>>>
parReplicateM cfg n = Stream.parSequence cfg . Stream.replicate n
Example, parReplicateM
in the following example executes all the
replicated actions concurrently, thus taking only 1 second:
>>>
Stream.fold Fold.drain $ Stream.parReplicateM id 10 $ delay 1
...
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a Source #
fromCallback f
creates an entangled pair of a callback and a stream i.e.
whenever the callback is called a value appears in the stream. The function
f
is invoked with the callback as argument, and the stream is returned.
f
would store the callback for calling it later for generating values in
the stream.
The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.
Pre-release
Map
Map actions on a stream such that the mapped actions are evaluated concurrently with each other.
parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b Source #
Definition:
>>>
parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)
For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel. Even though results are available out of order they are ordered due to the config option:
>>>
f x = delay x >> return x
>>>
Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]
1 sec 2 sec 3 sec [3,2,1]
parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a Source #
Definition:
>>>
parSequence modifier = Stream.parMapM modifier id
Useful idioms:
>>>
parFromListM = Stream.parSequence id . Stream.fromList
>>>
parFromFoldableM = Stream.parSequence id . StreamK.toStream . StreamK.fromFoldable
Combine two
Combine two streams such that each stream as a whole is evaluated concurrently with respect to the other stream as well as the consumer of the resulting stream.
parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Evaluates the streams being zipped in separate threads than the consumer. The zip function is evaluated in the consumer thread.
>>>
parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
Multi-stream concurrency options won't apply here, see the notes in
parEval
.
If you want to evaluate the zip function as well in a separate thread, you
can use a parEval
on parZipWithM
.
parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
>>>
parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b)
>>>
m1 = Stream.fromList [1,2,3]
>>>
m2 = Stream.fromList [4,5,6]
>>>
Stream.fold Fold.toList $ Stream.parZipWith id (,) m1 m2
[(1,4),(2,5),(3,6)]
parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeByM
but evaluates both the streams concurrently.
Definition:
>>>
parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeBy
but evaluates both the streams concurrently.
Definition:
>>>
parMergeBy cfg f = Stream.parMergeByM cfg (\a b -> return $ f a b)
List of streams
Shares a single channel across many streams.
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a Source #
Like parConcat
but works on a list of streams.
>>>
parList modifier = Stream.parConcat modifier . Stream.fromList
Stream of streams
Apply
parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #
Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.
Concat
Shares a single channel across many streams.
parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a Source #
Evaluate the streams in the input stream concurrently and combine them.
>>>
parConcat modifier = Stream.parConcatMap modifier id
parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map each element of the input to a stream and then concurrently evaluate and concatenate the resulting streams. Multiple streams may be evaluated concurrently but earlier streams are perferred. Output from the streams are used as they arrive.
Definition:
>>>
parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f stream
Examples:
>>>
f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xs
The following streams finish in 4 seconds:
>>>
stream1 = Stream.fromEffect (delay 4)
>>>
stream2 = Stream.fromEffect (delay 2)
>>>
stream3 = Stream.fromEffect (delay 1)
>>>
f id [stream1, stream2, stream3]
1 sec 2 sec 4 sec [1,2,4]
Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:
>>>
f (Stream.maxThreads 2) [stream1, stream2, stream3]
... [2,1,4]
When used with a Single thread it behaves like serial concatMap:
>>>
f (Stream.maxThreads 1) [stream1, stream2, stream3]
... [4,2,1]
>>>
stream1 = Stream.fromList [1,2,3]
>>>
stream2 = Stream.fromList [4,5,6]
>>>
f (Stream.maxThreads 1) [stream1, stream2]
[1,2,3,4,5,6]
Schedule all streams in a round robin fashion over the available threads:
>>>
f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>>
stream1 = Stream.fromList [1,2,3]
>>>
stream2 = Stream.fromList [4,5,6]
>>>
f (Stream.maxThreads 1) [stream1, stream2]
[1,4,2,5,3,6]
ConcatIterate
parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a Source #
Same as concatIterate
but concurrent.
Pre-release
Observation
parTapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
parTapCount predicate fold stream
taps the count of those elements in
the stream that pass the predicate
. The resulting count stream is sent to
a fold
running concurrently in another thread.
For example, to print the count of elements processed every second:
>>>
rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1
>>>
report = Stream.fold (Fold.drainMapM print) . rate
>>>
tap = Stream.parTapCount (const True) report
>>>
go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines because of Int overflow.
Pre-release
Time Related
Timers
interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
Definition:
>>>
interject n f xs = Stream.parListEagerFst [xs, Stream.periodic f n]
Example:
>>>
s = Stream.fromList "hello"
>>>
input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
>>>
Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
h,e,l,l,o
Trimming
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
takeInterval interval
runs the stream only upto the specified time
interval
in seconds.
The interval starts when the stream is evaluated for the first time.
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
dropInterval interval
drops all the stream elements that are generated
before the specified interval
in seconds has passed.
The interval begins when the stream is evaluated for the first time.
Chunking
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b Source #
Group the input stream into windows of n
second each and then fold each
group using the provided fold function.
>>>
twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
>>>
intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
>>>
Stream.fold Fold.toList $ Stream.take 2 intervals
[...,...]
Sampling
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Continuously evaluate the input stream and sample the last event in each
time window of n
seconds.
This is also known as throttle
in some libraries.
>>>
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest
sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleInterval
but samples at the beginning of the time window.
>>>
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.
This is known as debounce
in some libraries.
The clock granularity is 10 ms.
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleBurstEnd
but samples the event at the beginning of the burst
instead of at the end of it.
Lifted Exceptions
after :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a Source #
Run the action m b
whenever the stream Stream m a
stops normally, or
if it is garbage collected after a partial lazy evaluation.
The semantics of the action m b
are similar to the semantics of cleanup
action in bracket
.
See also after_
bracket :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #
Run the alloc action IO b
with async exceptions disabled but keeping
blocking operations interruptible (see mask
). Use the
output b
of the IO action as input to the function b -> Stream m a
to
generate an output stream.
b
is usually a resource under the IO monad, e.g. a file handle, that
requires a cleanup after use. The cleanup action b -> m c
, runs whenever
(1) the stream ends normally, (2) due to a sync or async exception or, (3)
if it gets garbage collected after a partial lazy evaluation. The exception
is not caught, it is rethrown.
bracket
only guarantees that the cleanup action runs, and it runs with
async exceptions enabled. The action must ensure that it can successfully
cleanup the resource in the face of sync or async exceptions.
When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.
See also: bracket_
Inhibits stream fusion
finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a Source #
Run the action m b
whenever the stream Stream m a
stops normally,
aborts due to an exception or if it is garbage collected after a partial
lazy evaluation.
The semantics of running the action m b
are similar to the cleanup action
semantics described in bracket
.
>>>
finally action xs = Stream.bracket (return ()) (const action) (const xs)
See also finally_
Inhibits stream fusion
Deprecated
tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
Deprecated: Please use parTapCount instead.
Same as parTapCount
. Deprecated.