Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Expand a stream by combining two or more streams or by combining streams with unfolds.
Synopsis
- serial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- wSerial :: IsStream t => t m a -> t m a -> t m a
- wSerialFst :: IsStream t => t m a -> t m a -> t m a
- wSerialMin :: IsStream t => t m a -> t m a -> t m a
- interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- unfoldMany :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
- intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
- gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
- concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b
- concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a
- iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a
- iterateMapLeftsWith :: IsStream t => (t m (Either a b) -> t m (Either a b) -> t m (Either a b)) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)
- concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
Binary Combinators (Linear)
Functions ending in the shape:
t m a -> t m a -> t m a
The functions in this section have linear (flat) n-ary combining characteristics. This means that when combined n times (e.g. a `serial` b `serial` c ...) the resulting expression has O(n) complexity, instead of the O(n^2) of the pair wise combinators described in the next section. These functions can be used efficiently with concatMapWith and similar combinators that combine streams in a linear fashion (contrast with concatPairsWith, which combines streams as a binary tree).
serial :: IsStream t => t m a -> t m a -> t m a infixr 6
Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]
This operation can be used to fold an infinite lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
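The claim about infinite lazy containers can be sketched as follows. This is a minimal example under stated assumptions: streamly 0.8.x, with Streamly.Prelude imported qualified as Stream as in the snippets on this page. The name firstFive is illustrative and not part of the library.

```haskell
import qualified Streamly.Prelude as Stream

-- Fold an infinite list of singleton streams with `serial`, then consume
-- only a finite prefix. This relies on `serial` not forcing its second
-- argument before the first stream is exhausted.
firstFive :: IO [Int]
firstFive =
    Stream.toList
        $ Stream.take 5
        $ Stream.concatFoldableWith Stream.serial
        $ map Stream.fromPure [1 ..]

main :: IO ()
main = firstFive >>= print -- prints [1,2,3,4,5]
```

Only the streams needed to produce the first five elements are constructed; the rest of the infinite list is never evaluated.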
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams; side effects happen in the order in which the streams are evaluated:
>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]
With 2 threads, only two can be scheduled at a time; when one of those finishes, the third one gets scheduled:
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]
Only streams are scheduled for ahead evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense to combine serial streams using ahead.
ahead can be safely used to fold an infinite lazy container of streams.
Since: 0.3.0 (Streamly)
Since: 0.8.0
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Merges two streams. Both streams may be evaluated concurrently, and outputs from both are used as they arrive:
>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]
With 2 threads, only two can be scheduled at a time; when one of those finishes, the third one gets scheduled:
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]
With a single thread, it becomes serial:
>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]
Only streams are scheduled for async evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.
In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:
>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]
If total threads are 2, the third stream is scheduled only after one of the first two has finished:
>>> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]
Thus async goes deep in the first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, async is left biased, i.e. it tries to evaluate the left side stream as much as possible, while wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.
With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.
>>> import Streamly.Prelude (wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]
With two threads available, and combining three streams:
>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]
This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.
Note that WSerialT and single threaded WAsyncT both interleave streams, but the exact scheduling is slightly different in the two cases.
Since: 0.2.0 (Streamly)
Since: 0.8.0
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Like async except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both the streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.
>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]
parallel guarantees that all the streams are scheduled for execution immediately. Therefore, we can do things like start timers inside the streams and rely on the fact that all timers were started at the same time.
Unlike async, this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.
Since: 0.2.0 (Streamly)
Since: 0.8.0
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel but stops the output as soon as the first stream stops.
Pre-release
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel but stops the output as soon as either of the two streams stops.
Pre-release
Binary Combinators (Pair Wise)
Like the functions in the section above, these functions also combine two streams into a single stream, but when used n times linearly they exhibit O(n^2) complexity. They are best combined in a binary tree fashion using concatPairsWith, giving n * log n complexity. Avoid using these with concatMapWith when combining a large or infinite number of streams.
Append
append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.
IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes quadratically slow. Therefore use this for custom appending of a few streams, but use concatMap or 'concatMapWith serial' for appending n streams or infinite containers of streams.
Pre-release
wSerial
wSerial is a CPS based stream interleaving function. It can be used with concatMapWith as well; however, the interleaving behavior of n streams would be asymmetric, giving exponentially more weight to streams that come earlier in the composition.
wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6
Interleaves two streams, yielding one element from each stream alternately. When one stream stops, the rest of the other stream is used in the output stream.
>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]
Note, for singleton streams wSerial and serial are identical.
Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wSerialFst :: IsStream t => t m a -> t m a -> t m a
Like wSerial but stops interleaving as soon as the first stream stops.
Since: 0.7.0
wSerialMin :: IsStream t => t m a -> t m a -> t m a
Like wSerial but stops interleaving as soon as either of the two streams stops.
Since: 0.7.0
Interleave
interleave is like wSerial but uses a direct style implementation instead of CPS. It is faster than wSerial due to stream fusion but has worse efficiency when used with concatMapWith for a large number of streams.
interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"
>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"
interleave is dual to interleaveMin; it can be called interleaveMax.
Do not use at scale in concatMapWith.
Pre-release
interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,"
>>> Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,c"
interleaveMin is dual to interleave.
Do not use at scale in concatMapWith.
Pre-release
interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c,"
>>> Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
interleaveSuffix is a dual of interleaveInfix.
Do not use at scale in concatMapWith.
Pre-release
interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.
>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,c"
>>> Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity Char
fromList "a,bc"
interleaveInfix is a dual of interleaveSuffix.
Do not use at scale in concatMapWith.
Pre-release
Round Robin
roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output; a stream may choose to Skip producing an element until later, giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to async.
Do not use at scale in concatMapWith.
Pre-release
Zip
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
Zip two streams serially using a pure zipping function.
> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]
Since: 0.1.0
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
Like zipWith but using a monadic zipping function.
Since: 0.4.0
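A minimal sketch of a monadic zipping function, assuming streamly 0.8.x and Streamly.Prelude imported qualified as Stream. The name zipSums is illustrative only. The zipping function runs an effect for each pair before producing the combined element, and the result ends when the shorter input ends.

```haskell
import qualified Streamly.Prelude as Stream

-- Zip two streams with an effectful combining function: log each pair,
-- then yield the sum.
zipSums :: IO [Int]
zipSums =
    Stream.toList $
        Stream.zipWithM
            (\a b -> do
                putStrLn ("adding " ++ show a ++ " and " ++ show b)
                return (a + b))
            (Stream.fromList [1, 2, 3])
            (Stream.fromList [10, 20, 30, 40])

main :: IO ()
main = zipSums >>= print -- prints three log lines, then [11,22,33]
```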
zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
Like zipWith but zips concurrently, i.e. both the streams being zipped are generated concurrently.
Since: 0.1.0
zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
Like zipWithM but zips concurrently, i.e. both the streams being zipped are generated concurrently.
Since: 0.4.0
Merge
mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted; if both elements are equal then the element from the first stream is used first.
If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.
>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]
Since: 0.6.0
mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
Like mergeBy but with a monadic comparison function.
Merge two streams randomly:
> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]
Merge two streams in a proportion of 2:1:
>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]
Since: 0.6.0
mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
Like mergeBy but merges concurrently (i.e. both the elements being merged are generated concurrently).
Since: 0.6.0
mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
Like mergeByM but merges concurrently (i.e. both the elements being merged are generated concurrently).
Since: 0.6.0
Combine Streams and Unfolds
Expand a stream by repeatedly using an unfold and merging the resulting streams. Functions generally ending in the shape:
Unfold m a b -> t m a -> t m b
Append Many (Unfold)
Unfold and flatten streams.
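A minimal sketch of unfolding and flattening with unfoldMany, assuming streamly 0.8.x with Streamly.Data.Unfold imported qualified as Unfold and Streamly.Prelude as Stream. The name flattened is illustrative only. Each list in the input stream is unfolded with Unfold.fromList and the resulting inner streams are appended in order.

```haskell
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Prelude as Stream

-- Unfold every list in the outer stream and append the results.
flattened :: IO [Int]
flattened =
    Stream.toList
        $ Stream.unfoldMany Unfold.fromList
        $ Stream.fromList [[1, 2], [3], [4, 5, 6]]

main :: IO ()
main = flattened >>= print -- prints [1,2,3,4,5,6]
```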
unfoldManyInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
Like unfoldMany but interleaves the streams in the same way as interleave behaves instead of appending them.
Pre-release
unfoldManyRoundRobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
Like unfoldMany but executes the streams in the same way as roundrobin.
Pre-release
Interpose
Insert effects between streams. Like unfoldMany but intersperses an effect between the streams. A special case of gintercalate.
interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.
unwords = S.interpose ' '
Pre-release
interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.
unlines = S.interposeSuffix '\n'
Pre-release
Intercalate
Insert Streams between Streams. Like unfoldMany but intersperses streams from another source between the streams from the first source.
intercalate :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
intersperse followed by unfold and concat.
intercalate unf a str = unfoldMany unf $ intersperse a str
intersperse = intercalate (Unfold.function id)
unwords = intercalate Unfold.fromList " "
>>> Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"]
"abc def ghi"
Since: 0.8.0
intercalateSuffix :: (IsStream t, Monad m) => Unfold m b c -> b -> t m b -> t m c
intersperseSuffix followed by unfold and concat.
intercalateSuffix unf a str = unfoldMany unf $ intersperseSuffix a str
intersperseSuffix = intercalateSuffix (Unfold.function id)
unlines = intercalateSuffix Unfold.fromList "\n"
>>> Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]
"abc\ndef\nghi\n"
Since: 0.8.0
gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
interleaveInfix followed by unfold and concat.
Pre-release
gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
interleaveSuffix followed by unfold and concat.
Pre-release
Append Many (concatMap)
Map and serially append streams. concatMapM is a generalization of the binary append operation to append many streams.
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.
Since: 0.6.0
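A minimal sketch of the "effect at the beginning of each inner loop" behavior, assuming streamly 0.8.x and Streamly.Prelude imported qualified as Stream. The names replicated and gen are illustrative only. The generator logs a line and then returns the inner stream for that element.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- For each outer element n, run an effect and then emit n copies of n.
replicated :: IO [Int]
replicated = Stream.toList $ Stream.concatMapM gen (Stream.fromList [1, 2, 3])
  where
    gen :: Int -> IO (SerialT IO Int)
    gen n = do
        putStrLn ("generating inner stream for " ++ show n)
        return (Stream.fromList (replicate n n))

main :: IO ()
main = replicated >>= print -- prints three log lines, then [1,2,2,3,3,3]
```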
concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
concatMap f = concatMapM (return . f)
concatMap = concatMapWith serial
concatMap f = 'concat . map f'
concatMap f = unfoldMany (UF.lmap f UF.fromStream)
Since: 0.6.0
concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
Flatten a stream of streams to a single stream.
concat = concatMap id
Pre-release
Flatten Containers
Flatten Foldable containers using the binary stream merging operations.
concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.
concatFoldableWith async $ map return [1..3]
Equivalent to:
concatFoldableWith f = Prelude.foldr f S.nil
concatFoldableWith f = S.concatMapFoldableWith f id
Since: 0.8.0 (Renamed foldWith to concatFoldableWith)
Since: 0.1.0 (Streamly)
concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.
concatMapFoldableWith async return [1..3]
Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)
Since: 0.1.0 (Streamly)
concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
Like concatMapFoldableWith but with the last two arguments reversed, i.e. the monadic streaming function is the last argument.
Equivalent to:
concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs
concatForFoldableWith = flip S.concatMapFoldableWith
Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)
Since: 0.1.0 (Streamly)
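The three Foldable combinators above differ only in where the mapping function sits. As a minimal sketch, assuming streamly 0.8.x and Streamly.Prelude imported qualified as Stream, the following uses serial as the combining operation so that the output order is deterministic. The names gen, viaFoldable, viaMapFoldable and viaForFoldable are illustrative only.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Illustrative generator: each input n becomes the stream [n, n * 10].
gen :: Int -> SerialT IO Int
gen n = Stream.fromList [n, n * 10]

-- Fold a list of already-built streams.
viaFoldable :: IO [Int]
viaFoldable =
    Stream.toList $ Stream.concatFoldableWith Stream.serial (map gen [1, 2, 3])

-- Map the generator over the list and fold in one step.
viaMapFoldable :: IO [Int]
viaMapFoldable =
    Stream.toList $ Stream.concatMapFoldableWith Stream.serial gen [1, 2, 3]

-- Same as above with the container first and the generator last.
viaForFoldable :: IO [Int]
viaForFoldable =
    Stream.toList $ Stream.concatForFoldableWith Stream.serial [1, 2, 3] gen

main :: IO ()
main = do
    viaFoldable >>= print    -- prints [1,10,2,20,3,30]
    viaMapFoldable >>= print -- prints [1,10,2,20,3,30]
    viaForFoldable >>= print -- prints [1,10,2,20,3,30]
```

Replacing serial with a concurrent combinator such as async keeps the same shape, but the output order then depends on scheduling.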
ConcatMapWith
Map and flatten a stream like concatMap but using a custom binary stream merging combinator instead of just appending the streams. The merging occurs sequentially. It works efficiently for merge operations like serial, async and ahead, where we consume one stream before the next, or for wAsync or parallel, where we consume all streams simultaneously anyway.
However, in cases where the merging consumes streams in a round robin fashion, pair wise merging using concatPairsWith would be more efficient. These cases include operations like mergeBy or zipWith.
concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.
Note we can merge streams concurrently by using a concurrent merge function.
Since: 0.7.0
Since: 0.8.0 (signature change)
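The "two dimensional looping" description can be sketched as a nested loop. This is a minimal example assuming streamly 0.8.x and Streamly.Prelude imported qualified as Stream; the names nested and inner are illustrative only. The outer loop runs over the input stream, the inner loop over the generated stream, and serial is the mixer.

```haskell
import qualified Streamly.Prelude as Stream

-- Outer loop over [1,2]; for each n, an inner loop over "ab".
-- `serial` as the mixer appends the inner streams in order.
nested :: IO [(Int, Char)]
nested =
    Stream.toList $
        Stream.concatMapWith Stream.serial inner (Stream.fromList [1, 2])
  where
    inner n = Stream.fromList [(n, c) | c <- "ab"]

main :: IO ()
main = nested >>= print -- prints [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
```

Passing a concurrent mixer such as async instead of serial would evaluate the inner streams concurrently, at the cost of a scheduling-dependent output order.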
concatSmapMWith :: (IsStream t, Monad m) => (t m b -> t m b -> t m b) -> (s -> a -> m (s, t m b)) -> m s -> t m a -> t m b
Like concatMapWith but carries a state which can be used to share information across multiple steps of concat.
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
Pre-release
ConcatPairsWith
See the notes about suitable merge functions in the concatMapWith section.
concatPairsWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
Combine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.
For example, you can sort a stream using merge sort like this:
>>> Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2]
[1,2,5,7,9]
Caution: the stream of streams must be finite.
Pre-release
IterateMap
Map and flatten Trees of Streams
iterateMapWith :: IsStream t => (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a
Like iterateM but iterates after mapping a stream generator on the output.
Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure.
Note that iterateM is a special case of iterateMapWith:
iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect
It can be used to traverse a tree structure. For example, to list a directory tree:
Stream.iterateMapWith Stream.serial (either Dir.toEither (const nil)) (fromPure (Left "tmp"))
Pre-release
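A minimal sketch of a tree traversal that does not touch the file system. It assumes streamly 0.8.x and that iterateMapWith is importable from the internal module Streamly.Internal.Data.Stream.IsStream; since the function is Pre-release, its location and behavior may change. The tree itself is hypothetical: node n has children 2n and 2n+1 while n < 4. The names children and traversal are illustrative only.

```haskell
import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- Hypothetical tree: node n has children 2n and 2n+1 while n < 4;
-- nodes 4..7 are leaves.
children :: Int -> SerialT IO Int
children n
    | n < 4 = Stream.fromList [2 * n, 2 * n + 1]
    | otherwise = Stream.nil

-- Start from the root 1; each yielded node is expanded with `children`
-- and the expansions are combined with `serial`.
traversal :: IO [Int]
traversal =
    Stream.toList $
        IsStream.iterateMapWith Stream.serial children (Stream.fromPure 1)

main :: IO ()
main = traversal >>= print
```

With serial as the combinator, each node is yielded before its subtree, so under the assumptions above the result should be the pre-order listing [1,2,4,5,3,6,7].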
iterateSmapMWith :: (IsStream t, Monad m) => (t m a -> t m a -> t m a) -> (b -> a -> m (b, t m a)) -> m b -> t m a -> t m a
Like iterateMap but carries a state in the stream generation function. This can be used to traverse graph like structures; we can remember the visited nodes in the state to avoid cycles.
Note that a combination of iterateMap and usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.
See also: mfix
Pre-release
iterateMapLeftsWith :: IsStream t => (t m (Either a b) -> t m (Either a b) -> t m (Either a b)) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)
In an Either stream iterate on Lefts. This is a special case of iterateMapWith:
iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))
To traverse a directory tree:
iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))
Pre-release