Copyright | (c) 2019 Composewell Technologies (c) 2013 Gabriel Gonzalez |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Stream Consumers
We can classify stream consumers in the following categories in order of increasing complexity and power:
Accumulators
These are the simplest folds that never fail and never terminate, they
accumulate the input values forever and can always accept new inputs (never
terminate) and always have a valid result value. A
sum
operation is an example of an accumulator.
Traditional Haskell left folds like foldl
are accumulators.
We can distribute an input stream to two or more accumulators using a tee
style composition. Accumulators cannot be applied on a stream one after the
other, which we call a serial
append style composition of folds. This is
because accumulators never terminate, since the first accumulator in a
series will never terminate, the next one will never get to run.
Terminating Folds
Terminating folds are accumulators that can terminate. Once a fold
terminates it no longer accepts any more inputs. Terminating folds can be
used in a serial
append style composition where one fold can be applied
after the other on an input stream. We can apply a terminating fold
repeatedly on an input stream, splitting the stream and consuming it in
fragments. Terminating folds never fail, therefore, they do not need
backtracking.
The take
operation is an example of a
terminating fold It terminates after consuming n
items. Coupled with an
accumulator (e.g. sum) it can be used to split and process the stream into
chunks of fixed size.
Terminating Folds with Leftovers
The next upgrade after terminating folds is terminating folds with leftover
inputs. Consider the example of takeWhile
operation, it needs to inspect
an element for termination decision. However, it does not consume the
element on which it terminates. To implement takeWhile
a terminating fold
will have to implement a way to return unconsumed input to the fold driver.
Single element leftover case is the most common and its easy to implement it
in terminating folds using a Done1
constructor in the Step
type which
indicates that the last element was not consumed by the fold. The following
additional operations can be implemented as terminating folds if we do that.
takeWhile groupBy wordBy
However, it creates several complications. The many
combinator requires
a Partial1
(Partial
with leftover) to handle a Done1
from the top
level fold, for efficient implementation. If the collecting fold in "many"
returns a Partial1
or Done1
then what to do with all the elements that
have been consumed?
Similarly, in distribute, if one fold consumes a value and others say its a leftover then what do we do? Folds like "many" require the leftover to be fed to it again. So in a distribute operation those folds which gave a leftover will have to be fed the leftover while the folds that consumed will have to be fed the next input. This is very complicated to implement. We have the same issue in backtracking parsers being used in a distribute operation.
To avoid these issues we want to enforce by typing that the collecting folds
can never return a leftover. So we need a fold type without Done1
or
Partial1
. This leads us to design folds to never return a leftover and the
use cases of single leftover are transferred to parsers where we have
general backtracking mechanism and single leftover is just a special case of
backtracking.
This means: takeWhile, groupBy, wordBy would be implemented as parsers.
"take 0" can implemented as a fold if we make initial return Step
type.
"takeInterval" can be implemented without Done1
.
Parsers
The next upgrade after terminating folds with a leftover are parsers.
Parsers are terminating folds that can fail and backtrack. Parsers can be
composed using an alternative
style composition where they can backtrack
and apply another parser if one parser fails.
satisfy
is a simple example of a parser, it
would succeed if the condition is satisfied and it would fail otherwise, on
failure an alternative parser can be used on the same input.
Types for Stream Consumers
In streamly, there is no separate type for accumulators. Terminating folds
are a superset of accumulators and to avoid too many types we represent both
using the same type, Fold
.
We do not club the leftovers functionality with terminating folds because of
the reasons explained earlier. Instead combinators that require leftovers
are implemented as the Parser
type. This is
a sweet spot to balance ease of use, type safety and performance. Using
separate Accumulator and terminating fold types would encode more
information in types but it would make ease of use, implementation,
maintenance effort worse. Combining Accumulator, terminating folds and
Parser into a single Parser
type would make
ease of use even better but type safety and performance worse.
One of the design requirements that we have placed for better ease of use
and code reuse is that Parser
type should be
a strict superset of the Fold
type i.e. it can do everything that a Fold
can do and more. Therefore, folds can be easily upgraded to parsers and we
can use parser combinators on folds as well when needed.
Fold Design
A fold is represented by a collection of "initial", "step" and "extract"
functions. The "initial" action generates the initial state of the fold. The
state is internal to the fold and maintains the accumulated output. The
"step" function is invoked using the current state and the next input value
and results in a Partial
or Done
. A Partial
returns the next intermediate
state of the fold, a Done
indicates that the fold has terminated and
returns the final value of the accumulator.
Every Partial
indicates that a new accumulated output is available. The
accumulated output can be extracted from the state at any point using
"extract". "extract" can never fail. A fold returns a valid output even
without any input i.e. even if you call "extract" on "initial" state it
provides an output. This is not true for parsers.
In general, "extract" is used in two cases:
- When the fold is used as a scan
extract
is called on the intermediate state every time it is yielded by the fold, the resulting value is yielded as a stream. - When the fold is used as a regular fold,
extract
is called once when we are done feeding input to the fold.
Alternate Designs
An alternate and simpler design would be to return the intermediate output
via Partial
along with the state, instead of using "extract" on the yielded
state and remove the extract function altogether.
This may even facilitate more efficient implementation. Extract from the intermediate state after each yield may be more costly compared to the fold step itself yielding the output. The fold may have more efficient ways to retrieve the output rather than stuffing it in the state and using extract on the state.
However, removing extract altogether may lead to less optimal code in some
cases because the driver of the fold needs to thread around the intermediate
output to return it if the stream stops before the fold could Done
. When
using this approach, the parseMany (FL.take filesize)
benchmark shows a
2x worse performance even after ensuring everything fuses. So we keep the
"extract" approach to ensure better perf in all cases.
But we could still yield both state and the output in Partial
, the output
can be used for the scan use case, instead of using extract. Extract would
then be used only for the case when the stream stops before the fold
completes.
Accumulators and Terminating Folds
Folds in this module can be classified in two categories viz. accumulators
and terminating folds. Accumulators do not have a terminating condition,
they run forever and consume the entire stream, for example the length
fold. Terminating folds have a terminating condition and can terminate
without consuming the entire stream, for example, the head
fold.
Monoids
Monoids allow generalized, modular folding. The accumulators in this module
can be expressed using mconcat
and a suitable Monoid
. Instead of
writing folds we can write Monoids and turn them into folds.
Performance Notes
Prelude
module provides fold functions to directly fold streams
e.g. Streamly.Prelude/sum
serves the same purpose as
Fold/sum
. However, the functions in Streamly.Prelude cannot be
efficiently combined together e.g. we cannot drive the input stream through
sum
and length
fold functions simultaneously. Using the Fold
type we
can efficiently split the stream across multiple folds because it allows the
compiler to perform stream fusion optimizations.
Synopsis
- data Step s b
- data Fold m a b = forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)
- foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
- foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
- foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b
- foldrM :: Monad m => (a -> b -> m b) -> m b -> Fold m a b
- mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b
- mkFold_ :: Monad m => (b -> a -> Step b b) -> Step b b -> Fold m a b
- mkFoldM :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Fold m a b
- mkFoldM_ :: Monad m => (b -> a -> m (Step b b)) -> m (Step b b) -> Fold m a b
- fromPure :: Applicative m => b -> Fold m a b
- fromEffect :: Applicative m => m b -> Fold m a b
- drain :: Monad m => Fold m a ()
- toList :: Monad m => Fold m a [a]
- rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
- map :: (a -> b) -> Fold m b r -> Fold m a r
- lmap :: (a -> b) -> Fold m b r -> Fold m a r
- lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
- filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
- filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
- catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
- take :: Monad m => Int -> Fold m a b -> Fold m a b
- takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
- serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
- serial_ :: Fold m x a -> Fold m x b -> Fold m x b
- data GenericRunner sL sR bL bR
- teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
- teeWithFst :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d
- teeWithMin :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d
- shortest :: Fold m x a -> Fold m x a -> Fold m x a
- longest :: Fold m x a -> Fold m x a -> Fold m x a
- data ManyState s1 s2
- many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
- manyPost :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
- chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
- intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
- concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c
- duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b)
- initialize :: Monad m => Fold m a b -> m (Fold m a b)
- runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
- data Fold2 m c a b = forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b)
- simplify :: Functor m => Fold2 m c a b -> c -> Fold m a b
- chunksOf2 :: Monad m => Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c
Types
Represents the result of the step
of a Fold
. Partial
returns an
intermediate state of the fold, the fold step can be called again with the
state or the driver can use extract
on the state to get the result out.
Done
returns the final result and the fold cannot be driven further.
Pre-release
The type Fold m a b
having constructor Fold step initial extract
represents a fold over an input stream of values of type a
to a final
value of type b
in Monad
m
.
The fold uses an intermediate state s
as accumulator, the type s
is
internal to the specific fold definition. The initial value of the fold
state s
is returned by initial
. The step
function consumes an input
and either returns the final result b
if the fold is done or the next
intermediate state (see Step
). At any point the fold driver can extract
the result from the intermediate state using the extract
function.
NOTE: The constructor is not yet exposed via exposed modules, smart constructors are provided to create folds. If you think you need the constructor of this type please consider using the smart constructors in Streamly.Internal.Data.Fold instead.
since 0.8.0 (type changed)
Since: 0.7.0
Constructors
foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b Source #
Make a fold from a left fold style pure step function and initial value of the accumulator.
If your Fold
returns only Partial
(i.e. never returns a Done
) then you
can use foldl'*
constructors.
A fold with an extract function can be expressed using fmap:
mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b mkfoldlx step initial extract = fmap extract (foldl' step initial)
See also: Streamly.Prelude.foldl'
Since: 0.8.0
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b Source #
Make a fold from a left fold style monadic step function and initial value of the accumulator.
A fold with an extract function can be expressed using rmapM:
mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)
See also: Streamly.Prelude.foldlM'
Since: 0.8.0
foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a) Source #
Make a strict left fold, for non-empty streams, using first element as the starting value. Returns Nothing if the stream is empty.
See also: Streamly.Prelude.foldl1'
Pre-release
foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b Source #
Make a fold using a right fold style step function and a terminal value. It performs a strict right fold via a left fold using function composition. Note that this is strict fold, it can only be useful for constructing strict structures in memory. For reductions this will be very inefficient.
For example,
toList = foldr (:) []
See also: foldr
Since: 0.8.0
mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b Source #
Make a terminating fold using a pure step function, a pure initial state and a pure state extraction function.
Pre-release
mkFold_ :: Monad m => (b -> a -> Step b b) -> Step b b -> Fold m a b Source #
Similar to mkFold
but the final state extracted is identical to the
intermediate state.
mkFold_ step initial = mkFold step initial id
Pre-release
mkFoldM :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Fold m a b Source #
Make a terminating fold with an effectful step function and initial state, and a state extraction function.
mkFoldM = Fold
We can just use Fold
but it is provided for completeness.
Pre-release
mkFoldM_ :: Monad m => (b -> a -> m (Step b b)) -> m (Step b b) -> Fold m a b Source #
Similar to mkFoldM
but the final state extracted is identical to the
intermediate state.
mkFoldM_ step initial = mkFoldM step initial return
Pre-release
Folds
fromPure :: Applicative m => b -> Fold m a b Source #
A fold that always yields a pure value without consuming any input.
Pre-release
fromEffect :: Applicative m => m b -> Fold m a b Source #
A fold that always yields the result of an effectful action without consuming any input.
Pre-release
drain :: Monad m => Fold m a () Source #
A fold that drains all its input, running the effects and discarding the results.
drain = drainBy (const (return ()))
Since: 0.7.0
toList :: Monad m => Fold m a [a] Source #
Folds the input stream to a list.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Data.Array.Foreign instead.
toList = foldr (:) []
Since: 0.7.0
Combinators
Mapping output
rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #
Map a monadic function on the output of a fold.
Since: 0.8.0
Mapping Input
lmap :: (a -> b) -> Fold m b r -> Fold m a r Source #
lmap f fold
maps the function f
on the input of the fold.
>>>
Stream.fold (Fold.lmap (\x -> x * x) Fold.sum) (Stream.enumerateFromTo 1 100)
338350
lmap = Fold.lmapM return
Since: 0.8.0
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r Source #
lmapM f fold
maps the monadic function f
on the input of the fold.
Since: 0.8.0
Filtering
filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r Source #
Include only those elements that pass a predicate.
>>>
Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
40
filter f = Fold.filterM (return . f)
Since: 0.8.0
filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r Source #
Like filter
but with a monadic predicate.
Since: 0.8.0
Trimming
take :: Monad m => Int -> Fold m a b -> Fold m a b Source #
Take at most n
input elements and fold them using the supplied fold. A
negative count is treated as 0.
>>>
Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
[1,2]
Since: 0.8.0
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b Source #
takeInterval n fold
uses fold
to fold the input items arriving within
a window of first n
seconds.
>>>
Stream.fold (Fold.takeInterval 1.0 Fold.toList) $ Stream.delay 0.1 $ Stream.fromList [1..]
[1,2,3,4,5,6,7,8,9,10,11]
Stops when fold
stops or when the timeout occurs. Note that the fold needs
an input after the timeout to stop. For example, if no input is pushed to
the fold until one hour after the timeout had occurred, then the fold will
be done only after consuming that input.
Pre-release
Serial Append
serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #
Sequential fold application. Apply two folds sequentially to an input stream. The input is provided to the first fold, when it is done - the remaining input is provided to the second fold. When the second fold is done or if the input stream is over, the outputs of the two folds are combined using the supplied function.
>>>
f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
>>>
Stream.fold f $ Stream.fromList "header: hello\n"
("header: ","hello\n")
Note: This is dual to appending streams using serial
.
Note: this implementation allows for stream fusion but has quadratic time complexity, because each composition adds a new branch that each subsequent fold's input element has to traverse, therefore, it cannot scale to a large number of compositions. After around 100 compositions the performance starts dipping rapidly compared to a CPS style implementation.
Time: O(n^2) where n is the number of compositions.
Since: 0.8.0
serial_ :: Fold m x a -> Fold m x b -> Fold m x b Source #
Same as applicative *>
. Run two folds serially one after the other
discarding the result of the first.
Unimplemented
Parallel Distribution
teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c Source #
teeWith k f1 f2
distributes its input to both f1
and f2
until both
of them terminate and combines their output using k
.
>>>
avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
Stream.fold avg $ Stream.fromList [1.0..100.0]
50.5
teeWith k f1 f2 = fmap (uncurry k) ((Fold.tee f1 f2)
For applicative composition using this combinator see Streamly.Internal.Data.Fold.Tee.
See also: Streamly.Internal.Data.Fold.Tee
Since: 0.8.0
teeWithFst :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #
Like teeWith
but terminates as soon as the first fold terminates.
Unimplemented
teeWithMin :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d Source #
Like teeWith
but terminates as soon as any one of the two folds
terminates.
Unimplemented
Parallel Alternative
shortest :: Fold m x a -> Fold m x a -> Fold m x a Source #
Shortest alternative. Apply both folds in parallel but choose the result from the one which consumed least input i.e. take the shortest succeeding fold.
Unimplemented
longest :: Fold m x a -> Fold m x a -> Fold m x a Source #
Longest alternative. Apply both folds in parallel but choose the result from the one which consumed more input i.e. take the longest succeeding fold.
Unimplemented
Splitting
many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c Source #
Collect zero or more applications of a fold. many split collect
applies
the split
fold repeatedly on the input stream and accumulates zero or more
fold results using collect
.
>>>
two = Fold.take 2 Fold.toList
>>>
twos = Fold.many two Fold.toList
>>>
Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]
Stops when collect
stops.
Since: 0.8.0
chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c Source #
chunksOf n split collect
repeatedly applies the split
fold to chunks
of n
items in the input stream and supplies the result to the collect
fold.
>>>
twos = Fold.chunksOf 2 Fold.toList Fold.toList
>>>
Stream.fold twos $ Stream.fromList [1..10]
[[1,2],[3,4],[5,6],[7,8],[9,10]]
chunksOf n split = many (take n split)
Stops when collect
stops.
Since: 0.8.0
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #
Group the input stream into windows of n second each using the first fold and then fold the resulting groups using the second fold.
>>>
intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
>>>
Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
[[1,2,3,4],[5,6,7],[8,9,10]]
intervalsOf n split = many (takeInterval n split)
Pre-release
Nesting
concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c Source #
Map a Fold
returning function on the result of a Fold
and run the
returned fold. This operation can be used to express data dependencies
between fold operations.
Let's say the first element in the stream is a count of the following elements that we have to add, then:
>>>
import Data.Maybe (fromJust)
>>>
count = fmap fromJust Fold.head
>>>
total n = Fold.take n Fold.sum
>>>
Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
45
Time: O(n^2) where n
is the number of compositions.
See also: foldIterateM
Since: 0.8.0
Running Partially
duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b) Source #
Modify the fold such that it returns a new Fold
instead of the output.
If the fold was already done the returned fold would always yield the
result. If the fold was partial, the returned fold starts from where we left
i.e. it uses the last accumulator value as the initial value of the
accumulator. Thus we can resume the fold later and feed it more input.
>>>
:{
do more <- Stream.fold (Fold.duplicate Fold.sum) (Stream.enumerateFromTo 1 10) evenMore <- Stream.fold (Fold.duplicate more) (Stream.enumerateFromTo 11 20) Stream.fold evenMore (Stream.enumerateFromTo 21 30) :} 465
Pre-release
initialize :: Monad m => Fold m a b -> m (Fold m a b) Source #
Run the initialization effect of a fold. The returned fold would use the value returned by this effect as its initial value.
Pre-release
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b) Source #
Run one step of a fold and store the accumulator as an initial value in the returned fold.
Pre-release
Fold2
Experimental type to provide a side input to the fold for generating the
initial state. For example, if we have to fold chunks of a stream and write
each chunk to a different file, then we can generate the file name using a
monadic action. This is a generalized version of Fold
.
Internal
forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b) |
|