streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream.Expand

Description

Expand a stream by combining two or more streams or by combining streams with unfolds.

Synopsis

Binary Combinators (Linear)

Functions ending in the shape:

t m a -> t m a -> t m a.

The functions in this section have a linear or flat n-ary combining characterstics. It means that when combined n times (e.g. a serial b serial c ...) the resulting expression will have an O(n) complexity (instead O(n^2) for pair wise combinators described in the next section. These functions can be used efficiently with concatMapWith et. al. combinators that combine streams in a linear fashion (contrast with concatPairsWith which combines streams as a binary tree).

serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Appends two streams, both the streams may be evaluated concurrently but the outputs are used in the same order as the corresponding actions in the original streams, side effects will happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense combining serial streams using ahead.

ahead can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Since: 0.8.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]

If total threads are 2, the third stream is scheduled only after one of the first two has finished:

stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]

... [1,1,3,2,3,2]

Thus async goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.

With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.

>>> import Streamly.Prelude (async, wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]

With two threads available, and combining three streams:

>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]

This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.

Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #

Like async except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both the streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.

>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]

parallel guarantees that all the streams are scheduled for execution immediately, therefore, we could use things like starting timers inside the streams and relying on the fact that all timers were started at the same time.

Unlike async this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.

Since: 0.2.0 (Streamly)

Since: 0.8.0

parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as the first stream stops.

Pre-release

parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Like parallel but stops the output as soon as any of the two streams stops.

Pre-release

Binary Combinators (Pair Wise)

Like the functions in the section above these functions also combine two streams into a single stream but when used n times linearly they exhibit O(n^2) complexity. They are best combined in a binary tree fashion using concatPairsWith giving a n * log n complexity. Avoid using these with concatMapWith when combining a large or infinite number of streams.

Append

append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.

IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes qudartically slow. Therefore use this for custom appending of a few streams but use concatMap or 'concatMapWith serial' for appending n streams or infinite containers of streams.

Pre-release

wSerial

wSerial is a CPS based stream interleaving functions. Use 'concatPairsWith wSerial' to interleave n streams uniformly. It can be used with concatMapWith as well, however, the interleaving behavior of n streams would be asymmetric giving exponentially more weightage to streams that come earlier in the composition.

wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]

Note, for singleton streams wSerial and serial are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Since: 0.8.0

Interleave

interleave is like wSerial but using a direct style implementation instead of CPS. It is faster than wSerial due to stream fusion but has worse efficiency when used with concatMapWith for large number of streams.

interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"
>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"

interleave is dual to interleaveMin, it can be called interleaveMax.

Do not use at scale in concatMapWith.

Pre-release

interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,"
>>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,c"

interleaveMin is dual to interleave.

Do not use at scale in concatMapWith.

Pre-release

interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c,"
>>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"

interleaveSuffix is a dual of interleaveInfix.

Do not use at scale in concatMapWith.

Pre-release

interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c"
>>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"

interleaveInfix is a dual of interleaveSuffix.

Do not use at scale in concatMapWith.

Pre-release

Round Robin

roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may chose to Skip producing an element until later giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.

Do not use at scale in concatMapWith.

Pre-release

Zip

zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Stream a is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer.

If stream a or stream b ends, the zipped stream ends. If stream b ends first, the element a from previous evaluation of stream a is discarded.

> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]

Since: 0.1.0

zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipWith but using a monadic zipping function.

Since: 0.4.0

zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #

Like zipWith but zips concurrently i.e. both the streams being zipped are evaluated concurrently using the ParallelT concurrent evaluation style. The maximum number of elements of each stream evaluated in advance can be controlled by maxBuffer.

The stream ends if stream a or stream b ends. However, if stream b ends while we are still evaluating stream a and waiting for a result then stream will not end until after the evaluation of stream a finishes. This behavior can potentially be changed in future to end the stream immediately as soon as any of the stream end is detected.

Since: 0.1.0

zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #

Like zipAsyncWith but with a monadic zipping function.

Since: 0.4.0

Merge

mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.

If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.

>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

See also: mergeByMFused

Since: 0.6.0

mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ _ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]

See also: mergeByMFused

Since: 0.6.0

mergeByMFused :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but much faster, works best when merging statically known number of streams. When merging more than two streams try to merge pairs and pair pf pairs in a tree like structure.mergeByM works better with variable number of streams being merged using concatPairsWith.

Internal

mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #

Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).

Since: 0.6.0

Combine Streams and Unfolds

Expand a stream by repeatedly using an unfold and merging the resulting streams. Functions generally ending in the shape:

Unfold m a b -> t m a -> t m b

Append Many (Unfold)

Unfold and flatten streams.

unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like concatMap but uses an Unfold for stream generation. Unlike concatMap this can fuse the Unfold code with the inner loop and therefore provide many times better performance.

Since: 0.8.0

unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like unfoldMany but interleaves the streams in the same way as interleave behaves instead of appending them.

Pre-release

unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Like unfoldMany but executes the streams in the same way as roundrobin.

Pre-release

Interpose

Insert effects between streams. Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.

interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.

unwords = S.interpose ' '

Pre-release

interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #

Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.

unlines = S.interposeSuffix '\n'

Pre-release

Intercalate

Insert Streams between Streams. Like unfoldMany but intersperses streams from another source between the streams from the first source.

intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperse followed by unfold and concat.

intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"

Since: 0.8.0

intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c Source #

intersperseSuffix followed by unfold and concat.

intercalateSuffix unf a str = unfoldMany unf $ intersperseSuffix a str
intersperseSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"
>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"

Since: 0.8.0

gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveInfix followed by unfold and concat.

Pre-release

gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #

interleaveSuffix followed by unfold and concat.

Pre-release

Append Many (concatMap)

Map and serially append streams. concatMapM is a generalization of the binary append operation to append many streams.

concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #

Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.

Since: 0.6.0

concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #

Map a stream producing function on each element of the stream and then flatten the results into a single stream.

>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concatMapWith Stream.serial f
>>> concatMap f = Stream.concat . Stream.map f
>>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)

Since: 0.6.0

concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #

Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.

>>> concatM = Stream.concat . Stream.fromEffect
>>> concatM = Stream.concat . lift    -- requires (MonadTrans t)
>>> concatM = join . lift             -- requires (MonadTrans t, Monad (t m))

See also: concat, sequence

Internal

concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #

Flatten a stream of streams to a single stream.

concat = concatMap id

Pre-release

Flatten Containers

Flatten Foldable containers using the binary stream merging operations.

concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

concatFoldableWith async $ map return [1..3]

Equivalent to:

concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)

Since: 0.1.0 (Streamly)

concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.

concatMapFoldableWith async return [1..3]

Equivalent to:

concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)

Since: 0.1.0 (Streamly)

concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like concatMapFoldableWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to:

concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith f = flip (S.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)

Since: 0.1.0 (Streamly)

ConcatMapWith

Map and flatten a stream like concatMap but using a custom binary stream merging combinator instead of just appending the streams. The merging occurs sequentially, it works efficiently for serial, async, ahead like merge operations where we consume one stream before the next or in case of wAsync or parallel where we consume all streams simultaneously anyway.

However, in cases where the merging consumes streams in a round robin fashion, a pair wise merging using concatPairsWith would be more efficient. These cases include operations like mergeBy or zipWith.

concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0

Since: 0.8.0 (signature change)

bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b Source #

concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b Source #

Like concatMapWith but carries a state which can be used to share information across multiple steps of concat.

concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial

Pre-release

ConcatPairsWith

See the notes about suitable merge functions in the concatMapWith section.

concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #

Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.

For example, you can sort a stream using merge sort like this:

>>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
[1,2,5,7,9]

Caution: the stream of streams must be finite

Pre-release

IterateMap

Map and flatten Trees of Streams

iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a Source #

Like iterateM but iterates after mapping a stream generator on the output.

Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure.

Note that iterateM is a special case of iterateMapWith:

iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect

It can be used to traverse a tree structure. For example, to list a directory tree:

Stream.iterateMapWith Stream.serial
    (either Dir.toEither (const nil))
    (fromPure (Left "tmp"))

Pre-release

iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a Source #

Like iterateMap but carries a state in the stream generation function. This can be used to traverse graph like structures, we can remember the visited nodes in the state to avoid cycles.

Note that a combination of iterateMap and usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: mfix

Pre-release

iterateMapLeftsWith :: IsStream t => (t m (Either a b) -> t m (Either a b) -> t m (Either a b)) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b) Source #

In an Either stream iterate on Lefts. This is a special case of iterateMapWith:

iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))

To traverse a directory tree:

iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))

Pre-release

Deprecated

concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #

Deprecated: Please use unfoldMany instead.

(<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 Source #

Deprecated: Please use wSerial instead.

Same as wSerial.

Since: 0.1.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0