| Copyright | (c) 2022 Composewell Technologies |
|---|---|
| License | BSD-3-Clause |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Streamly.Internal.Data.Fold.Prelude
Description
Synopsis
- takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
- intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
- data Channel m a b = Channel {
- outputQueue :: IORef ([ChildEvent a], Int)
- maxBufferLimit :: Limit
- outputDoorBell :: MVar ()
- readOutputQ :: m [ChildEvent a]
- outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
- outputDoorBellFromConsumer :: MVar ()
- bufferSpaceDoorBell :: MVar ()
- svarRef :: Maybe (IORef ())
- svarStats :: SVarStats
- svarInspectMode :: Bool
- svarCreator :: ThreadId
- newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b)
- data Config
- sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
- checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
- dumpSVar :: Channel m a b -> IO String
- maxBuffer :: Int -> Config -> Config
- boundThreads :: Bool -> Config -> Config
- inspect :: Bool -> Config -> Config
- parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
- write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
- writeLimited :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
Trimming
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b Source #
takeInterval n fold uses fold to fold the input items that arrive within
the first n seconds.
>>> input = Stream.delay 0.2 $ Stream.fromList [1..10]
>>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) input
[1,2,3,4,5,6]

>>> f = Fold.takeInterval 0.5 Fold.toList
>>> Stream.fold Fold.toList $ Stream.foldMany f input
[[1,2,3,4],[5,6,7],[8,9,10]]
Stops when the fold stops or when the timeout occurs. Note that the fold
needs an input after the timeout in order to stop. For example, if no input
is pushed to the fold until one hour after the timeout occurred, the fold
finishes only after consuming that input.
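The late-stop behaviour described above can be observed with a small timing experiment. This is a sketch, not part of the library documentation: `burstThenPause` and `lateStop` are hypothetical names, and the imports assume that `Fold.toList`, `Stream.mapM` and friends come from `Streamly.Data.Fold` and `Streamly.Data.Stream`, with this module imported as `FoldP`.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (when)
import GHC.Clock (getMonotonicTime)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Prelude as FoldP

-- | Hypothetical input: emit 1 and 2 immediately, then pause for two
-- seconds before emitting 3. The pause is much longer than the 0.5 second
-- window used below.
burstThenPause :: Stream.Stream IO Int
burstThenPause = Stream.mapM emit (Stream.fromList [1 .. 4])
  where
    emit x = do
        when (x == 3) $ threadDelay 2000000
        pure x

-- | Run the fold and return its result together with the elapsed
-- wall-clock time in seconds.
lateStop :: IO ([Int], Double)
lateStop = do
    start <- getMonotonicTime
    xs <- Stream.fold (FoldP.takeInterval 0.5 Fold.toList) burstThenPause
    end <- getMonotonicTime
    pure (xs, end - start)

main :: IO ()
main = do
    (xs, elapsed) <- lateStop
    putStrLn $ "collected: " ++ show xs
    putStrLn $ "elapsed seconds: " ++ show elapsed
```

If the fold behaves as documented, the elapsed time should be close to two seconds rather than half a second, because the fold can only notice the timeout when the third item finally arrives.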
Pre-release
Splitting
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c Source #
Group the input stream into windows of n seconds each using the first fold, and then fold the resulting groups using the second fold.
>>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
>>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
[[1,2,3,4],[5,6,7],[8,9,10]]
intervalsOf n split = many (takeInterval n split)
Pre-release
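As an illustration of the equation above, the following sketch computes per-window sums twice: once with intervalsOf and once with the same pipeline spelled out using `Fold.many`. The names `input`, `windowSums` and `windowSumsViaMany` are hypothetical, and the imports assume `Fold.many`, `Fold.sum` and `Fold.toList` are taken from `Streamly.Data.Fold`. Window boundaries depend on timing, so the exact grouping may vary between runs.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Prelude as FoldP

-- | Hypothetical input arriving at roughly five items per second.
input :: Stream.Stream IO Int
input = Stream.delay 0.2 $ Stream.fromList [1 .. 10]

-- | Sum each half-second window, then collect the per-window sums.
windowSums :: IO [Int]
windowSums =
    Stream.fold (FoldP.intervalsOf 0.5 Fold.sum Fold.toList) input

-- | The same pipeline written with 'Fold.many' and 'takeInterval'.
windowSumsViaMany :: IO [Int]
windowSumsViaMany =
    Stream.fold
        (Fold.many (FoldP.takeInterval 0.5 Fold.sum) Fold.toList)
        input

main :: IO ()
main = do
    windowSums >>= print
    windowSumsViaMany >>= print
```

Whatever the grouping turns out to be, every input item lands in exactly one window, so the per-window sums should always add up to the sum of the whole input.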
Constructors
| Channel | |
Fields
| |
newChannel :: MonadRunInIO m => (Config -> Config) -> Fold m a b -> m (Channel m a b) Source #
data Config Source #
An abstract type for specifying the configuration parameters of a
Channel. Use Config -> Config modifier functions to modify the default
configuration. See the individual modifier documentation for default values.
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b) Source #
Push values from a driver to a fold worker via a Channel. Before pushing a
value to the Channel it polls for events received from the fold worker. If a
stop event is received then it returns the fold result wrapped in Just,
otherwise it returns Nothing. Propagates exceptions received from the fold
worker.
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b) Source #
Configuration
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value)
coupled with an unbounded maxThreads value is a recipe for disaster in the
presence of infinite or very large streams. In particular, it must
not be used when pure is used in ZipAsyncM streams, as pure in
applicative zip streams generates an infinite stream, causing unbounded
concurrent generation with no limit on the buffer or threads.
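The three cases for the maxBuffer argument can be summarised with a small pure model. This is only an illustration of the rules stated above, not the library's implementation: `BufferLimit`, `defaultBufferSize` and `interpretMaxBuffer` are hypothetical names introduced for this sketch.

```haskell
-- | Hypothetical model of a buffer limit; not a library type.
data BufferLimit
    = Unlimited
    | Limited Int
    deriving (Eq, Show)

-- | The default buffer size stated in the documentation above.
defaultBufferSize :: Int
defaultBufferSize = 1500

-- | Interpret the argument the way the prose describes: a negative value
-- removes the limit, zero restores the default, and a positive value is
-- used as given.
interpretMaxBuffer :: Int -> BufferLimit
interpretMaxBuffer n
    | n < 0 = Unlimited
    | n == 0 = Limited defaultBufferSize
    | otherwise = Limited n

main :: IO ()
main = mapM_ (print . interpretMaxBuffer) [-1, 0, 64]
```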
boundThreads :: Bool -> Config -> Config Source #
Spawn bound threads (i.e., spawn threads using forkOS instead of
forkIO). The default value is False.
Currently, this takes effect only for concurrent folds.
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel when the stream ends.
Fold operations
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b Source #
Evaluate a fold asynchronously using a concurrent channel. The driver just
queues the input stream values to the fold channel buffer and returns. The
fold evaluates the queued values asynchronously. On finalization, parEval
waits for the asynchronous fold to complete before it returns.
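A minimal usage sketch follows, assuming the configuration modifiers documented above compose with ordinary function composition and that `Fold.sum` comes from `Streamly.Data.Fold`. The names `config` and `parSum` are hypothetical, and the buffer size of 64 is an arbitrary choice for illustration.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Fold.Prelude as FoldP

-- | Hypothetical configuration: a small buffer and unbound threads.
-- Settings that are not mentioned keep their defaults.
config :: FoldP.Config -> FoldP.Config
config = FoldP.maxBuffer 64 . FoldP.boundThreads False

-- | Sum the input in a separate worker. The driver only queues items to
-- the channel; the summing happens asynchronously.
parSum :: IO Int
parSum =
    Stream.fold
        (FoldP.parEval config Fold.sum)
        (Stream.fromList [1 .. 1000])

main :: IO ()
main = parSum >>= print
```

Because parEval waits for the worker on finalization, the result should match what the plain `Fold.sum` would produce on the same input. Passing `id` instead of `config` keeps every default.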
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #
A fold to write a stream to an SVar. Unlike toSVar, this does not allow
for concurrent evaluation of the stream: the fold receives the input one
element at a time and just forwards the elements to the SVar. However, we
can safely execute the fold in an independent thread, and the SVar can act
as a buffer decoupling the sender from the receiver. Also, we can have
multiple folds running concurrently, pushing their streams to the SVar.
writeLimited :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a () Source #
Like write, but applies a yield limit.