streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream.Eliminate

Description

This module contains functions ending in the shape:

t m a -> m b

We call them stream folding functions, they reduce a stream t m a to a monadic value m b.

Synopsis

Running Examples

>>> :m
>>> import Streamly.Prelude (SerialT)
>>> import qualified Streamly.Prelude as Stream
>>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream
>>> import qualified Streamly.Internal.Data.Parser as Parser
>>> import qualified Streamly.Data.Fold as Fold

Running a Fold

fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

>>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

>>> Stream.fold Fold.sum Stream.nil
0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f.

fold f = Stream.parse (Parser.fromFold f)

Since: 0.7.0

fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a) Source #

foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b Source #

We can create higher order folds using foldOn. We can fold a number of streams to a given fold efficiently with full stream fusion. For example, to fold a list of streams on the same sum fold:

>>> concatFold = Prelude.foldl Stream.foldOn Fold.sum
>>> fold f = Fold.finish . Stream.foldOn f

Internal

Running a Parser

parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #

Parse a stream using the supplied Parser.

Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:

>>> Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nil
*** Exception: ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0"

Note:

fold f = Stream.parse (Parser.fromFold f)

parse p is not the same as head . parseMany p on an empty stream.

Pre-release

parseK :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #

Parse a stream using the supplied ParserK Parser.

Internal

parseD :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #

Parse a stream using the supplied ParserD Parser.

Internal

parse_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a) Source #

Parse a stream using the supplied Parser.

Internal

parseD_ :: MonadThrow m => Parser m a b -> SerialT m a -> m (b, SerialT m a) Source #

Stream Deconstruction

foldr and foldl do not provide the remaining stream. uncons is more general, as it can be used to implement those as well. It allows to use the stream one element at a time, and we have the remaining stream all the time.

uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #

Decompose a stream into its head and tail. If the stream is empty, returns Nothing. If the stream is non-empty, returns Just (a, ma), where a is the head of the stream and ma its tail.

This is a brute force primitive. Avoid using it as long as possible, use it when no other combinator can do the job. This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.

All the folds in this module can be expressed in terms of uncons, however the specific implementations are generally more efficient.

Since: 0.1.0

Right Folds

foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again thus lazily consuming the input stream until either the output expression built by build is free of the "tail" or the input is exhausted in which case final is used as the terminating case for the output structure. For more details see the description in the previous section.

Example, determine if any element is odd in a stream:

>>> Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined)
True

Since: 0.7.0 (signature changed)

Since: 0.2.0 (signature changed)

Since: 0.1.0

foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #

Right fold, lazy for lazy monads and pure streams, and strict for strict monads.

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Since: 0.1.0

Left Folds

foldlS :: IsStream t => (t m b -> a -> t m b) -> t m b -> t m a -> t m b Source #

Lazy left fold to a stream.

foldlT :: (Monad m, IsStream t, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> t m a -> s m b Source #

Lazy left fold to a transformer monad.

For example, to reverse a stream:

S.toList $ S.foldlT (flip S.cons) S.nil $ (S.fromList [1..5] :: SerialT IO Int)

foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #

Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.

Since: 0.2.0

foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b Source #

Like foldl' but with a monadic step function.

Since: 0.2.0

Since: 0.8.0 (signature change)

Specific Fold Functions

Folds as functions of the shape t m a -> m b.

These functions are good to run individually but they do not compose well. Prefer writing folds as the Fold data type. Use folds from Streamly.Internal.Data.Fold instead of using the functions in this section.

This section can possibly be removed in future. Are these better in some case compared to Fold? When the input stream is in CPS style (StreamK) we may want to rewrite the function call to CPS implementation of the fold through these definitions. Will that be more efficient for StreamK?

Full Folds

mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #

mapM_ = Stream.drain . Stream.mapM

Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.

Since: 0.1.0

drain :: Monad m => SerialT m a -> m () Source #

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example Stream.drain . fromAsync.

Since: 0.7.0

last :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

Since: 0.1.1

length :: Monad m => SerialT m a -> m Int Source #

Determine the length of the stream.

Since: 0.1.0

sum :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers.

sum = Stream.fold Fold.sum

Since: 0.1.0

product :: (Monad m, Num a) => SerialT m a -> m a Source #

Determine the product of all elements of a stream of numbers. Returns 1 when the stream is empty.

product = Stream.fold Fold.product

Since: 0.1.1

mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #

Fold a stream of monoid elements by appending them.

mconcat = Stream.fold Fold.mconcat

Pre-release

maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

maximumBy = Stream.fold Fold.maximumBy

Since: 0.6.0

maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

maximum = maximumBy compare
maximum = Stream.fold Fold.maximum

Determine the maximum element in a stream.

Since: 0.1.0

minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

minimumBy = Stream.fold Fold.minimumBy

Since: 0.6.0

minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) Source #

minimum = minimumBy compare
minimum = Stream.fold Fold.minimum

Determine the minimum element in a stream.

Since: 0.1.0

the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #

Ensures that all the elements of the stream are identical and then returns that unique element.

Since: 0.6.0

Partial Folds

drainN :: Monad m => Int -> SerialT m a -> m () Source #

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run maximum up to n iterations of a stream.

Since: 0.7.0

drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.7.0

(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #

Lookup the element at the given index.

Since: 0.6.0

head :: Monad m => SerialT m a -> m (Maybe a) Source #

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.head

Since: 0.1.0

headElse :: Monad m => a -> SerialT m a -> m a Source #

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

Pre-release

tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

Since: 0.1.1

init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #

Extract all but the last element of the stream, if any.

Since: 0.5.0

findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #

Returns the first element that satisfies the given predicate.

findM = Stream.fold Fold.findM

Since: 0.6.0

find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a) Source #

Like findM but with a non-monadic predicate.

find p = findM (return . p)
find = Stream.fold Fold.find

Since: 0.5.0

findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

findIndex = Stream.fold Fold.findIndex

Since: 0.5.0

elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

elemIndex a = Stream.findIndex (== a)

Since: 0.5.0

lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

lookup = snd <$> Stream.find ((==) . fst)
lookup = Stream.fold Fold.lookup

Since: 0.5.0

null :: Monad m => SerialT m a -> m Bool Source #

Determine whether the stream is empty.

null = Stream.fold Fold.null

Since: 0.1.1

elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

Since: 0.1.0

notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.length

Since: 0.1.0

all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether all elements of a stream satisfy a predicate.

all = Stream.fold Fold.all

Since: 0.1.0

any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #

Determine whether any of the elements of a stream satisfy a predicate.

any = Stream.fold Fold.any

Since: 0.1.0

and :: Monad m => SerialT m Bool -> m Bool Source #

Determines if all elements of a boolean stream are True.

and = Stream.fold Fold.and

Since: 0.5.0

or :: Monad m => SerialT m Bool -> m Bool Source #

Determines whether at least one element of a boolean stream is True.

or = Stream.fold Fold.or

Since: 0.5.0

To Containers

toList :: Monad m => SerialT m a -> m [a] Source #

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.1.0

toListRev :: Monad m => SerialT m a -> m [a] Source #

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Pre-release

toStream :: Monad m => SerialT m a -> m (SerialT n a) Source #

Convert a stream to a pure stream.

toStream = Stream.foldr Stream.cons Stream.nil

Pre-release

toStreamRev :: Monad m => SerialT m a -> m (SerialT n a) Source #

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

Pre-release

Concurrent Folds

foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #

Same as |$..

Internal

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel fold application operator; applies a fold function t m a -> m b to a stream t m a concurrently; The the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Same as |$. but with arguments reversed.

(|&.) = flip (|$.)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

Multi-Stream folds

eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #

Compare two streams for equality using an equality function.

Since: 0.6.0

cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #

Compare two streams lexicographically using a comparison function.

Since: 0.6.0

isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.

>>> Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is an infix of the second. A stream is considered an infix of itself.

Stream.isInfixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)

True

Space: O(n) worst case where n is the length of the infix.

Pre-release

Requires Storable constraint

isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #

Returns True if the first stream is a suffix of the second. A stream is considered a suffix of itself.

>>> Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)
True

Space: O(n), buffers entire input stream and the suffix.

Pre-release

Suboptimal - Help wanted.

isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #

Returns True if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.

>>> Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)
True

Since: 0.6.0

stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #

stripPrefix prefix stream strips prefix from stream if it is a prefix of stream. Returns Nothing if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil when the prefix is the same as the stream.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".

Space: O(1)

Since: 0.6.0

stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #

Drops the given suffix from a stream. Returns Nothing if the stream does not end with the given suffix. Returns Just nil when the suffix is the same as the stream.

It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.

See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".

Space: O(n), buffers the entire input stream as well as the suffix

Pre-release

Deprecated

foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #

Deprecated: Please use foldl' followed by fmap instead.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since: 0.2.0

foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #

Deprecated: Please use foldlM' followed by fmap instead.

Like foldx, but with a monadic step function.

Since: 0.2.0

foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #

Deprecated: Use foldrM instead.

Lazy right fold for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.

Since: 0.5.0

runStream :: Monad m => SerialT m a -> m () Source #

Deprecated: Please use "drain" instead

Run a stream, discarding the results. By default it interprets the stream as SerialT, to run other types of streams use the type adapting combinators for example runStream . fromAsync.

Since: 0.2.0

runN :: Monad m => Int -> SerialT m a -> m () Source #

Deprecated: Please use "drainN" instead

runN n = runStream . take n

Run maximum up to n iterations of a stream.

Since: 0.6.0

runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #

Deprecated: Please use "drainWhile" instead

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Since: 0.6.0

toHandle :: MonadIO m => Handle -> SerialT m String -> m () Source #

Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)

toHandle h = S.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Since: 0.1.0