| Copyright | (c) 2018 Composewell Technologies (c) Roman Leshchinskiy 2008-2010 |
|---|---|
| License | BSD-3-Clause |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Streamly.Internal.Data.Stream.StreamD.Type
Description
Synopsis
- data Step s a
- data Stream m a where
- data CrossStream m a
- unCross :: CrossStream m a -> Stream m a
- mkCross :: Stream m a -> CrossStream m a
- fromStreamK :: Applicative m => StreamK m a -> Stream m a
- toStreamK :: Monad m => Stream m a -> StreamK m a
- unfold :: Applicative m => Unfold m a b -> a -> Stream m b
- nilM :: Applicative m => m b -> Stream m a
- consM :: Applicative m => m a -> Stream m a -> Stream m a
- fromPure :: Applicative m => a -> Stream m a
- fromEffect :: Applicative m => m a -> Stream m a
- fromList :: Applicative m => [a] -> Stream m a
- uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
- fold :: Monad m => Fold m a b -> Stream m a -> m b
- foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
- foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b
- foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b)
- foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a))
- foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
- foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
- foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
- foldrMx :: Monad m => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
- foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
- drain :: Monad m => Stream m a -> m ()
- toList :: Monad m => Stream m a -> m [a]
- map :: Monad m => (a -> b) -> Stream m a -> Stream m b
- mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
- take :: Applicative m => Int -> Stream m a -> Stream m a
- takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- takeEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
- takeEndByM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
- zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
- crossApplyFst :: Functor f => Stream f a -> Stream f b -> Stream f a
- crossApplySnd :: Functor f => Stream f a -> Stream f b -> Stream f b
- crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b)
- data ConcatMapUState o i
- = ConcatMapUOuter o
- | ConcatMapUInner o i
- unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
- concatEffect :: Monad m => m (Stream m a) -> Stream m a
- concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
- concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
- concat :: Monad m => Stream m (Stream m a) -> Stream m a
- unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a
- concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a
- concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a
- data FoldMany s fs b a
- = FoldManyStart s
- | FoldManyFirst fs s
- | FoldManyLoop s fs
- | FoldManyYield b (FoldMany s fs b a)
- | FoldManyDone
- data FoldManyPost s fs b a
- = FoldManyPostStart s
- | FoldManyPostLoop s fs
- | FoldManyPostYield b (FoldManyPost s fs b a)
- | FoldManyPostDone
- foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
- foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
- groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
- refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b
- reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a)
- foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a)
- eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
- cmpBy :: Monad m => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
The stream type
A stream consists of a step function that generates the next step given a current state, and the current state.
Instances
CrossStream type wrapper
data CrossStream m a Source #
A newtype wrapper for the Stream type with a cross product style monad
instance.
A Monad bind behaves like a for loop:
>>> :{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]
Nested monad binds behave like nested for loops:
>>> :{
Stream.fold Fold.toList $ Stream.unCross $ do
    x <- Stream.mkCross $ Stream.fromList [1,2]
    y <- Stream.mkCross $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Instances
unCross :: CrossStream m a -> Stream m a Source #
mkCross :: Stream m a -> CrossStream m a Source #
Conversion to StreamK
fromStreamK :: Applicative m => StreamK m a -> Stream m a Source #
Convert a CPS encoded StreamK to direct style step encoded StreamD
toStreamK :: Monad m => Stream m a -> StreamK m a Source #
Convert a direct style step encoded StreamD to a CPS encoded StreamK
From Unfold
unfold :: Applicative m => Unfold m a b -> a -> Stream m b Source #
Convert an Unfold into a stream by supplying it an input seed.
>>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
>>> Stream.fold Fold.drain s
hello
hello
hello
Construction
Primitives
nilM :: Applicative m => m b -> Stream m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
"nil"
[]
Pre-release
consM :: Applicative m => m a -> Stream m a -> Stream m a Source #
Like cons but fuses an effect instead of a pure value.
From Values
fromPure :: Applicative m => a -> Stream m a Source #
Create a singleton stream from a pure value.
>>> fromPure a = a `Stream.cons` Stream.nil
>>> fromPure = pure
>>> fromPure = Stream.fromEffect . pure
fromEffect :: Applicative m => m a -> Stream m a Source #
Create a singleton stream from a monadic action.
>>> fromEffect m = m `Stream.consM` Stream.nil
>>> fromEffect = Stream.sequence . Stream.fromPure
>>> Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
hello
From Containers
fromList :: Applicative m => [a] -> Stream m a Source #
Construct a stream from a list of pure values.
Elimination
Primitives
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing. If the stream is non-empty, returns Just (a, ma), where a is
the head of the stream and ma its tail.
Properties:
>>> Nothing <- Stream.uncons Stream.nil
>>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
This can be used to consume the stream in an imperative manner one element at a time, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons, however,
this is generally less efficient than specific folds because it takes apart
the stream one element at a time, therefore, does not take advantage of
stream fusion.
foldBreak is a more general way of consuming a stream piecemeal.
>>> :{
uncons xs = do
    r <- Stream.foldBreak Fold.one xs
    return $ case r of
        (Nothing, _) -> Nothing
        (Just h, t) -> Just (h, t)
:}
Strict Left Folds
fold :: Monad m => Fold m a b -> Stream m a -> m b Source #
Fold a stream using the supplied left Fold and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'. A
Fold can terminate early without consuming the full stream. See the
documentation of individual Folds for termination behavior.
Definitions:
>>> fold f = fmap fst . Stream.foldBreak f
>>> fold f = Stream.parse (Parser.fromFold f)
Example:
>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
foldAddLazy :: Monad m => Fold m a b -> Stream m a -> Fold m a b Source #
Append a stream to a fold lazily to build an accumulator incrementally.
Example, to continue folding a list of streams on the same sum fold:
>>> streams = [Stream.fromList [1..5], Stream.fromList [6..10]]
>>> f = Prelude.foldl Stream.foldAddLazy Fold.sum streams
>>> Stream.fold f Stream.nil
55
foldAdd :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b) Source #
>>>foldAdd = flip Fold.addStream
foldEither :: Monad m => Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a)) Source #
Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.
Internal
Lazy Right Folds
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream constructs
an output structure using the step function build. build is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build again thus
lazily consuming the input stream until either the output expression built
by build is free of the "tail" or the input is exhausted in which case
final is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd in a stream:
>>> s = Stream.fromList (2:4:5:undefined)
>>> step x xs = if odd x then return True else xs
>>> Stream.foldrM step (return False) s
True
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
>>>foldr f z = Stream.foldrM (\a b -> f a <$> b) (return z)
Note: This is similar to Fold.foldr' (the right fold via left fold), but could be more efficient.
foldrS :: Monad m => (a -> Stream m b -> Stream m b) -> Stream m b -> Stream m a -> Stream m b Source #
Specific Folds
drain :: Monad m => Stream m a -> m () Source #
Definitions:
>>> drain = Stream.fold Fold.drain
>>> drain = Stream.foldrM (\_ xs -> xs) (return ())
Run a stream, discarding the results.
toList :: Monad m => Stream m a -> m [a] Source #
Definitions:
>>> toList = Stream.foldr (:) []
>>> toList = Stream.fold Fold.toList
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array instead.
Note that this could be a bit more efficient compared to Stream.fold
Fold.toList, and it can fuse with pure list consumers.
Mapping
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b Source #
>>>mapM f = Stream.sequence . fmap f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>> s = Stream.fromList ["a", "b", "c"]
>>> Stream.fold Fold.drain $ Stream.mapM putStr s
abc
Stateful Filters
take :: Applicative m => Int -> Stream m a -> Stream m a Source #
Take first n elements from the stream and discard the rest.
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a Source #
End the stream as soon as the predicate fails on an element.
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a Source #
Same as takeWhile but with a monadic predicate.
Combining Two Streams
Zipping
zipWithM :: Monad m => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Like zipWith but using a monadic zipping function.
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Stream a is evaluated first, followed by stream b, the resulting
elements a and b are then zipped using the supplied zip function and the
result c is yielded to the consumer.
If stream a or stream b ends, the zipped stream ends. If stream b ends
first, the element a from previous evaluation of stream a is discarded.
>>> s1 = Stream.fromList [1,2,3]
>>> s2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.zipWith (+) s1 s2
[5,7,9]
Cross Product
crossApply :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b Source #
Apply a stream of functions to a stream of values and flatten the results.
Note that the second stream is evaluated multiple times.
>>>crossApply = Stream.crossWith id
crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
Definition:
>>>crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
Note that the second stream is evaluated multiple times.
cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Given a Stream m a and Stream m b generate a stream with all possible
combinations of the tuple (a, b).
Definition:
>>>cross = Stream.crossWith (,)
The second stream is evaluated multiple times. If that is not desired it can
be cached in an Array and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
See Unfold.cross for a much faster fused
alternative.
Time: O(m x n)
Pre-release
Unfold Many
data ConcatMapUState o i Source #
Constructors
| ConcatMapUOuter o | |
| ConcatMapUInner o i |
unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b Source #
unfoldMany unfold stream uses unfold to map the input stream elements
to streams and then flattens the generated streams into a single output
stream.
Like concatMap but uses an Unfold for stream generation. Unlike
concatMap this can fuse the Unfold code with the inner loop and
therefore provide many times better performance.
Concat
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>> concatMap f = Stream.concatMapM (return . f)
>>> concatMap f = Stream.concat . fmap f
>>> concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)
See unfoldMany for a fusible alternative.
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap, it can produce an
effect at the beginning of each iteration of the inner loop.
See unfoldMany for a fusible alternative.
concat :: Monad m => Stream m (Stream m a) -> Stream m a Source #
Flatten a stream of streams to a single stream.
>>>concat = Stream.concatMap id
Pre-release
Unfold Iterate
unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Same as concatIterateDfs but more efficient due to stream fusion.
Example, list a directory tree using DFS:
>>> f = Unfold.either Dir.eitherReaderPaths Unfold.nil
>>> input = Stream.fromPure (Left ".")
>>> ls = Stream.unfoldIterateDfs f input
Pre-release
unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Like unfoldIterateDfs but uses breadth first style traversal.
Pre-release
unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a Source #
Like unfoldIterateBfs but processes the children in reverse order,
therefore, may be slightly faster.
Pre-release
Concat Iterate
concatIterateScan :: Monad m => (b -> a -> m b) -> (b -> m (Maybe (b, Stream m a))) -> b -> Stream m a Source #
Generate a stream from an initial state, scan and concat the stream, generate a stream again from the final state of the previous scan and repeat the process.
concatIterateDfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Traverse the stream in depth first style (DFS). Map each element in the input stream to a stream and flatten, recursively map the resulting elements as well to a stream and flatten until no more streams are generated.
Example, list a directory tree using DFS:
>>> f = either (Just . Dir.readEitherPaths) (const Nothing)
>>> input = Stream.fromPure (Left ".")
>>> ls = Stream.concatIterateDfs f input
This is equivalent to using concatIterateWith StreamK.append.
Pre-release
concatIterateBfs :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Similar to concatIterateDfs except that it traverses the stream in
breadth first style (BFS). First, all the elements in the input stream are
emitted, and then their traversals are emitted.
Example, list a directory tree using BFS:
>>> f = either (Just . Dir.readEitherPaths) (const Nothing)
>>> input = Stream.fromPure (Left ".")
>>> ls = Stream.concatIterateBfs f input
Pre-release
concatIterateBfsRev :: Monad m => (a -> Maybe (Stream m a)) -> Stream m a -> Stream m a Source #
Same as concatIterateBfs except that the traversal of the last
element on a level is emitted first and then going backwards up to the first
element (reversed ordering). This may be slightly faster than
concatIterateBfs.
Fold Many
data FoldMany s fs b a Source #
Constructors
| FoldManyStart s | |
| FoldManyFirst fs s | |
| FoldManyLoop s fs | |
| FoldManyYield b (FoldMany s fs b a) | |
| FoldManyDone |
data FoldManyPost s fs b a Source #
Constructors
| FoldManyPostStart s | |
| FoldManyPostLoop s fs | |
| FoldManyPostYield b (FoldManyPost s fs b a) | |
| FoldManyPostDone |
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Apply a Fold repeatedly on a stream and emit the results in the output
stream.
Definition:
>>>foldMany f = Stream.parseMany (Parser.fromFold f)
Example, empty stream:
>>> f = Fold.take 2 Fold.sum
>>> fmany = Stream.fold Fold.toList . Stream.foldMany f
>>> fmany $ Stream.fromList []
[]
Example, last fold empty:
>>> fmany $ Stream.fromList [1..4]
[3,7]
Example, last fold non-empty:
>>> fmany $ Stream.fromList [1..5]
[3,7,5]
Note that using a closed fold e.g. Fold.take 0, would result in an
infinite stream on a non-empty input stream.
foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b Source #
Like foldMany but evaluates the fold even if the fold did not receive
any input, therefore, always results in a non-empty output even on an empty
stream (default result of the fold).
Example, empty stream:
>>> f = Fold.take 2 Fold.sum
>>> fmany = Stream.fold Fold.toList . Stream.foldManyPost f
>>> fmany $ Stream.fromList []
[0]
Example, last fold empty:
>>> fmany $ Stream.fromList [1..4]
[3,7,0]
Example, last fold non-empty:
>>> fmany $ Stream.fromList [1..5]
[3,7,5]
Note that using a closed fold e.g. Fold.take 0, would result in an
infinite stream without consuming the input.
Pre-release
Fold Iterate
reduceIterateBfs :: Monad m => (a -> a -> m a) -> Stream m a -> m (Maybe a) Source #
Binary BFS style reduce, folds a level entirely using the supplied fold function, collecting the outputs as next level of the tree, then repeats the same process on the next level. The last elements of a previously folded level are folded first.
foldIterateBfs :: Fold m a (Either a a) -> Stream m a -> m (Maybe a) Source #
N-Ary BFS style iterative fold, if the input stream finished before the fold then it returns Left otherwise Right. If the fold returns Left we terminate.
Unimplemented