Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
- fromSVarD :: MonadAsync m => SVar t m a -> Stream m a
- toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
- toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m ()
- fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool
- pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool
- teeToSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> t m a
- newFoldSVar :: (IsStream t, MonadAsync m) => State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
- newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
Unfold streams from SVar
Usually the SVar is used to concurrently evaluate multiple actions in a stream: many worker threads push their results to the SVar, and a single puller pulls them from the SVar to generate the evaluated stream.
                     input stream
                          |
        <-----------------|<--------worker
        |  exceptions     |
    output stream <------SVar<------worker
                          |
                          |<--------worker
The puller itself schedules the worker threads based on demand. Exceptions are propagated from the worker threads to the puller.
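The worker/puller arrangement described above can be illustrated with a small model built only on `base`. This is a sketch, not streamly's implementation: an `MVar` stands in for the SVar, and the names `Event`, `worker` and `evalConcurrently` are hypothetical. It also omits demand-based scheduling: every action simply gets its own thread.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad (forM_, replicateM)
import Data.List (sort)

-- Hypothetical event type: a worker reports either its result or the
-- exception that terminated it.
type Event a = Either SomeException a

-- A worker evaluates one action and pushes the outcome to the shared MVar
-- (the stand-in for the SVar).
worker :: MVar (Event a) -> IO a -> IO ()
worker chan act = try act >>= putMVar chan

-- The puller starts one worker per action, pulls exactly one event per
-- worker, and rethrows the first exception found among the collected events.
evalConcurrently :: [IO a] -> IO [a]
evalConcurrently actions = do
    chan <- newEmptyMVar
    forM_ actions $ \act -> forkIO (worker chan act)
    events <- replicateM (length actions) (takeMVar chan)
    either throwIO return (sequence events)

main :: IO ()
main = do
    rs <- evalConcurrently [return (x * x) | x <- [1 .. 5 :: Int]]
    -- Results arrive in completion order, so sort before printing.
    print (sort rs)  -- prints [1,4,9,16,25]
```

Routing exceptions through the same channel as results is what lets the puller observe worker failures in its own thread.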
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
Generate a stream from an SVar. An unevaluated stream can be pushed to an SVar using toSVar. As we pull a stream from the SVar, the input stream gets evaluated concurrently. How it is evaluated depends on the SVar style and on the configuration parameters, e.g. those set using the maxBuffer/maxThreads combinators.
fromSVarD :: MonadAsync m => SVar t m a -> Stream m a
Like fromSVar, but generates a StreamD style stream instead of a CPS style stream.
Fold streams to SVar
toSVarParallel :: MonadAsync m => State t m a -> SVar t m a -> Stream m a -> m ()
Concurrent folds
To run folds concurrently, we need to decouple the fold execution from the stream production. We use the SVar to do that: a single worker pushes the stream elements to the SVar, and on the consumer side a fold driver pulls the values and folds them.
    Fold worker <------SVar<------input stream
        |  exceptions  |
        --------------->
We need a channel for pushing exceptions from the fold worker to the stream pusher. The stream may be pushed to multiple folds at the same time. For that we need one SVar per fold:
    Fold worker <------SVar<---
                           |  |
    Fold worker <------SVar<------input stream
                           |  |
    Fold worker <------SVar<---
Unlike in the case of concurrent stream evaluation, the puller does not drive the scheduling and concurrent execution of the stream. The stream is simply pushed by the stream producer at its own rate. The fold worker just pulls it and folds it.
Note: If the stream pusher terminates due to an exception, we do not actively terminate the fold. It gets cleaned up by the GC.
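The one-channel-per-fold layout can be modelled with `base` alone. The following is a simplified sketch under stated assumptions, not the library's code: a `Chan` stands in for each SVar, `Nothing` marks the end of input, and `foldWorker` and `runFolds` are hypothetical names. The exception channel from the fold worker back to the pusher is left out to keep the example short.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (evaluate)

-- Fold worker: pull elements from its channel and fold them until the
-- end-of-input marker (Nothing) arrives, then return the accumulator.
foldWorker :: (b -> a -> b) -> b -> Chan (Maybe a) -> IO b
foldWorker step = go
  where
    go acc chan = do
        next <- readChan chan
        case next of
            Nothing -> return acc
            Just x -> do
                acc' <- evaluate (step acc x)  -- force each step (WHNF)
                go acc' chan

-- Push one input to several folds at once, using one channel per fold.
-- The producer pushes at its own rate; the workers only pull and fold.
runFolds :: [(b -> a -> b, b)] -> [a] -> IO [b]
runFolds folds input = do
    workers <- mapM start folds
    mapM_ (\x -> mapM_ (\(chan, _) -> writeChan chan (Just x)) workers) input
    mapM_ (\(chan, _) -> writeChan chan Nothing) workers
    mapM (takeMVar . snd) workers
  where
    start (step, z) = do
        chan <- newChan
        done <- newEmptyMVar
        _ <- forkIO (foldWorker step z chan >>= putMVar done)
        return (chan, done)

main :: IO ()
main = do
    rs <- runFolds [((+), 0), (max, 0)] [1 .. 10 :: Int]
    print rs  -- prints [55,10]
```

Because each fold has its own channel, a slow fold does not hold up the others in this model; the unbounded `Chan` simply buffers its backlog.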
fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool
pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool
Push values from a stream to a fold worker via an SVar. Before pushing a value to the SVar, it polls for events received from the fold consumer. If a stop event is received then it returns True, otherwise False. Propagates exceptions received from the fold consumer.
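The poll-then-push behaviour can be sketched as follows. This is a model of the description above rather than the actual implementation: `FoldEvent`, `FoldChan` and `pushModel` are hypothetical names, and two `IORef`s stand in for the SVar's event queue and element buffer.

```haskell
import Control.Exception (SomeException, throwIO)
import Data.IORef
    (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical events that a fold consumer can send back to the pusher.
data FoldEvent = FoldStop | FoldException SomeException

-- Hypothetical stand-in for a fold SVar: an inbox of consumer events
-- (newest first) and a buffer of pushed elements (newest first).
data FoldChan a = FoldChan
    { fcEvents :: IORef [FoldEvent]
    , fcBuffer :: IORef [a]
    }

-- Drain pending consumer events before pushing. Returns True (without
-- pushing) if the consumer asked to stop, rethrows a consumer exception,
-- and otherwise pushes the element and returns False.
pushModel :: FoldChan a -> a -> IO Bool
pushModel fc x = do
    events <- atomicModifyIORef' (fcEvents fc) (\es -> ([], es))
    stopped <- process events
    if stopped
        then return True
        else do
            atomicModifyIORef' (fcBuffer fc) (\xs -> (x : xs, ()))
            return False
  where
    -- React to the most recent event only; enough for this sketch.
    process [] = return False
    process (FoldException e : _) = throwIO e
    process (FoldStop : _) = return True

main :: IO ()
main = do
    fc <- FoldChan <$> newIORef [] <*> newIORef []
    r1 <- pushModel fc (1 :: Int)
    modifyIORef' (fcEvents fc) (FoldStop :)  -- the consumer signals stop
    r2 <- pushModel fc 2
    buffered <- readIORef (fcBuffer fc)
    print (r1, r2, buffered)  -- prints (False,True,[1])
```

Checking for events before pushing means the pusher learns about a finished or failed consumer at its next push instead of continuing to feed it.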
teeToSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> t m a
Tap a stream and send the elements to the specified SVar in addition to yielding them again. The SVar runs a fold consumer. Elements are tapped and sent to the SVar until the fold finishes. Any exceptions from the fold evaluation are propagated in the current thread.
    ------input stream---------output stream----->
                       /|\   |
            exceptions  |    |  input
                        |   \|/
                        ----SVar
                             |
                            Fold
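The tapping behaviour can be illustrated with a list-based model. This is only a sketch under simplifying assumptions: a list stands in for the stream, the hypothetical `teeModel` takes a push action that returns True once the fold has finished (mirroring the Bool result of pushToFold), and `takeThree` is a made-up consumer that accepts three elements.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Yield every element downstream and also offer it to the fold through
-- the push action. Once the push action reports that the fold is done,
-- stop tapping but keep yielding.
teeModel :: (a -> IO Bool) -> [a] -> IO [a]
teeModel push = go False
  where
    go _ [] = return []
    go done (x : xs) = do
        done' <- if done then return True else push x
        rest <- go done' xs
        return (x : rest)

-- Hypothetical fold consumer that accepts only the first three elements
-- and reports "done" (True) for anything offered after that.
takeThree :: IORef [Int] -> Int -> IO Bool
takeThree ref x = do
    seen <- readIORef ref
    if length seen >= 3
        then return True
        else modifyIORef' ref (++ [x]) >> return False

main :: IO ()
main = do
    ref <- newIORef []
    out <- teeModel (takeThree ref) [1 .. 5]
    tapped <- readIORef ref
    print (out, tapped)  -- prints ([1,2,3,4,5],[1,2,3])
```

The output stream is unaffected by the fold finishing early; only the tap is switched off.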
newFoldSVar :: (IsStream t, MonadAsync m) => State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
Create a Fold style SVar that runs a supplied fold function as the consumer. Any elements sent to the SVar are consumed by the supplied fold function.
newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
Like newFoldSVar, except that it uses a Fold instead of a fold function.