Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
This module contains functions ending in the shape:
t m a -> m b
We call them stream folding functions, they reduce a stream t m a
to a
monadic value m b
.
Synopsis
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)
- parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b
- parseK :: MonadThrow m => Parser m a b -> SerialT m a -> m b
- parseD :: MonadThrow m => Parser m a b -> SerialT m a -> m b
- parse_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a)
- parseD_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a)
- uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
- foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
- foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b
- mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
- drain :: Monad m => SerialT m a -> m ()
- last :: Monad m => SerialT m a -> m (Maybe a)
- length :: Monad m => SerialT m a -> m Int
- sum :: (Monad m, Num a) => SerialT m a -> m a
- product :: (Monad m, Num a) => SerialT m a -> m a
- mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
- maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
- drainN :: Monad m => Int -> SerialT m a -> m ()
- drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)
- head :: Monad m => SerialT m a -> m (Maybe a)
- headElse :: Monad m => a -> SerialT m a -> m a
- tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
- findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
- elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
- lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
- null :: Monad m => SerialT m a -> m Bool
- elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- and :: Monad m => SerialT m Bool -> m Bool
- or :: Monad m => SerialT m Bool -> m Bool
- toList :: Monad m => SerialT m a -> m [a]
- toListRev :: Monad m => SerialT m a -> m [a]
- toStream :: Monad m => SerialT m a -> m (SerialT Identity a)
- toStreamRev :: Monad m => SerialT m a -> m (SerialT Identity a)
- foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a) => SerialT m a -> SerialT m a -> m Bool
- isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
- isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a))
- stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
- foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
- foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
- foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- runStream :: Monad m => SerialT m a -> m ()
- runN :: Monad m => Int -> SerialT m a -> m ()
- runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- toHandle :: MonadIO m => Handle -> SerialT m String -> m ()
Running Examples
>>>
:m
>>>
import Streamly.Prelude (SerialT)
>>>
import qualified Streamly.Prelude as Stream
>>>
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
>>>
import qualified Streamly.Internal.Data.Parser as Parser
>>>
import qualified Streamly.Data.Fold as Fold
Running a Fold
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:
>>>
Stream.fold Fold.sum Stream.nil
0
However, foldMany
on an empty stream results in an empty stream.
Therefore, Stream.fold f
is not the same as Stream.head . Stream.foldMany
f
.
fold f = Stream.parse (Parser.fromFold f)
Since: 0.7.0
Running a Parser
parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #
Parse a stream using the supplied Parser
.
Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:
>>>
Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
*** Exception: ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0"
Note:
fold f = Stream.parse (Parser.fromFold f)
parse p
is not the same as head . parseMany p
on an empty stream.
Pre-release
parseK :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #
Parse a stream using the supplied ParserK Parser
.
Internal
parseD :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #
Parse a stream using the supplied ParserD Parser
.
Internal
parse_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a) Source #
Parse a stream using the supplied Parser
.
Internal
Stream Deconstruction
foldr and foldl do not provide the remaining stream. uncons
is more
general, as it can be used to implement those as well. It allows to use
the stream one element at a time, and we have the remaining stream all
the time.
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
This is a brute force primitive. Avoid using it as long as possible, use it when no other combinator can do the job. This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
All the folds in this module can be expressed in terms of uncons
, however
the specific implementations are generally more efficient.
Since: 0.1.0
Right Folds
foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True
Since: 0.7.0 (signature changed)
Since: 0.2.0 (signature changed)
Since: 0.1.0
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
Since: 0.1.0
Left Folds
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #
Left associative/strict push fold. foldl' reduce initial stream
invokes
reduce
with the accumulator and the next input in the input stream, using
initial
as the initial value of the current value of the accumulator. When
the input is exhausted the current value of the accumulator is returned.
Make sure to use a strict data structure for accumulator to not build
unnecessary lazy expressions unless that's what you want. See the previous
section for more details.
Since: 0.2.0
foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Strict left fold, for non-empty streams, using first element as the
starting value. Returns Nothing
if the stream is empty.
Since: 0.5.0
Specific Fold Functions
Folds as functions of the shape t m a -> m b
.
These functions are good to run individually but they do not compose
well. Prefer writing folds as the Fold
data type. Use folds from
Streamly.Internal.Data.Fold instead of using the functions in this
section.
This section can possibly be removed in future. Are these better in
some case compared to Fold
? When the input stream is in CPS style
(StreamK) we may want to rewrite the function call to CPS implementation
of the fold through these definitions. Will that be more efficient for
StreamK?
Full Folds
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #
mapM_ = Stream.drain . Stream.mapM
Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.
Since: 0.1.0
drain :: Monad m => SerialT m a -> m () Source #
drain = mapM_ (\_ -> return ()) drain = Stream.fold Fold.drain
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators for example Stream.drain .
.fromAsync
Since: 0.7.0
last :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
last xs = xs !! (Stream.length xs - 1) last = Stream.fold Fold.last
Since: 0.1.1
sum :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the sum of all elements of a stream of numbers. Returns 0
when
the stream is empty. Note that this is not numerically stable for floating
point numbers.
sum = Stream.fold Fold.sum
Since: 0.1.0
product :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the product of all elements of a stream of numbers. Returns 1
when the stream is empty.
product = Stream.fold Fold.product
Since: 0.1.1
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #
Fold a stream of monoid elements by appending them.
mconcat = Stream.fold Fold.mconcat
Pre-release
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the maximum element in a stream using the supplied comparison function.
maximumBy = Stream.fold Fold.maximumBy
Since: 0.6.0
maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #
maximum = maximumBy
compare
maximum = Stream.fold Fold.maximum
Determine the maximum element in a stream.
Since: 0.1.0
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the minimum element in a stream using the supplied comparison function.
minimumBy = Stream.fold Fold.minimumBy
Since: 0.6.0
minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #
minimum = minimumBy
compare
minimum = Stream.fold Fold.minimum
Determine the minimum element in a stream.
Since: 0.1.0
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #
Ensures that all the elements of the stream are identical and then returns that unique element.
Since: 0.6.0
Partial Folds
drainN :: Monad m => Int -> SerialT m a -> m () Source #
drainN n = Stream.drain . Stream.take n drainN n = Stream.fold (Fold.take n Fold.drain)
Run maximum up to n
iterations of a stream.
Since: 0.7.0
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
drainWhile p = Stream.drain . Stream.takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.7.0
(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #
Lookup the element at the given index.
Since: 0.6.0
head :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the first element of the stream, if any.
head = (!! 0) head = Stream.fold Fold.head
Since: 0.1.0
headElse :: Monad m => a -> SerialT m a -> m a Source #
Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.
Pre-release
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
tail = fmap (fmap snd) . Stream.uncons
Extract all but the first element of the stream, if any.
Since: 0.1.1
init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
Extract all but the last element of the stream, if any.
Since: 0.5.0
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #
Returns the first element that satisfies the given predicate.
findM = Stream.fold Fold.findM
Since: 0.6.0
find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #
Like findM
but with a non-monadic predicate.
find p = findM (return . p) find = Stream.fold Fold.find
Since: 0.5.0
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #
Returns the first index that satisfies the given predicate.
findIndex = Stream.fold Fold.findIndex
Since: 0.5.0
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #
Returns the first index where a given value is found in the stream.
elemIndex a = Stream.findIndex (== a)
Since: 0.5.0
lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #
In a stream of (key-value) pairs (a, b)
, return the value b
of the
first pair where the key equals the given value a
.
lookup = snd <$> Stream.find ((==) . fst) lookup = Stream.fold Fold.lookup
Since: 0.5.0
null :: Monad m => SerialT m a -> m Bool Source #
Determine whether the stream is empty.
null = Stream.fold Fold.null
Since: 0.1.1
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is present in the stream.
elem = Stream.fold Fold.elem
Since: 0.1.0
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is not present in the stream.
notElem = Stream.fold Fold.length
Since: 0.1.0
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether all elements of a stream satisfy a predicate.
all = Stream.fold Fold.all
Since: 0.1.0
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether any of the elements of a stream satisfy a predicate.
any = Stream.fold Fold.any
Since: 0.1.0
and :: Monad m => SerialT m Bool -> m Bool Source #
Determines if all elements of a boolean stream are True.
and = Stream.fold Fold.and
Since: 0.5.0
or :: Monad m => SerialT m Bool -> m Bool Source #
Determines whether at least one element of a boolean stream is True.
or = Stream.fold Fold.or
Since: 0.5.0
To Containers
toList :: Monad m => SerialT m a -> m [a] Source #
toList = Stream.foldr (:) []
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Since: 0.1.0
toListRev :: Monad m => SerialT m a -> m [a] Source #
toListRev = Stream.foldl' (flip (:)) []
Convert a stream into a list in reverse order in the underlying monad.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Pre-release
toStream :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream.
toStream = Stream.foldr Stream.cons Stream.nil
Pre-release
toStreamRev :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream in reverse order.
toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil
Pre-release
Concurrent Folds
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #
Same as |$.
.
Internal
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a
concurrently; The the input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b)
you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The .
at the end of the operator is a mnemonic for termination of the
stream.
In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.
>>>
import Control.Concurrent (threadDelay)
>>>
import Streamly.Prelude ((|$.))
>>>
:{
Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ()) |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1) :} 1 1 1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Multi-Stream folds
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality using an equality function.
Since: 0.6.0
cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #
Compare two streams lexicographically using a comparison function.
Since: 0.6.0
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.
>>>
Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True
Since: 0.6.0
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #
Returns True
if the first stream is a suffix of the second. A stream is
considered a suffix of itself.
>>>
Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True
Space: O(n)
, buffers entire input stream and the suffix.
Pre-release
Suboptimal - Help wanted.
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.
>>>
Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True
Since: 0.6.0
stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #
stripPrefix prefix stream
strips prefix
from stream
if it is a
prefix of stream. Returns Nothing
if the stream does not start with the
given prefix, stripped stream otherwise. Returns Just nil
when the prefix
is the same as the stream.
See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".
Space: O(1)
Since: 0.6.0
stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #
Drops the given suffix from a stream. Returns Nothing
if the stream does
not end with the given suffix. Returns Just nil
when the suffix is the
same as the stream.
It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.
See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".
Space: O(n)
, buffers the entire input stream as well as the suffix
Pre-release
Deprecated
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #
Deprecated: Please use foldl' followed by fmap instead.
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Since: 0.2.0
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #
Deprecated: Please use foldlM' followed by fmap instead.
Like foldx
, but with a monadic step function.
Since: 0.2.0
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Deprecated: Use foldrM instead.
Lazy right fold for non-empty streams, using first element as the starting
value. Returns Nothing
if the stream is empty.
Since: 0.5.0
runStream :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use "drain" instead
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators for example runStream .
.fromAsync
Since: 0.2.0
runN :: Monad m => Int -> SerialT m a -> m () Source #
Deprecated: Please use "drainN" instead
runN n = runStream . take n
Run maximum up to n
iterations of a stream.
Since: 0.6.0