| Copyright | (c) 2021 Composewell Technologies | 
|---|---|
| License | BSD-3-Clause | 
| Maintainer | streamly@composewell.com | 
| Stability | released | 
| Portability | GHC | 
| Safe Haskell | Safe-Inferred | 
| Language | Haskell2010 | 
Streamly.Data.Fold.Prelude
Description
All Fold-related combinators, including those of the streamly-core Streamly.Data.Fold module, together with concurrency and unordered container operations.
Streamly.Data.Fold
All Streamly.Data.Fold combinators are re-exported via this module. For more pre-release combinators, also see the Streamly.Internal.Data.Fold module.
Concurrent Operations
Configuration
An abstract type for specifying the configuration parameters of a
 Channel. Use Config -> Config modifier functions to modify the default
 configuration. See the individual modifier documentation for default values.
maxBuffer :: Int -> Config -> Config
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value)
 coupled with an unbounded maxThreads value is a recipe for disaster in
 the presence of infinite or very large streams. In particular, it must
 not be used when pure is used in ZipAsyncM streams, as pure in
 applicative zip streams generates an infinite stream, causing unbounded
 concurrent generation with no limit on the buffer or threads.
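As a rough illustration of bounding the buffer, the sketch below runs a fold through parEval with a small maxBuffer. The helper name sumBuffered and the limit of 100 are illustrative choices, not part of the library.

```haskell
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical helper: sum a list with the fold running asynchronously,
-- capping the fold channel buffer at 100 items (an arbitrary value).
sumBuffered :: [Int] -> IO Int
sumBuffered xs =
    Stream.fold
        (Fold.parEval (Fold.maxBuffer 100) Fold.sum)
        (Stream.fromList xs)
```

For instance, sumBuffered [1..10000] should evaluate to 50005000. The bound is meant to limit how much is queued at once, not to change the result.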
boundThreads :: Bool -> Config -> Config
Spawn bound threads (i.e., spawn threads using forkOS instead of
 forkIO). The default value is False.
Currently, this takes effect only for concurrent folds.
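A minimal sketch of turning this option on follows. Since forkOS requires the threaded runtime, the sketch requests bound threads only when rtsSupportsBoundThreads (from Control.Concurrent) is True. The helper name lastStepBound is hypothetical, and the sketch assumes that the fold steps run in the thread spawned for the fold.

```haskell
import Control.Concurrent (isCurrentThreadBound, rtsSupportsBoundThreads)
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical helper: report whether the last fold step ran on a bound
-- thread. Returns False for an empty input, since no step runs.
lastStepBound :: [a] -> IO Bool
lastStepBound xs =
    Stream.fold (Fold.parEval config step) (Stream.fromList xs)
  where
    -- forkOS needs the threaded RTS, so request bound threads only when
    -- the runtime supports them.
    config = Fold.boundThreads rtsSupportsBoundThreads
    -- Ignore the accumulator and the input; just query the current thread.
    step = Fold.foldlM' (\_ _ -> isCurrentThreadBound) (pure False)
```

On a threaded runtime one would expect lastStepBound "abc" to report True, though this depends on how the fold channel schedules its work.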
inspect :: Bool -> Config -> Config
Print debug information about the Channel when the stream ends.
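Because every modifier has the type Config -> Config, modifiers can be combined with ordinary function composition. The sketch below assumes the Config type is exported from this module under that name; debugConfig and countItems are hypothetical helpers, and the buffer size of 50 is arbitrary.

```haskell
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical configuration: modifiers compose like ordinary functions.
-- Passing `id` to parEval instead would keep every default.
debugConfig :: Bool -> Fold.Config -> Fold.Config
debugConfig debug = Fold.inspect debug . Fold.maxBuffer 50

-- Count the items of a list using a concurrent fold configured above.
countItems :: Bool -> [a] -> IO Int
countItems debug xs =
    Stream.fold
        (Fold.parEval (debugConfig debug) Fold.length)
        (Stream.fromList xs)
```

With debugging off, countItems False "hello" should evaluate to 5. Passing True should additionally print channel diagnostics, whose exact format is not specified here.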
Combinators
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
Evaluate a fold asynchronously using a concurrent channel. The driver just
 queues the input stream values to the fold channel buffer and returns. The
 fold evaluates the queued values asynchronously. On finalization, parEval
 waits for the asynchronous fold to complete before it returns.
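One possible use, sketched here under the assumption that Fold.teeWith from Streamly.Data.Fold distributes each input to both folds, is to evaluate two folds over the same stream, each in its own channel. The helper name sumAndCount is hypothetical, and id keeps the default configuration.

```haskell
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical helper: compute the sum and the length of the input,
-- with each fold evaluated asynchronously in its own channel.
sumAndCount :: [Int] -> IO (Int, Int)
sumAndCount xs =
    Stream.fold
        (Fold.teeWith (,)
            (Fold.parEval id Fold.sum)
            (Fold.parEval id Fold.length))
        (Stream.fromList xs)
```

For example, sumAndCount [1..10] should evaluate to (55,10). For folds this cheap the channel overhead likely outweighs any gain; the pattern is more plausible when each fold does substantial work per item.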
Container Related
toHashMapIO :: (MonadIO m, Hashable k, Ord k) => (a -> k) -> Fold m a b -> Fold m a (HashMap k b)
Split the input stream based on a hashable component of the key field and fold each split using the given fold. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.
>>> import Data.HashMap.Strict (HashMap, fromList)
>>> import qualified Streamly.Data.Fold.Prelude as Fold
>>> import qualified Streamly.Data.Stream as Stream
Consider a stream of key value pairs:
>>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]
Classify each key to a different hash bin and fold the bins:
>>> classify = Fold.toHashMapIO fst (Fold.lmap snd Fold.toList)
>>> Stream.fold classify input :: IO (HashMap String [Double])
fromList [("k2",[2.0,2.2]),("k1",[1.0,1.1])]
Pre-release
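The histogram use case mentioned above can be sketched by using the element itself as the key and counting each bin with Fold.length. The helper name histogram is hypothetical; the sketch relies on the unordered-containers package for HashMap.

```haskell
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream as Stream

-- Hypothetical helper: count how many times each word occurs.
-- `id` makes the whole element the key; Fold.length counts each bin.
histogram :: [String] -> IO (HashMap String Int)
histogram ws =
    Stream.fold (Fold.toHashMapIO id Fold.length) (Stream.fromList ws)
```

Looking up a key in the result, for example HashMap.lookup "a" applied to the map built from ["a","b","a"], should give Just 2. The order in which a HashMap prints its entries is not something to rely on.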