streamly-0.7.2: Beautiful Streaming, Concurrent and Reactive Composition

Copyright(c) 2019 Composewell Technologies
(c) 2013 Gabriel Gonzalez
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Fold

Contents

Description

 
Synopsis

Fold Type

data Fold m a b Source #

Represents a left fold over an input stream consisting of values of type a to a single value of type b in Monad m.

The fold uses an intermediate state s as accumulator. The step function updates the state and returns the new state. When the fold is done the final result of the fold is extracted from the intermediate state using the extract function.

Since: 0.7.0

Constructors

Fold (s -> a -> m s) (m s) (s -> m b)

Fold step initial extract

Instances
Functor m => Functor (Fold m a) Source #

Maps a function on the output of the fold (the type b).

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

fmap :: (a0 -> b) -> Fold m a a0 -> Fold m a b #

(<$) :: a0 -> Fold m a b -> Fold m a a0 #

Applicative m => Applicative (Fold m a) Source #

The fold resulting from <*> distributes its input to both the argument folds and combines their output using the supplied function.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

pure :: a0 -> Fold m a a0 #

(<*>) :: Fold m a (a0 -> b) -> Fold m a a0 -> Fold m a b #

liftA2 :: (a0 -> b -> c) -> Fold m a a0 -> Fold m a b -> Fold m a c #

(*>) :: Fold m a a0 -> Fold m a b -> Fold m a b #

(<*) :: Fold m a a0 -> Fold m a b -> Fold m a a0 #

(Monad m, Floating b) => Floating (Fold m a b) Source #

Combines the fold outputs using their Floating instances.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

pi :: Fold m a b #

exp :: Fold m a b -> Fold m a b #

log :: Fold m a b -> Fold m a b #

sqrt :: Fold m a b -> Fold m a b #

(**) :: Fold m a b -> Fold m a b -> Fold m a b #

logBase :: Fold m a b -> Fold m a b -> Fold m a b #

sin :: Fold m a b -> Fold m a b #

cos :: Fold m a b -> Fold m a b #

tan :: Fold m a b -> Fold m a b #

asin :: Fold m a b -> Fold m a b #

acos :: Fold m a b -> Fold m a b #

atan :: Fold m a b -> Fold m a b #

sinh :: Fold m a b -> Fold m a b #

cosh :: Fold m a b -> Fold m a b #

tanh :: Fold m a b -> Fold m a b #

asinh :: Fold m a b -> Fold m a b #

acosh :: Fold m a b -> Fold m a b #

atanh :: Fold m a b -> Fold m a b #

log1p :: Fold m a b -> Fold m a b #

expm1 :: Fold m a b -> Fold m a b #

log1pexp :: Fold m a b -> Fold m a b #

log1mexp :: Fold m a b -> Fold m a b #

(Monad m, Fractional b) => Fractional (Fold m a b) Source #

Combines the fold outputs (type b) using their Fractional instances.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

(/) :: Fold m a b -> Fold m a b -> Fold m a b #

recip :: Fold m a b -> Fold m a b #

fromRational :: Rational -> Fold m a b #

(Monad m, Num b) => Num (Fold m a b) Source #

Combines the fold outputs (type b) using their Num instances.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

(+) :: Fold m a b -> Fold m a b -> Fold m a b #

(-) :: Fold m a b -> Fold m a b -> Fold m a b #

(*) :: Fold m a b -> Fold m a b -> Fold m a b #

negate :: Fold m a b -> Fold m a b #

abs :: Fold m a b -> Fold m a b #

signum :: Fold m a b -> Fold m a b #

fromInteger :: Integer -> Fold m a b #

(Semigroup b, Monad m) => Semigroup (Fold m a b) Source #

Combines the outputs of the folds (the type b) using their Semigroup instances.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

(<>) :: Fold m a b -> Fold m a b -> Fold m a b #

sconcat :: NonEmpty (Fold m a b) -> Fold m a b #

stimes :: Integral b0 => b0 -> Fold m a b -> Fold m a b #

(Semigroup b, Monoid b, Monad m) => Monoid (Fold m a b) Source #

Combines the outputs of the folds (the type b) using their Monoid instances.

Instance details

Defined in Streamly.Internal.Data.Fold.Types

Methods

mempty :: Fold m a b #

mappend :: Fold m a b -> Fold m a b -> Fold m a b #

mconcat :: [Fold m a b] -> Fold m a b #

hoist :: (forall x. m x -> n x) -> Fold m a b -> Fold n a b Source #

Change the underlying monad of a fold

Internal

generally :: Monad m => Fold Identity a b -> Fold m a b Source #

Adapt a pure fold to any monad

generally = hoist (return . runIdentity)

Internal

Fold Creation Utilities

mkPure :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b Source #

Make a fold using a pure step function, a pure initial state and a pure state extraction function.

Internal

mkPureId :: Monad m => (b -> a -> b) -> b -> Fold m a b Source #

Make a fold using a pure step function and a pure initial state. The final state extracted is identical to the intermediate state.

Internal

mkFold :: (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b Source #

Make a fold with an effectful step function and initial state, and a state extraction function.

mkFold = Fold

We can just use Fold but it is provided for completeness.

Internal

mkFoldId :: Monad m => (b -> a -> m b) -> m b -> Fold m a b Source #

Make a fold with an effectful step function and initial state. The final state extracted is identical to the intermediate state.

Internal

Full Folds

drain :: Monad m => Fold m a () Source #

A fold that drains all its input, running the effects and discarding the results.

Since: 0.7.0

drainBy :: Monad m => (a -> m b) -> Fold m a () Source #

drainBy f = lmapM f drain

Drain all input after passing it through a monadic function. This is the dual of mapM_ on stream producers.

Since: 0.7.0

drainBy2 :: Monad m => (a -> m b) -> Fold2 m c a () Source #

last :: Monad m => Fold m a (Maybe a) Source #

Extract the last element of the input stream, if any.

Since: 0.7.0

length :: Monad m => Fold m a Int Source #

Determine the length of the input stream.

Since: 0.7.0

sum :: (Monad m, Num a) => Fold m a a Source #

Determine the sum of all elements of a stream of numbers. Returns additive identity (0) when the stream is empty. Note that this is not numerically stable for floating point numbers.

Since: 0.7.0

product :: (Monad m, Num a) => Fold m a a Source #

Determine the product of all elements of a stream of numbers. Returns multiplicative identity (1) when the stream is empty.

Since: 0.7.0

maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #

Determine the maximum element in a stream using the supplied comparison function.

Since: 0.7.0

maximum :: (Monad m, Ord a) => Fold m a (Maybe a) Source #

maximum = maximumBy compare

Determine the maximum element in a stream.

Since: 0.7.0

minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #

Computes the minimum element with respect to the given comparison function

Since: 0.7.0

minimum :: (Monad m, Ord a) => Fold m a (Maybe a) Source #

Determine the minimum element in a stream using the supplied comparison function.

Since: 0.7.0

mean :: (Monad m, Fractional a) => Fold m a a Source #

Compute a numerically stable arithmetic mean of all elements in the input stream.

Since: 0.7.0

variance :: (Monad m, Fractional a) => Fold m a a Source #

Compute a numerically stable (population) variance over all elements in the input stream.

Since: 0.7.0

stdDev :: (Monad m, Floating a) => Fold m a a Source #

Compute a numerically stable (population) standard deviation over all elements in the input stream.

Since: 0.7.0

rollingHash :: (Monad m, Enum a) => Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash of a stream.

rollingHash = rollingHashWithSalt defaultSalt

Since: 0.7.0

rollingHashWithSalt :: (Monad m, Enum a) => Int64 -> Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash

H = salt * k ^ n + c1 * k ^ (n - 1) + c2 * k ^ (n - 2) + ... + cn * k ^ 0

Where c1, c2, cn are the elements in the input stream and k is a constant.

This hash is often used in Rabin-Karp string search algorithm.

See https://en.wikipedia.org/wiki/Rolling_hash

Since: 0.7.0

rollingHashFirstN :: (Monad m, Enum a) => Int -> Fold m a Int64 Source #

Compute an Int sized polynomial rolling hash of the first n elements of a stream.

rollingHashFirstN = ltake n rollingHash

Full Folds (Monoidal)

mconcat :: (Monad m, Monoid a) => Fold m a a Source #

Fold an input stream consisting of monoidal elements using mappend and mempty.

S.fold FL.mconcat (S.map Sum $ S.enumerateFromTo 1 10)

Since: 0.7.0

foldMap :: (Monad m, Monoid b) => (a -> b) -> Fold m a b Source #

foldMap f = map f mconcat

Make a fold from a pure function that folds the output of the function using mappend and mempty.

S.fold (FL.foldMap Sum) $ S.enumerateFromTo 1 10

Since: 0.7.0

foldMapM :: (Monad m, Monoid b) => (a -> m b) -> Fold m a b Source #

foldMapM f = mapM f mconcat

Make a fold from a monadic function that folds the output of the function using mappend and mempty.

S.fold (FL.foldMapM (return . Sum)) $ S.enumerateFromTo 1 10

Since: 0.7.0

Full Folds (To Containers)

toList :: Monad m => Fold m a [a] Source #

Folds the input stream to a list.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Memory.Array instead.

Since: 0.7.0

toListRevF :: Monad m => Fold m a [a] Source #

Buffers the input stream to a list in the reverse order of the input.

Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Since: 0.7.0

Partial Folds

drainN :: Monad m => Int -> Fold m a () Source #

A fold that drains the first n elements of its input, running the effects and discarding the results.

drainWhile :: Monad m => (a -> Bool) -> Fold m a () Source #

A fold that drains elements of its input as long as the predicate succeeds, running the effects and discarding the results.

index :: Monad m => Int -> Fold m a (Maybe a) Source #

Lookup the element at the given index.

Since: 0.7.0

head :: Monad m => Fold m a (Maybe a) Source #

Extract the first element of the stream, if any.

Since: 0.7.0

find :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #

Returns the first element that satisfies the given predicate.

Since: 0.7.0

lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b) Source #

In a stream of (key-value) pairs (a, b), return the value b of the first pair where the key equals the given value a.

Since: 0.7.0

findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int) Source #

Returns the first index that satisfies the given predicate.

Since: 0.7.0

elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int) Source #

Returns the first index where a given value is found in the stream.

Since: 0.7.0

null :: Monad m => Fold m a Bool Source #

Return True if the input stream is empty.

Since: 0.7.0

elem :: (Eq a, Monad m) => a -> Fold m a Bool Source #

Return True if the given element is present in the stream.

Since: 0.7.0

notElem :: (Eq a, Monad m) => a -> Fold m a Bool Source #

Returns True if the given element is not present in the stream.

Since: 0.7.0

all :: Monad m => (a -> Bool) -> Fold m a Bool Source #

all p = lmap p and

| Returns True if all elements of a stream satisfy a predicate.

Since: 0.7.0

any :: Monad m => (a -> Bool) -> Fold m a Bool Source #

any p = lmap p or

| Returns True if any of the elements of a stream satisfies a predicate.

Since: 0.7.0

and :: Monad m => Fold m Bool Bool Source #

Returns True if all elements are True, False otherwise

Since: 0.7.0

or :: Monad m => Fold m Bool Bool Source #

Returns True if any element is True, False otherwise

Since: 0.7.0

Transformations

Covariant Operations

sequence :: Monad m => Fold m a (m b) -> Fold m a b Source #

Flatten the monadic output of a fold to pure output.

Since: 0.7.0

mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #

Map a monadic function on the output of a fold.

Since: 0.7.0

Mapping

transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c Source #

Apply a transformation on a Fold using a Pipe.

Since: 0.7.0

lmap :: (a -> b) -> Fold m b r -> Fold m a r Source #

(lmap f fold) maps the function f on the input of the fold.

>>> S.fold (FL.lmap (\x -> x * x) FL.sum) (S.enumerateFromTo 1 100)
338350

Since: 0.7.0

lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r Source #

(lmapM f fold) maps the monadic function f on the input of the fold.

Since: 0.7.0

Filtering

lfilter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r Source #

Include only those elements that pass a predicate.

>>> S.fold (lfilter (> 5) FL.sum) [1..10]
40

Since: 0.7.0

lfilterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r Source #

Like lfilter but with a monadic predicate.

Since: 0.7.0

lcatMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b Source #

Transform a fold from a pure input to a Maybe input, consuming only Just values.

Parsing

Trimming

ltake :: Monad m => Int -> Fold m a b -> Fold m a b Source #

Take first n elements from the stream and discard the rest.

Since: 0.7.0

ltakeWhile :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b Source #

Takes elements from the input as long as the predicate succeeds.

Since: 0.7.0

lsessionsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #

Group the input stream into windows of n second each and then fold each group using the provided fold function.

For example, we can copy and distribute a stream to multiple folds where each fold can group the input differently e.g. by one second, one minute and one hour windows respectively and fold each resulting stream of folds.

-----Fold m a b----|-Fold n a c-|-Fold n a c-|-...-|----Fold m a c

lchunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c Source #

For every n input items, apply the first fold and supply the result to the next fold.

Breaking

splitAt :: Monad m => Int -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

splitAt n f1 f2 composes folds f1 and f2 such that first n elements of its input are consumed by fold f1 and the rest of the stream is consumed by fold f2.

let splitAt_ n xs = S.fold (FL.splitAt n FL.toList FL.toList) $ S.fromList xs
>>> splitAt_ 6 "Hello World!"
> ("Hello ","World!")
>>> splitAt_ (-1) [1,2,3]
> ([],[1,2,3])
>>> splitAt_ 0 [1,2,3]
> ([],[1,2,3])
>>> splitAt_ 1 [1,2,3]
> ([1],[2,3])
>>> splitAt_ 3 [1,2,3]
> ([1,2,3],[])
>>> splitAt_ 4 [1,2,3]
> ([1,2,3],[])

Internal

span :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

span p f1 f2 composes folds f1 and f2 such that f1 consumes the input as long as the predicate p is True. f2 consumes the rest of the input.

let span_ p xs = S.fold (S.span p FL.toList FL.toList) $ S.fromList xs
>>> span_ (< 1) [1,2,3]
> ([],[1,2,3])
>>> span_ (< 2) [1,2,3]
> ([1],[2,3])
>>> span_ (< 4) [1,2,3]
> ([1,2,3],[])

Internal

break :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

break p = span (not . p)

Break as soon as the predicate becomes True. break p f1 f2 composes folds f1 and f2 such that f1 stops consuming input as soon as the predicate p becomes True. The rest of the input is consumed f2.

This is the binary version of splitBy.

let break_ p xs = S.fold (S.break p FL.toList FL.toList) $ S.fromList xs
>>> break_ (< 1) [3,2,1]
> ([3,2,1],[])
>>> break_ (< 2) [3,2,1]
> ([3,2],[1])
>>> break_ (< 4) [3,2,1]
> ([],[3,2,1])

Internal

spanBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

Break the input stream into two groups, the first group takes the input as long as the predicate applied to the first element of the stream and next input element holds True, the second group takes the rest of the input.

Internal

spanByRolling :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

Like spanBy but applies the predicate in a rolling fashion i.e. predicate is applied to the previous and the next input elements.

Internal

Distributing

tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c) Source #

Distribute one copy of the stream to each fold and zip the results.

                |-------Fold m a b--------|
---stream m a---|                         |---m (b,c)
                |-------Fold m a c--------|
>>> S.fold (FL.tee FL.sum FL.length) (S.enumerateFromTo 1.0 100.0)
(5050.0,100)

Since: 0.7.0

distribute :: Monad m => [Fold m a b] -> Fold m a [b] Source #

Distribute one copy of the stream to each fold and collect the results in a container.

                |-------Fold m a b--------|
---stream m a---|                         |---m [b]
                |-------Fold m a b--------|
                |                         |
                           ...
>>> S.fold (FL.distribute [FL.sum, FL.length]) (S.enumerateFromTo 1 5)
[15,5]

This is the consumer side dual of the producer side sequence operation.

Since: 0.7.0

distribute_ :: Monad m => [Fold m a ()] -> Fold m a () Source #

Like distribute but for folds that return (), this can be more efficient than distribute as it does not need to maintain state.

Partitioning

partition :: Monad m => Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y) Source #

Compose two folds such that the combined fold accepts a stream of Either and routes the Left values to the first fold and Right values to the second fold.

partition = partitionBy id

Since: 0.7.0

Demultiplexing

demux :: (Monad m, Ord k) => Map k (Fold m a b) -> Fold m (k, a) (Map k b) Source #

Fold a stream of key value pairs using a map of specific folds for each key into a map from keys to the results of fold outputs of the corresponding values.

> let table = Data.Map.fromList [("SUM", FL.sum), ("PRODUCT", FL.product)]
      input = S.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
  in S.fold (FL.demux table) input
fromList [(PRODUCT,8),(SUM,4)]

Since: 0.7.0

demux_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) () Source #

Given a stream of key value pairs and a map from keys to folds, fold the values for each key using the corresponding folds, discarding the outputs.

> let prn = FL.drainBy print
> let table = Data.Map.fromList [("ONE", prn), ("TWO", prn)]
      input = S.fromList [("ONE",1),("TWO",2)]
  in S.fold (FL.demux_ table) input
One 1
Two 2

Since: 0.7.0

demuxDefault_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) () -> Fold m (k, a) () Source #

demuxWithDefault_ :: (Monad m, Ord k) => (a -> (k, a')) -> Map k (Fold m a' b) -> Fold m (k, a') b -> Fold m a () Source #

Classifying

classify :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b) Source #

Given an input stream of key value pairs and a fold for values, fold all the values belonging to each key. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.

> let input = S.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
  in S.fold (FL.classify FL.toList) input
fromList [("ONE",[1.1,1.0]),("TWO",[2.2,2.0])]

Since: 0.7.0

Unzipping

unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y) Source #

Send the elements of tuples in a stream of tuples through two different folds.

                          |-------Fold m a x--------|
---------stream of (a,b)--|                         |----m (x,y)
                          |-------Fold m b y--------|

This is the consumer side dual of the producer side zip operation.

Since: 0.7.0

Nested Folds

foldChunks :: Fold m a b -> Fold m b c -> Fold m a c Source #

Apply a terminating fold repeatedly to the input of another fold.

Compare with: Streamly.Prelude.concatMap, Streamly.Prelude.foldChunks

Unimplemented

duplicate :: Applicative m => Fold m a b -> Fold m a (Fold m a b) Source #

Modify the fold such that when the fold is done, instead of returning the accumulator, it returns a fold. The returned fold starts from where we left i.e. it uses the last accumulator value as the initial value of the accumulator. Thus we can resume the fold later and feed it more input.

> do
    more <- S.fold (FL.duplicate FL.sum) (S.enumerateFromTo 1 10)
    evenMore <- S.fold (FL.duplicate more) (S.enumerateFromTo 11 20)
    S.fold evenMore (S.enumerateFromTo 21 30)
 465

Since: 0.7.0

Running Folds

initialize :: Monad m => Fold m a b -> m (Fold m a b) Source #

Run the initialization effect of a fold. The returned fold would use the value returned by this effect as its initial value.

runStep :: Monad m => Fold m a b -> a -> m (Fold m a b) Source #

Run one step of a fold and store the accumulator as an initial value in the returned fold.

Folding to SVar

toParallelSVar :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #