| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Control.Concurrent.SCC.Streams
Contents
Description
This module defines Source and Sink types and pipe functions that create them. The method get on Source
abstracts away await, and the method put on Sink is a higher-level
abstraction of yield. With this arrangement, a single coroutine can
yield values to multiple sinks and await values from multiple sources with no need to change the
Coroutine functor. The only requirement is that each functor of the sources and sinks the
coroutine uses must be an AncestorFunctor of the coroutine's own functor. For
example, a coroutine that takes two sources and one sink might be declared like this:
zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()
Sources, sinks, and coroutines communicating through them are all created using the pipe function or one of its
variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
new Sink to write to and the consumer a new Source to read from, in addition to all the streams they inherit from
the current coroutine. The following function, for example, uses the zip coroutine declared above to add together
the pairs of values from two Integer sources:
add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
=> Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m ()
add source1 source2 sink = do pipe
(\pairSink-> zip source1 source2 pairSink) -- producer
(\pairSource-> mapStream (List.map $ uncurry (+)) pairSource sink) -- consumer
return ()
- data Sink m a x
- data Source m a x
- type SinkFunctor a x = Sum a (Request x x)
- type SourceFunctor a x = Sum a (ReadRequest x)
- class (Functor a, Functor d) => AncestorFunctor a d
- pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
- nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x
- get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
- getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
- put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()
- tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool
- liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
- liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
- pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool
- pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
- teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
- getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
- putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x
- putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x
- getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y
- getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y
- getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x
- getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)
- pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
- type Reader x py y = x -> Reading x py y
- data Reading x py y :: * -> * -> * -> *
- data ReadingResult x py y :: * -> * -> * -> *
- = ResultPart py (Reader x py y)
- | FinalResult y
- markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]
- markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x
- mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]
- mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()
- concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()
- mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
- foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
- mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
- partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()
- mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
- mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
- filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
- foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc
- foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()
- unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc
- unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()
- unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m ()
- zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()
- parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m ()
Sink and Source types
type SinkFunctor a x = Sum a (Request x x) Source
type SourceFunctor a x = Sum a (ReadRequest x) Source
class (Functor a, Functor d) => AncestorFunctor a d
Class of functors that can be lifted.
Minimal complete definition
Instances
| (Functor a, ChildFunctor d, (~) (* -> *) d' (Parent d), AncestorFunctor a d') => AncestorFunctor a d | |
| Functor a => AncestorFunctor a a |
Sink and Source constructors
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2) Source
A generic version of pipe. The first argument is used to combine two computation steps.
nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x Source
A disconnected sink that consumes and ignores all data put into it.
Operations on sinks and sources
Singleton operations
get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source
getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m () Source
Invokes its second argument with the value it gets from the source, if there is any to get.
getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source
peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x) Source
put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m () Source
tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool Source
Like put, but returns a Bool that determines if the sink is still active.
Lifting functions
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x Source
Converts a Sink on the ancestor functor a into a sink on the descendant functor d.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x Source
Converts a Source on the ancestor functor a into a source on the descendant functor d.
Bulk operations
Fetching and moving data
pour :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool Source
Copies all data from the source argument into the sink argument. The result indicates if there was any chunk to copy.
pour_ :: forall m a1 a2 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 x -> Coroutine d m () Source
Copies all data from the source argument into the sink argument, like pour but ignoring the result.
tee :: forall m a1 a2 a3 d x. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m () Source
teeSink :: forall m a1 a2 a3 x. (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3) => Sink m a1 x -> Sink m a2 x -> Sink m a3 x Source
getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x Source
getAll consumes and returns all data generated by the source.
putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x Source
putAll puts an entire list into its sink argument. If the coroutine fed by the sink dies, the remainder of
the argument list is returned.
putChunk :: Sink m a x -> forall d. AncestorFunctor a d => x -> Coroutine d m x Source
This method puts a portion of the producer's output into the Sink. The intervening Coroutine computations
suspend up to the pipe invocation that has created the argument sink. The method returns the suffix of the
argument that could not make it into the sink because of the sibling coroutine's death.
getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => Parser p x y -> Source m a x -> Coroutine d m y Source
Consumes input from the source as long as the parser accepts it.
getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) => Reader x (y -> y) y -> Source m a x -> Coroutine d m y Source
Consumes input from the source as long as the reader accepts it.
getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m x Source
Consumes values from the source as long as each satisfies the predicate, then returns their list.
getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x) Source
Consumes values from the source until one of them satisfies the predicate or the source is emptied, then returns the pair of the list of preceding values and maybe the one value that satisfied the predicate. The latter is not consumed.
pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source
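Like pour, copies data from the source to the sink, but passes it through the given reader: input is consumed only as long as the reader accepts it, and the reader's results are what gets written to the sink.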
pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d) => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source
Parses the input data using the given parser and copies the results to output.
pourWhile :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source
Like pour, copies data from the source to the sink, but only as long as it satisfies the predicate.
pourUntil :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x) Source
Like pour, copies data from the source to the sink, but only until one value satisfies the predicate. That
value is returned rather than copied.
data Reading x py y :: * -> * -> * -> *
data ReadingResult x py y :: * -> * -> * -> *
Constructors
| ResultPart py (Reader x py y) | A part of the result with the reader of more input |
| FinalResult y | Final result chunk |
Stream transformations
markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)] Source
A sink mark-down transformation: the marks get removed off each chunk.
markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x Source
A sink mark-up transformation: every chunk going into the sink is accompanied by the given value.
mapStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source
mapMaybeStream :: forall m a1 a2 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m () Source
mapMaybeStream is to mapStream like mapMaybe is to map.
concatMapStream :: forall m a1 a2 d x y. (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m () Source
concatMapStream is to mapStream like concatMap is to map.
mapStreamChunks :: forall m a1 a2 d x y. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source
Like mapStream except it runs the argument function on whole chunks read from the input.
mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc Source
Like mapAccumStream except it runs the argument function on whole chunks read from the input.
foldStream :: forall m a d x acc. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc Source
mapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source
mapAccumStream is similar to mapAccumL except it reads the values from a Source instead of a list
and writes the mapped values into a Sink instead of returning another list.
concatMapAccumStream :: forall m a1 a2 d x y acc. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc Source
concatMapAccumStream is a love child of concatMapStream and mapAccumStream: it threads the accumulator like
the latter, but its argument function returns not a single value, but a list of values to write into the sink.
partitionStream :: forall m a1 a2 a3 d x. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m () Source
Monadic stream transformations
mapMStream :: forall m a1 a2 d x y. (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m () Source
mapMStream is similar to mapM. It draws the values from a Source instead of a list, writes the
mapped values to a Sink, and returns a Coroutine.
mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source
mapMStream_ is similar to mapM_ except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d) => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m () Source
Like mapMStream_ except it runs the argument function on whole chunks read from the input.
filterMStream :: forall m a1 a2 d x. (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d) => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m () Source
foldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc Source
foldMStream is similar to foldM except it draws the values from a Source instead of a list and
works with Coroutine instead of an arbitrary monad.
foldMStream_ :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m () Source
A variant of foldMStream that discards the final result value.
unfoldMStream :: forall m a d x acc. (Monad m, AncestorFunctor a d) => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc Source
unfoldMStream is a version of unfoldr that writes the generated values into a Sink instead of
returning a list.
unmapMStream_ :: forall m a d x. (Monad m, AncestorFunctor a d) => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m () Source
unmapMStream_ is the opposite of mapMStream_; it takes a Sink instead of a Source argument and writes the
generated values into it.
unmapMStreamChunks_ :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => Coroutine d m x -> Sink m a x -> Coroutine d m () Source
Like unmapMStream_ but writing whole chunks of generated data into the argument sink.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source
zipWithMStream is similar to zipWithM except it draws the values from two Source arguments
instead of two lists, sends the results into a Sink, and works with Coroutine instead of an arbitrary monad.
parZipWithMStream :: forall m a1 a2 a3 d x y z. (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d) => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z] -> Coroutine d m () Source
parZipWithMStream is equivalent to zipWithMStream, but it consumes the two sources in parallel.