Copyright | (c) 2019 Composewell Technologies (c) 2013 Gabriel Gonzalez |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- data Fold m a b = Fold (s -> a -> m s) (m s) (s -> m b)
- hoist :: (forall x. m x -> n x) -> Fold m a b -> Fold n a b
- generally :: Monad m => Fold Identity a b -> Fold m a b
- mkPure :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
- mkPureId :: Monad m => (b -> a -> b) -> b -> Fold m a b
- mkFold :: (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
- mkFoldId :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
- drain :: Monad m => Fold m a ()
- drainBy :: Monad m => (a -> m b) -> Fold m a ()
- drainBy2 :: Monad m => (a -> m b) -> Fold2 m c a ()
- last :: Monad m => Fold m a (Maybe a)
- length :: Monad m => Fold m a Int
- sum :: (Monad m, Num a) => Fold m a a
- product :: (Monad m, Num a) => Fold m a a
- maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
- maximum :: (Monad m, Ord a) => Fold m a (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
- minimum :: (Monad m, Ord a) => Fold m a (Maybe a)
- mean :: (Monad m, Fractional a) => Fold m a a
- variance :: (Monad m, Fractional a) => Fold m a a
- stdDev :: (Monad m, Floating a) => Fold m a a
- rollingHash :: (Monad m, Enum a) => Fold m a Int64
- rollingHashWithSalt :: (Monad m, Enum a) => Int64 -> Fold m a Int64
- rollingHashFirstN :: (Monad m, Enum a) => Int -> Fold m a Int64
- mconcat :: (Monad m, Monoid a) => Fold m a a
- foldMap :: (Monad m, Monoid b) => (a -> b) -> Fold m a b
- foldMapM :: (Monad m, Monoid b) => (a -> m b) -> Fold m a b
- toList :: Monad m => Fold m a [a]
- toListRevF :: Monad m => Fold m a [a]
- drainN :: Monad m => Int -> Fold m a ()
- drainWhile :: Monad m => (a -> Bool) -> Fold m a ()
- index :: Monad m => Int -> Fold m a (Maybe a)
- head :: Monad m => Fold m a (Maybe a)
- find :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
- lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b)
- findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int)
- elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int)
- null :: Monad m => Fold m a Bool
- elem :: (Eq a, Monad m) => a -> Fold m a Bool
- notElem :: (Eq a, Monad m) => a -> Fold m a Bool
- all :: Monad m => (a -> Bool) -> Fold m a Bool
- any :: Monad m => (a -> Bool) -> Fold m a Bool
- and :: Monad m => Fold m Bool Bool
- or :: Monad m => Fold m Bool Bool
- sequence :: Monad m => Fold m a (m b) -> Fold m a b
- mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
- transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c
- lmap :: (a -> b) -> Fold m b r -> Fold m a r
- lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
- lfilter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
- lfilterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
- lcatMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
- ltake :: Monad m => Int -> Fold m a b -> Fold m a b
- ltakeWhile :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
- lsessionsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
- lchunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
- splitAt :: Monad m => Int -> Fold m a b -> Fold m a c -> Fold m a (b, c)
- span :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c)
- break :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c)
- spanBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c)
- spanByRolling :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c)
- tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c)
- distribute :: Monad m => [Fold m a b] -> Fold m a [b]
- distribute_ :: Monad m => [Fold m a ()] -> Fold m a ()
- partition :: Monad m => Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
- demux :: (Monad m, Ord k) => Map k (Fold m a b) -> Fold m (k, a) (Map k b)
- demux_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) ()
- demuxDefault_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) () -> Fold m (k, a) ()
- demuxWithDefault_ :: (Monad m, Ord k) => (a -> (k, a')) -> Map k (Fold m a' b) -> Fold m (k, a') b -> Fold m a ()
- classify :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b)
- unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y)
- foldChunks :: Fold m a b -> Fold m b c -> Fold m a c
- duplicate :: Applicative m => Fold m a b -> Fold m a (Fold m a b)
- initialize :: Monad m => Fold m a b -> m (Fold m a b)
- runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
- toParallelSVar :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
- toParallelSVarLimited :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
Fold Type
Represents a left fold over an input stream consisting of values of type
a
to a single value of type b
in Monad
m
.
The fold uses an intermediate state s
as accumulator. The step
function
updates the state and returns the new state. When the fold is done
the final result of the fold is extracted from the intermediate state
using the extract
function.
Since: 0.7.0
Fold (s -> a -> m s) (m s) (s -> m b) |
|
Instances
Functor m => Functor (Fold m a) Source # | Maps a function on the output of the fold (the type b). |
Applicative m => Applicative (Fold m a) Source # | The fold resulting from |
(Monad m, Floating b) => Floating (Fold m a b) Source # | Combines the fold outputs using their Floating instances. |
Defined in Streamly.Internal.Data.Fold.Types exp :: Fold m a b -> Fold m a b # log :: Fold m a b -> Fold m a b # sqrt :: Fold m a b -> Fold m a b # (**) :: Fold m a b -> Fold m a b -> Fold m a b # logBase :: Fold m a b -> Fold m a b -> Fold m a b # sin :: Fold m a b -> Fold m a b # cos :: Fold m a b -> Fold m a b # tan :: Fold m a b -> Fold m a b # asin :: Fold m a b -> Fold m a b # acos :: Fold m a b -> Fold m a b # atan :: Fold m a b -> Fold m a b # sinh :: Fold m a b -> Fold m a b # cosh :: Fold m a b -> Fold m a b # tanh :: Fold m a b -> Fold m a b # asinh :: Fold m a b -> Fold m a b # acosh :: Fold m a b -> Fold m a b # atanh :: Fold m a b -> Fold m a b # log1p :: Fold m a b -> Fold m a b # expm1 :: Fold m a b -> Fold m a b # | |
(Monad m, Fractional b) => Fractional (Fold m a b) Source # | Combines the fold outputs (type b) using their Fractional instances. |
(Monad m, Num b) => Num (Fold m a b) Source # | Combines the fold outputs (type b) using their Num instances. |
Defined in Streamly.Internal.Data.Fold.Types | |
(Semigroup b, Monad m) => Semigroup (Fold m a b) Source # | Combines the outputs of the folds (the type b) using their Semigroup instances. |
(Semigroup b, Monoid b, Monad m) => Monoid (Fold m a b) Source # | Combines the outputs of the folds (the type b) using their Monoid instances. |
hoist :: (forall x. m x -> n x) -> Fold m a b -> Fold n a b Source #
Change the underlying monad of a fold
Internal
generally :: Monad m => Fold Identity a b -> Fold m a b Source #
Adapt a pure fold to any monad
generally = hoist (return . runIdentity)
Internal
Fold Creation Utilities
mkPure :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b Source #
Make a fold using a pure step function, a pure initial state and a pure state extraction function.
Internal
mkPureId :: Monad m => (b -> a -> b) -> b -> Fold m a b Source #
Make a fold using a pure step function and a pure initial state. The final state extracted is identical to the intermediate state.
Internal
mkFold :: (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b Source #
Make a fold with an effectful step function and initial state, and a state extraction function.
mkFold = Fold
We can just use Fold
but it is provided for completeness.
Internal
mkFoldId :: Monad m => (b -> a -> m b) -> m b -> Fold m a b Source #
Make a fold with an effectful step function and initial state. The final state extracted is identical to the intermediate state.
Internal
Full Folds
drain :: Monad m => Fold m a () Source #
A fold that drains all its input, running the effects and discarding the results.
Since: 0.7.0
drainBy :: Monad m => (a -> m b) -> Fold m a () Source #
drainBy f = lmapM f drain
Drain all input after passing it through a monadic function. This is the dual of mapM_ on stream producers.
Since: 0.7.0
last :: Monad m => Fold m a (Maybe a) Source #
Extract the last element of the input stream, if any.
Since: 0.7.0
sum :: (Monad m, Num a) => Fold m a a Source #
Determine the sum of all elements of a stream of numbers. Returns additive
identity (0
) when the stream is empty. Note that this is not numerically
stable for floating point numbers.
Since: 0.7.0
product :: (Monad m, Num a) => Fold m a a Source #
Determine the product of all elements of a stream of numbers. Returns
multiplicative identity (1
) when the stream is empty.
Since: 0.7.0
maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #
Determine the maximum element in a stream using the supplied comparison function.
Since: 0.7.0
minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a) Source #
Computes the minimum element with respect to the given comparison function
Since: 0.7.0
minimum :: (Monad m, Ord a) => Fold m a (Maybe a) Source #
Determine the minimum element in a stream.
Since: 0.7.0
mean :: (Monad m, Fractional a) => Fold m a a Source #
Compute a numerically stable arithmetic mean of all elements in the input stream.
Since: 0.7.0
variance :: (Monad m, Fractional a) => Fold m a a Source #
Compute a numerically stable (population) variance over all elements in the input stream.
Since: 0.7.0
stdDev :: (Monad m, Floating a) => Fold m a a Source #
Compute a numerically stable (population) standard deviation over all elements in the input stream.
Since: 0.7.0
rollingHash :: (Monad m, Enum a) => Fold m a Int64 Source #
Compute an Int64
sized polynomial rolling hash of a stream.
rollingHash = rollingHashWithSalt defaultSalt
Since: 0.7.0
rollingHashWithSalt :: (Monad m, Enum a) => Int64 -> Fold m a Int64 Source #
Compute an Int64
sized polynomial rolling hash
H = salt * k ^ n + c1 * k ^ (n - 1) + c2 * k ^ (n - 2) + ... + cn * k ^ 0
Where c1
, c2
, cn
are the elements in the input stream and k
is a
constant.
This hash is often used in the Rabin-Karp string search algorithm.
See https://en.wikipedia.org/wiki/Rolling_hash
Since: 0.7.0
rollingHashFirstN :: (Monad m, Enum a) => Int -> Fold m a Int64 Source #
Compute an Int64
sized polynomial rolling hash of the first n elements of
a stream.
rollingHashFirstN n = ltake n rollingHash
Full Folds (Monoidal)
Full Folds (To Containers)
toList :: Monad m => Fold m a [a] Source #
Folds the input stream to a list.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Memory.Array instead.
Since: 0.7.0
toListRevF :: Monad m => Fold m a [a] Source #
Buffers the input stream to a list in the reverse order of the input.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Memory.Array instead.
Since: 0.7.0
Partial Folds
drainN :: Monad m => Int -> Fold m a () Source #
A fold that drains the first n elements of its input, running the effects and discarding the results.
drainWhile :: Monad m => (a -> Bool) -> Fold m a () Source #
A fold that drains elements of its input as long as the predicate succeeds, running the effects and discarding the results.
index :: Monad m => Int -> Fold m a (Maybe a) Source #
Lookup the element at the given index.
Since: 0.7.0
head :: Monad m => Fold m a (Maybe a) Source #
Extract the first element of the stream, if any.
Since: 0.7.0
find :: Monad m => (a -> Bool) -> Fold m a (Maybe a) Source #
Returns the first element that satisfies the given predicate.
Since: 0.7.0
lookup :: (Eq a, Monad m) => a -> Fold m (a, b) (Maybe b) Source #
In a stream of (key-value) pairs (a, b)
, return the value b
of the
first pair where the key equals the given value a
.
Since: 0.7.0
findIndex :: Monad m => (a -> Bool) -> Fold m a (Maybe Int) Source #
Returns the first index that satisfies the given predicate.
Since: 0.7.0
elemIndex :: (Eq a, Monad m) => a -> Fold m a (Maybe Int) Source #
Returns the first index where a given value is found in the stream.
Since: 0.7.0
elem :: (Eq a, Monad m) => a -> Fold m a Bool Source #
Returns True
if the given element is present in the stream.
Since: 0.7.0
notElem :: (Eq a, Monad m) => a -> Fold m a Bool Source #
Returns True
if the given element is not present in the stream.
Since: 0.7.0
any :: Monad m => (a -> Bool) -> Fold m a Bool Source #
any p = lmap p or
Returns True
if any of the elements of a stream satisfies a predicate.
Since: 0.7.0
Transformations
Covariant Operations
sequence :: Monad m => Fold m a (m b) -> Fold m a b Source #
Flatten the monadic output of a fold to pure output.
Since: 0.7.0
mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c Source #
Map a monadic function on the output of a fold.
Since: 0.7.0
Mapping
lmap :: (a -> b) -> Fold m b r -> Fold m a r Source #
(lmap f fold)
maps the function f
on the input of the fold.
>>>
S.fold (FL.lmap (\x -> x * x) FL.sum) (S.enumerateFromTo 1 100)
338350
Since: 0.7.0
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r Source #
(lmapM f fold)
maps the monadic function f
on the input of the fold.
Since: 0.7.0
Filtering
lfilter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r Source #
Include only those elements that pass a predicate.
>>>
S.fold (FL.lfilter (> 5) FL.sum) (S.fromList [1..10])
40
Since: 0.7.0
lfilterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r Source #
Like lfilter
but with a monadic predicate.
Since: 0.7.0
Parsing
Trimming
ltake :: Monad m => Int -> Fold m a b -> Fold m a b Source #
Take first n
elements from the stream and discard the rest.
Since: 0.7.0
ltakeWhile :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b Source #
Takes elements from the input as long as the predicate succeeds.
Since: 0.7.0
lsessionsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #
Group the input stream into windows of n seconds each and then fold each group using the provided fold function.
For example, we can copy and distribute a stream to multiple folds where each fold can group the input differently e.g. by one second, one minute and one hour windows respectively and fold each resulting stream of folds.
-----Fold m a b----|-Fold n a c-|-Fold n a c-|-...-|----Fold m a c
lchunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c Source #
For every n input items, apply the first fold and supply the result to the next fold.
Breaking
splitAt :: Monad m => Int -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
splitAt n f1 f2
composes folds f1
and f2
such that first n
elements of its input are consumed by fold f1
and the rest of the stream
is consumed by fold f2
.
let splitAt_ n xs = S.fold (FL.splitAt n FL.toList FL.toList) $ S.fromList xs
>>>
splitAt_ 6 "Hello World!"
> ("Hello ","World!")
>>>
splitAt_ (-1) [1,2,3]
> ([],[1,2,3])
>>>
splitAt_ 0 [1,2,3]
> ([],[1,2,3])
>>>
splitAt_ 1 [1,2,3]
> ([1],[2,3])
>>>
splitAt_ 3 [1,2,3]
> ([1,2,3],[])
>>>
splitAt_ 4 [1,2,3]
> ([1,2,3],[])
Internal
span :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
span p f1 f2
composes folds f1
and f2
such that f1
consumes the
input as long as the predicate p
is True
. f2
consumes the rest of the
input.
let span_ p xs = S.fold (FL.span p FL.toList FL.toList) $ S.fromList xs
>>>
span_ (< 1) [1,2,3]
> ([],[1,2,3])
>>>
span_ (< 2) [1,2,3]
> ([1],[2,3])
>>>
span_ (< 4) [1,2,3]
> ([1,2,3],[])
Internal
break :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
break p = span (not . p)
Break as soon as the predicate becomes True
. break p f1 f2
composes
folds f1
and f2
such that f1
stops consuming input as soon as the
predicate p
becomes True
. The rest of the input is consumed f2
.
This is the binary version of splitBy
.
let break_ p xs = S.fold (FL.break p FL.toList FL.toList) $ S.fromList xs
>>>
break_ (< 1) [3,2,1]
> ([3,2,1],[])
>>>
break_ (< 2) [3,2,1]
> ([3,2],[1])
>>>
break_ (< 4) [3,2,1]
> ([],[3,2,1])
Internal
spanBy :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
Break the input stream into two groups, the first group takes the input as
long as the predicate applied to the first element of the stream and next
input element holds True
, the second group takes the rest of the input.
Internal
spanByRolling :: Monad m => (a -> a -> Bool) -> Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
Like spanBy
but applies the predicate in a rolling fashion i.e.
predicate is applied to the previous and the next input elements.
Internal
Distributing
tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b, c) Source #
Distribute one copy of the stream to each fold and zip the results.
                |-------Fold m a b--------|
---stream m a---|                         |---m (b,c)
                |-------Fold m a c--------|
>>>
S.fold (FL.tee FL.sum FL.length) (S.enumerateFromTo 1.0 100.0)
(5050.0,100)
Since: 0.7.0
distribute :: Monad m => [Fold m a b] -> Fold m a [b] Source #
Distribute one copy of the stream to each fold and collect the results in a container.
                |-------Fold m a b--------|
---stream m a---|                         |---m [b]
                |-------Fold m a b--------|
                |                         |
                           ...
>>>
S.fold (FL.distribute [FL.sum, FL.length]) (S.enumerateFromTo 1 5)
[15,5]
This is the consumer side dual of the producer side sequence
operation.
Since: 0.7.0
distribute_ :: Monad m => [Fold m a ()] -> Fold m a () Source #
Like distribute
but for folds that return (), this can be more efficient
than distribute
as it does not need to maintain state.
Partitioning
Demultiplexing
demux :: (Monad m, Ord k) => Map k (Fold m a b) -> Fold m (k, a) (Map k b) Source #
Fold a stream of key value pairs using a map of specific folds for each key into a map from keys to the results of fold outputs of the corresponding values.
> let table = Data.Map.fromList [("SUM", FL.sum), ("PRODUCT", FL.product)] input = S.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)] in S.fold (FL.demux table) input fromList [("PRODUCT",8),("SUM",4)]
Since: 0.7.0
demux_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) () Source #
Given a stream of key value pairs and a map from keys to folds, fold the values for each key using the corresponding folds, discarding the outputs.
> let prn = FL.drainBy print > let table = Data.Map.fromList [("ONE", prn), ("TWO", prn)] input = S.fromList [("ONE",1),("TWO",2)] in S.fold (FL.demux_ table) input 1 2
Since: 0.7.0
demuxDefault_ :: (Monad m, Ord k) => Map k (Fold m a ()) -> Fold m (k, a) () -> Fold m (k, a) () Source #
demuxWithDefault_ :: (Monad m, Ord k) => (a -> (k, a')) -> Map k (Fold m a' b) -> Fold m (k, a') b -> Fold m a () Source #
Classifying
classify :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b) Source #
Given an input stream of key value pairs and a fold for values, fold all the values belonging to each key. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.
> let input = S.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)] in S.fold (FL.classify FL.toList) input fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
Since: 0.7.0
Unzipping
unzip :: Monad m => Fold m a x -> Fold m b y -> Fold m (a, b) (x, y) Source #
Send the elements of tuples in a stream of tuples through two different folds.
                          |-------Fold m a x--------|
---------stream of (a,b)--|                         |----m (x,y)
                          |-------Fold m b y--------|
This is the consumer side dual of the producer side zip
operation.
Since: 0.7.0
Nested Folds
foldChunks :: Fold m a b -> Fold m b c -> Fold m a c Source #
Apply a terminating fold repeatedly to the input of another fold.
Compare with: Streamly.Prelude.concatMap, Streamly.Prelude.foldChunks
Unimplemented
duplicate :: Applicative m => Fold m a b -> Fold m a (Fold m a b) Source #
Modify the fold such that when the fold is done, instead of returning the accumulator, it returns a fold. The returned fold starts from where we left i.e. it uses the last accumulator value as the initial value of the accumulator. Thus we can resume the fold later and feed it more input.
> do more <- S.fold (FL.duplicate FL.sum) (S.enumerateFromTo 1 10) evenMore <- S.fold (FL.duplicate more) (S.enumerateFromTo 11 20) S.fold evenMore (S.enumerateFromTo 21 30) 465
Since: 0.7.0
Running Folds
initialize :: Monad m => Fold m a b -> m (Fold m a b) Source #
Run the initialization effect of a fold. The returned fold would use the value returned by this effect as its initial value.
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b) Source #
Run one step of a fold and store the accumulator as an initial value in the returned fold.
Folding to SVar
toParallelSVar :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #
toParallelSVarLimited :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #