| Copyright | (c) 2022 Composewell Technologies | 
|---|---|
| License | BSD-3-Clause | 
| Maintainer | streamly@composewell.com | 
| Stability | experimental | 
| Portability | GHC | 
| Safe Haskell | Safe-Inferred | 
| Language | Haskell2010 | 
Streamly.Internal.Data.Stream.Prelude
Description
Synopsis
- data Channel m a = Channel {- svarMrun :: RunInIO m
- outputQueue :: IORef ([ChildEvent a], Int)
- outputDoorBell :: MVar ()
- readOutputQ :: m [ChildEvent a]
- postProcess :: m Bool
- maxWorkerLimit :: Limit
- maxBufferLimit :: Limit
- remainingWork :: Maybe (IORef Count)
- yieldRateInfo :: Maybe YieldRateInfo
- enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
- eagerDispatch :: m ()
- isWorkDone :: IO Bool
- isQueueDone :: IO Bool
- doorBellOnWorkQ :: IORef Bool
- workLoop :: Maybe WorkerInfo -> m ()
- workerThreads :: IORef (Set ThreadId)
- workerCount :: IORef Int
- accountThread :: ThreadId -> m ()
- workerStopMVar :: MVar ()
- svarRef :: Maybe (IORef ())
- svarStats :: SVarStats
- svarInspectMode :: Bool
- svarCreator :: ThreadId
 
- yield :: Channel m a -> Maybe WorkerInfo -> a -> IO Bool
- stop :: Channel m a -> Maybe WorkerInfo -> IO ()
- stopChannel :: MonadIO m => Channel m a -> m ()
- dumpSVar :: Channel m a -> IO String
- toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()
- toChannelK :: MonadRunInIO m => Channel m a -> StreamK m a -> m ()
- fromChannel :: MonadAsync m => Channel m a -> Stream m a
- fromChannelK :: MonadAsync m => Channel m a -> StreamK m a
- newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
- newInterleaveChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a)
- pushWorker :: MonadRunInIO m => Count -> Channel m a -> m ()
- dispatchWorker :: MonadRunInIO m => Count -> Channel m a -> m Bool
- dispatchWorkerPaced :: MonadRunInIO m => Channel m a -> m Bool
- sendWorkerWait :: MonadIO m => Bool -> (Channel m a -> IO ()) -> (Channel m a -> m Bool) -> Channel m a -> m ()
- startChannel :: MonadRunInIO m => Channel m a -> m ()
- sendWorkerDelay :: Channel m a -> IO ()
- sendWorkerDelayPaced :: Channel m a -> IO ()
- readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a]
- readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a]
- postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool
- postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool
- data Channel m a = Channel {- svarMrun :: RunInIO m
- outputQueue :: IORef ([ChildEvent a], Int)
- outputDoorBell :: MVar ()
- readOutputQ :: m [ChildEvent a]
- postProcess :: m Bool
- maxWorkerLimit :: Limit
- maxBufferLimit :: Limit
- remainingWork :: Maybe (IORef Count)
- yieldRateInfo :: Maybe YieldRateInfo
- enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
- eagerDispatch :: m ()
- isWorkDone :: IO Bool
- isQueueDone :: IO Bool
- doorBellOnWorkQ :: IORef Bool
- workLoop :: Maybe WorkerInfo -> m ()
- workerThreads :: IORef (Set ThreadId)
- workerCount :: IORef Int
- accountThread :: ThreadId -> m ()
- workerStopMVar :: MVar ()
- svarRef :: Maybe (IORef ())
- svarStats :: SVarStats
- svarInspectMode :: Bool
- svarCreator :: ThreadId
 
- newChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a)
- withChannel :: MonadAsync m => (Config -> Config) -> Stream m a -> (Channel m b -> Stream m a -> Stream m b) -> Stream m b
- withChannelK :: MonadAsync m => (Config -> Config) -> StreamK m a -> (Channel m b -> StreamK m a -> StreamK m b) -> StreamK m b
- data Config
- defaultConfig :: Config
- maxThreads :: Int -> Config -> Config
- maxBuffer :: Int -> Config -> Config
- data Rate = Rate {}
- rate :: Maybe Rate -> Config -> Config
- avgRate :: Double -> Config -> Config
- minRate :: Double -> Config -> Config
- maxRate :: Double -> Config -> Config
- constRate :: Double -> Config -> Config
- data StopWhen
- stopWhen :: StopWhen -> Config -> Config
- getStopWhen :: Config -> StopWhen
- eager :: Bool -> Config -> Config
- ordered :: Bool -> Config -> Config
- interleaved :: Bool -> Config -> Config
- inspect :: Bool -> Config -> Config
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a
- parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a
- parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a
- parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
- parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a
- parTwo :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -> Stream m a
- parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
- parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
- parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
- parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
- parListLazy :: MonadAsync m => [Stream m a] -> Stream m a
- parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a
- parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a
- parListEager :: MonadAsync m => [Stream m a] -> Stream m a
- parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a
- parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a
- parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
- parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b
- parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a
- parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b
- parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a
- parTapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
- tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a
- periodic :: MonadIO m => m a -> Double -> Stream m a
- ticks :: MonadIO m => Double -> Stream m ()
- ticksRate :: MonadAsync m => Rate -> Stream m ()
- interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a
- takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- takeLastInterval :: Double -> Stream m a -> Stream m a
- dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
- dropLastInterval :: Int -> Stream m a -> Stream m a
- intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b
- groupsOfTimeout :: MonadAsync m => Int -> Double -> Fold m a b -> Stream m a -> Stream m b
- sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a
- sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
- sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
- classifySessionsByGeneric :: forall m f a b. (MonadAsync m, IsMap f) => Proxy (f :: Type -> Type) -> Double -> Bool -> (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (Key f, a)) -> Stream m (Key f, b)
- classifySessionsBy :: (MonadAsync m, Ord k) => Double -> Bool -> (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- classifySessionsOf :: (MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- classifyKeepAliveSessions :: (MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> Stream m (AbsTime, (k, a)) -> Stream m (k, b)
- bufferLatest :: Stream m a -> Stream m (Maybe a)
- bufferLatestN :: Int -> Stream m a -> Stream m a
- bufferOldestN :: Int -> Stream m a -> Stream m a
- after :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a
- bracket :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
- bracket3 :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a
- finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
- retry :: (MonadCatch m, Exception e, Ord e) => Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
- afterD :: MonadRunInIO m => m b -> Stream m a -> Stream m a
- bracket3D :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a
- retryD :: forall e m a. (Exception e, Ord e, MonadCatch m) => Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
Imports
Imports for example snippets in this module.
>>>:m>>>{-# LANGUAGE FlexibleContexts #-}>>>import Control.Concurrent (threadDelay)>>>import qualified Streamly.Data.Array as Array>>>import qualified Streamly.Data.Fold as Fold>>>import qualified Streamly.Data.Parser as Parser>>>import qualified Streamly.Data.StreamK as StreamK>>>import qualified Streamly.Internal.Data.Stream as Stream hiding (append2)>>>import qualified Streamly.Internal.Data.Stream.Concurrent as Stream>>>import Prelude hiding (concatMap, concat, zipWith)>>>:{delay n = do threadDelay (n * 1000000) -- sleep for n seconds putStrLn (show n ++ " sec") -- print "n sec" return n -- IO Int :}
A mutable channel to evaluate multiple streams concurrently and provide the combined results as output stream.
Constructors
| Channel | |
| Fields 
 | |
stopChannel :: MonadIO m => Channel m a -> m () Source #
Stop the channel. Kill all running worker threads.
toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m () Source #
Send a stream to a given channel for concurrent evaluation.
toChannelK :: MonadRunInIO m => Channel m a -> StreamK m a -> m () Source #
Write a stream to a channel in a non-blocking manner. The stream can then
 be read back from the channel using fromChannel or fromChannelK.
fromChannel :: MonadAsync m => Channel m a -> Stream m a Source #
Generate a stream of results from concurrent evaluations from a channel. Evaluation of the channel does not start until this API is called. This API must not be called more than once on a channel. It kicks off evaluation of the channel by dispatching concurrent workers and ensures that, as long as there is work queued on the channel, workers are dispatched in proportion to the demand by the consumer.
fromChannelK :: MonadAsync m => Channel m a -> StreamK m a Source #
newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a) Source #
Create a new async style concurrent stream evaluation channel. The monad state used to run the stream actions is taken from the call site of newAppendChannel.
newInterleaveChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a) Source #
Create a new interleave style concurrent stream evaluation channel. The monad state used to run the stream actions is taken from the call site of newInterleaveChannel.
Dispatching
pushWorker :: MonadRunInIO m => Count -> Channel m a -> m () Source #
dispatchWorker :: MonadRunInIO m => Count -> Channel m a -> m Bool Source #
dispatchWorkerPaced :: MonadRunInIO m => Channel m a -> m Bool Source #
sendWorkerWait :: MonadIO m => Bool -> (Channel m a -> IO ()) -> (Channel m a -> m Bool) -> Channel m a -> m () Source #
startChannel :: MonadRunInIO m => Channel m a -> m () Source #
Start the evaluation of the channel's work queue by kicking off a worker. Note: The work queue must not be empty, otherwise the worker will exit without doing anything.
sendWorkerDelay :: Channel m a -> IO () Source #
sendWorkerDelayPaced :: Channel m a -> IO () Source #
Read Output
readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a] Source #
readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a] Source #
Postprocess Hook After Reading
postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool Source #
postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool Source #
Channel
A mutable channel to evaluate multiple streams concurrently and provide the combined results as output stream.
Constructors
| Channel | |
| Fields 
 | |
newChannel :: MonadAsync m => (Config -> Config) -> m (Channel m a) Source #
Create a new concurrent stream evaluation channel. The monad state used to run the stream actions is captured from the call site of newChannel.
withChannel :: MonadAsync m => (Config -> Config) -> Stream m a -> (Channel m b -> Stream m a -> Stream m b) -> Stream m b Source #
Allocate a channel and evaluate the stream using the channel and the supplied evaluator function. The evaluator is run in a worker thread.
withChannelK :: MonadAsync m => (Config -> Config) -> StreamK m a -> (Channel m b -> StreamK m a -> StreamK m b) -> StreamK m b Source #
Allocate a channel and evaluate the stream using the channel and the supplied evaluator function. The evaluator is run in a worker thread.
Configuration
An abstract type for specifying the configuration parameters of a
 Channel. Use Config -> Config modifier functions to modify the default
 configuration. See the individual modifier documentation for default values.
defaultConfig :: Config Source #
The fields prefixed by an _ are not to be accessed or updated directly but via smart accessor APIs. Use the get/set routines instead of accessing the Config fields directly.
Limits
maxThreads :: Int -> Config -> Config Source #
Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
maxBuffer :: Int -> Config -> Config Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer value (i.e. a negative value)
 coupled with an unbounded maxThreads value is a recipe for disaster in
 presence of infinite streams, or very large streams.  Especially, it must
 not be used when pure is used in ZipAsyncM streams as pure in
 applicative zip streams generates an infinite stream causing unbounded
 concurrent generation with no limit on the buffer or threads.
Rate Control
Specifies the stream yield rate in yields per second (Hertz).
 We keep accumulating yield credits at rateGoal. At any point of time we
 allow only as many yields as we have accumulated as per rateGoal since the
 start of time. If the consumer or the producer is slower or faster, the
 actual rate may fall behind or exceed rateGoal.  We try to recover the gap
 between the two by increasing or decreasing the pull rate from the producer.
 However, if the gap becomes more than rateBuffer we try to recover only as
 much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when
 recovering the rate gap.  In other words, it determines the maximum yield
 latency.  Similarly, rateHigh puts a bound on how high the instantaneous
 rate can go when recovering the rate gap.  In other words, it determines the
 minimum yield latency. We reduce the latency by increasing concurrency,
 therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value.
 If the rateBuffer is 0 or negative we do not attempt to recover.
rate :: Maybe Rate -> Config -> Config Source #
Specify the stream evaluation rate of a channel.
A Nothing value means there is no smart rate control, concurrent execution
 blocks only if maxThreads or maxBuffer is reached, or there are no more
 concurrent tasks to execute. This is the default.
When rate (throughput) is specified, concurrent production may be ramped
 up or down automatically to achieve the specified stream throughput. The
 specific behavior for different styles of Rate specifications is
 documented under Rate.  The effective maximum production rate achieved by
 a channel is governed by:
- The maxThreads limit
- The maxBuffer limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Maximum production rate is given by:
\(rate = \frac{maxThreads}{latency}\)
If we know the average latency of the tasks we can set maxThreads
 accordingly.
avgRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
 per second (i.e.  Hertz).  Concurrent production is ramped up or down
 automatically to achieve the specified average yield rate. The rate can
 go down to half of the specified rate on the lower side and double of
 the specified rate on the higher side.
minRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times; the upper limit is double the specified rate.
maxRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times; the lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
constRate :: Double -> Config -> Config Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Stop behavior
Specify when the Channel should stop.
Constructors
| FirstStops | Stop when the first stream ends. | 
| AllStop | Stop when all the streams end. | 
| AnyStops | Stop when any one stream ends. | 
getStopWhen :: Config -> StopWhen Source #
Scheduling behavior
eager :: Bool -> Config -> Config Source #
By default, processing of output from the worker threads is given priority
 over dispatching new workers. More workers are dispatched only when there is
 no output to process. When eager is set to True, workers are dispatched
 aggressively as long as there is more work to do irrespective of whether
 there is output pending to be processed by the stream consumer. However,
 dispatching may stop if maxThreads or maxBuffer is reached.
Note: This option has no effect when rate has been specified.
Note: Not supported with interleaved.
ordered :: Bool -> Config -> Config Source #
When enabled, the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.
Note: Not supported with interleaved.
interleaved :: Bool -> Config -> Config Source #
Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.
Note: Can only be used on a finite number of streams.
Note: Not supported with ordered.
Diagnostics
inspect :: Bool -> Config -> Config Source #
Print debug information about the Channel when the stream ends.
Types
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
 that can be composed concurrently require the underlying monad to be
 MonadAsync.
Combinators
Stream combinators using a concurrent channel
Evaluate
Evaluates a stream concurrently using a channel.
parEval :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a Source #
parEval evaluates a stream as a whole asynchronously with respect to
 the consumer of the stream. A worker thread evaluates multiple elements of
 the stream ahead of time and buffers the results; the consumer of the stream
 runs in another thread consuming the elements from the buffer, thus
 decoupling the production and consumption of the stream. parEval can be
 used to run different stages of a pipeline concurrently.
It is important to note that parEval does not evaluate individual actions
 in the stream concurrently with respect to each other, it merely evaluates
 the stream serially but in a different thread than the consumer thread,
 thus the consumer and producer can run concurrently. See parMapM and
 parSequence to evaluate actions in the stream concurrently.
The evaluation requires only one thread as only one stream needs to be
 evaluated. Therefore, the concurrency options that are relevant to multiple
 streams do not apply here e.g. maxThreads, eager, interleaved, ordered,
 stopWhen options do not have any effect on parEval.
Useful idioms:
>>>parUnfoldrM step = Stream.parEval id . Stream.unfoldrM step>>>parIterateM step = Stream.parEval id . Stream.iterateM step
Generate
Uses a single channel to evaluate all actions.
parRepeatM :: MonadAsync m => (Config -> Config) -> m a -> Stream m a Source #
Definition:
>>>parRepeatM cfg = Stream.parSequence cfg . Stream.repeat
Generate a stream by repeatedly executing a monadic action forever.
parReplicateM :: MonadAsync m => (Config -> Config) -> Int -> m a -> Stream m a Source #
Generate a stream by concurrently performing a monadic action n times.
Definition:
>>>parReplicateM cfg n = Stream.parSequence cfg . Stream.replicate n
Example, parReplicateM in the following example executes all the
 replicated actions concurrently, thus taking only 1 second:
>>>Stream.fold Fold.drain $ Stream.parReplicateM id 10 $ delay 1...
Map
Uses a single channel to evaluate all actions.
parMapM :: MonadAsync m => (Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b Source #
Definition:
>>>parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)
For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel. Even though the results become available out of order, they are emitted in order because of the ordered config option:
>>>f x = delay x >> return x>>>Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]1 sec 2 sec 3 sec [3,2,1]
parSequence :: MonadAsync m => (Config -> Config) -> Stream m (m a) -> Stream m a Source #
Definition:
>>>parSequence modifier = Stream.parMapM modifier id
Useful idioms:
>>>parFromListM = Stream.parSequence id . Stream.fromList>>>parFromFoldableM = Stream.parSequence id . StreamK.toStream . StreamK.fromFoldable
Combine two
Use a channel for each pair.
parTwo :: MonadAsync m => (Config -> Config) -> Stream m a -> Stream m a -> Stream m a Source #
Binary operation to evaluate two streams concurrently using a channel.
If you want to combine more than two streams you almost always want the
 parList or parConcat operation instead. The performance of this
 operation degrades rapidly when more streams are combined as each operation
 adds one more concurrent channel. On the other hand, parConcat uses a
 single channel for all streams. However, with this operation you can
 precisely control the scheduling by creating arbitrary shape expression
 trees.
Definition:
>>>parTwo cfg x y = Stream.parList cfg [x, y]
Example, the following code finishes in 4 seconds:
>>>async = Stream.parTwo id>>>stream1 = Stream.fromEffect (delay 4)>>>stream2 = Stream.fromEffect (delay 2)>>>Stream.fold Fold.toList $ stream1 `async` stream22 sec 4 sec [2,4]
parZipWithM :: MonadAsync m => (Config -> Config) -> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c Source #
Evaluates the streams being zipped in threads separate from the consumer. The zip function is evaluated in the consumer thread.
>>>parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
Multi-stream concurrency options won't apply here, see the notes in
 parEval.
If you want to evaluate the zip function as well in a separate thread, you
 can use a parEval on parZipWithM.
parZipWith :: MonadAsync m => (Config -> Config) -> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c Source #
>>>parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b)
>>>m1 = Stream.fromList [1,2,3]>>>m2 = Stream.fromList [4,5,6]>>>Stream.fold Fold.toList $ Stream.parZipWith id (,) m1 m2[(1,4),(2,5),(3,6)]
parMergeByM :: MonadAsync m => (Config -> Config) -> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeByM but evaluates both the streams concurrently.
Definition:
>>>parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)
parMergeBy :: MonadAsync m => (Config -> Config) -> (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a Source #
Like mergeBy but evaluates both the streams concurrently.
Definition:
>>>parMergeBy cfg f = Stream.parMergeByM cfg (\a b -> return $ f a b)
List of streams
Shares a single channel across many streams.
parListLazy :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like concat but works on a list of streams.
>>>parListLazy = Stream.parList id
parListOrdered :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but with ordered on.
>>>parListOrdered = Stream.parList (Stream.ordered True)
parListInterleaved :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but interleaves the streams fairly instead of prioritizing
 the left stream. This schedules all streams in a round robin fashion over
 a limited number of threads.
>>>parListInterleaved = Stream.parList (Stream.interleaved True)
parListEager :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListLazy but with eager on.
>>>parListEager = Stream.parList (Stream.eager True)
parListEagerFst :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListEager but stops the output as soon as the first stream stops.
>>>parListEagerFst = Stream.parList (Stream.eager True . Stream.stopWhen Stream.FirstStops)
parListEagerMin :: MonadAsync m => [Stream m a] -> Stream m a Source #
Like parListEager but stops the output as soon as any of the streams
 stops.
Definition:
>>>parListEagerMin = Stream.parList (Stream.eager True . Stream.stopWhen Stream.AnyStops)
parList :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a Source #
Like parConcat but works on a list of streams.
>>>parList modifier = Stream.parConcat modifier . Stream.fromList
Stream of streams
Apply
parApply :: MonadAsync m => (Config -> Config) -> Stream m (a -> b) -> Stream m a -> Stream m b Source #
Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.
Concat
Shares a single channel across many streams.
parConcat :: MonadAsync m => (Config -> Config) -> Stream m (Stream m a) -> Stream m a Source #
Evaluate the streams in the input stream concurrently and combine them.
>>>parConcat modifier = Stream.parConcatMap modifier id
parConcatMap :: MonadAsync m => (Config -> Config) -> (a -> Stream m b) -> Stream m a -> Stream m b Source #
Map each element of the input to a stream and then concurrently evaluate and concatenate the resulting streams. Multiple streams may be evaluated concurrently but earlier streams are preferred. Output from the streams is used as it arrives.
Definition:
>>>parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f stream
Examples:
>>>f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xs
The following streams finish in 4 seconds:
>>>stream1 = Stream.fromEffect (delay 4)>>>stream2 = Stream.fromEffect (delay 2)>>>stream3 = Stream.fromEffect (delay 1)>>>f id [stream1, stream2, stream3]1 sec 2 sec 4 sec [1,2,4]
Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:
>>>f (Stream.maxThreads 2) [stream1, stream2, stream3]... [2,1,4]
When used with a single thread it behaves like serial concatMap:
>>>f (Stream.maxThreads 1) [stream1, stream2, stream3]... [4,2,1]
>>>stream1 = Stream.fromList [1,2,3]>>>stream2 = Stream.fromList [4,5,6]>>>f (Stream.maxThreads 1) [stream1, stream2][1,2,3,4,5,6]
Schedule all streams in a round robin fashion over the available threads:
>>>f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>>stream1 = Stream.fromList [1,2,3]>>>stream2 = Stream.fromList [4,5,6]>>>f (Stream.maxThreads 1) [stream1, stream2][1,4,2,5,3,6]
ConcatIterate
parConcatIterate :: MonadAsync m => (Config -> Config) -> (a -> Stream m a) -> Stream m a -> Stream m a Source #
Same as concatIterate but concurrent.
Pre-release
Reactive
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> Stream m a Source #
fromCallback f creates an entangled pair of a callback and a stream i.e.
 whenever the callback is called a value appears in the stream. The function
 f is invoked with the callback as argument, and the stream is returned.
 f would store the callback for calling it later for generating values in
 the stream.
The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.
Pre-release
parTapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
parTapCount predicate fold stream taps the count of those elements in
 the stream that pass the predicate. The resulting count stream is sent to
 a fold running concurrently in another thread.
For example, to print the count of elements processed every second:
>>>rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1>>>report = Stream.fold (Fold.drainMapM print) . rate>>>tap = Stream.parTapCount (const True) report>>>go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines because of Int overflow.
Pre-release
tapCount :: MonadAsync m => (a -> Bool) -> (Stream m Int -> m b) -> Stream m a -> Stream m a Source #
Deprecated: Please use parTapCount instead.
Same as parTapCount. Deprecated.
Imports for Examples
Imports for example snippets in this module.
>>>:m>>>import Control.Concurrent (threadDelay)>>>import qualified Streamly.Data.Array as Array>>>import qualified Streamly.Data.Fold as Fold>>>import qualified Streamly.Data.Parser as Parser>>>import qualified Streamly.Data.Stream as Stream>>>import qualified Streamly.Data.Stream.Prelude as Stream>>>import qualified Streamly.Internal.Data.Stream as Stream (delayPost, timestamped)>>>import qualified Streamly.Internal.Data.Stream.Concurrent as Stream (parListEagerFst)>>>import qualified Streamly.Internal.Data.Stream.Time as Stream>>>import Prelude hiding (concatMap, concat)>>>:{delay n = do threadDelay (n * 1000000) -- sleep for n seconds putStrLn (show n ++ " sec") -- print "n sec" return n -- IO Int :}
Timers
periodic :: MonadIO m => m a -> Double -> Stream m a Source #
Generate a stream by running an action periodically at the specified time interval.
ticks :: MonadIO m => Double -> Stream m () Source #
Generate a tick stream consisting of () elements, each tick is generated
 after the specified time delay given in seconds.
>>>ticks = Stream.periodic (return ())
ticksRate :: MonadAsync m => Rate -> Stream m () Source #
Generate a tick stream in which ticks are generated at the specified Rate.
 The rate is adaptive: the tick generation speed can be increased or
 decreased at different times to achieve the specified rate. The specific
 behavior for different styles of Rate specifications is documented under
 Rate. The effective maximum rate achieved by a stream is governed by the
 processor speed.
>>> tickStream = Stream.repeatM (return ())
>>> ticksRate r = Stream.parEval (Stream.rate (Just r)) tickStream
interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a Source #
Intersperse a monadic action into the input stream after every n
 seconds.
Definition:
>>> interject f n xs = Stream.parListEagerFst [xs, Stream.periodic f n]
Example:
>>> s = Stream.fromList "hello"
>>> input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
>>> Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
h,e,l,l,o
Trimming
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
takeInterval interval runs the stream only up to the specified time
 interval in seconds.
The interval starts when the stream is evaluated for the first time.
takeLastInterval :: Double -> Stream m a -> Stream m a Source #
Take time interval i seconds at the end of the stream.
O(n) space, where n is the number of elements taken.
Unimplemented
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
dropInterval interval drops all the stream elements that are generated
 before the specified interval in seconds has passed.
The interval begins when the stream is evaluated for the first time.
dropLastInterval :: Int -> Stream m a -> Stream m a Source #
Drop time interval i seconds at the end of the stream.
O(n) space, where n is the number of elements dropped.
Unimplemented
Chunking
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b Source #
Group the input stream into windows of n seconds each and then fold each
 group using the provided fold function.
>>> twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
>>> intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
>>> Stream.fold Fold.toList $ Stream.take 2 intervals
[...,...]
groupsOfTimeout :: MonadAsync m => Int -> Double -> Fold m a b -> Stream m a -> Stream m b Source #
Like chunksOf but if the chunk is not completed within the specified
 time interval then emit whatever we have collected till now. The chunk
 timeout is reset whenever a chunk is emitted. The granularity of the clock
 is 100 ms.
>>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>> f = Stream.fold (Fold.drainMapM print) $ Stream.groupsOfTimeout 5 1 Fold.toList s
Pre-release
Sampling
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Continuously evaluate the input stream and sample the last event in each
 time window of n seconds.
This is also known as throttle in some libraries.
>>>sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest
sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleIntervalEnd but samples at the beginning of the time window.
>>>sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a Source #
sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Sample one event at the end of each burst of events. A burst is a group of events close together in time; it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.
This is known as debounce in some libraries.
The clock granularity is 10 ms.
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a Source #
Like sampleBurstEnd but samples the event at the beginning of the burst
 instead of at the end of it.
Windowed Sessions
classifySessionsByGeneric Source #
Arguments
| :: forall m f a b. (MonadAsync m, IsMap f) | |
| => Proxy (f :: Type -> Type) | |
| -> Double | timer tick in seconds | 
| -> Bool | reset the timer when an event is received | 
| -> (Int -> m Bool) | predicate to eject sessions based on session count | 
| -> Double | session timeout in seconds | 
| -> Fold m a b | Fold to be applied to session data | 
| -> Stream m (AbsTime, (Key f, a)) | timestamp, (session key, session data) | 
| -> Stream m (Key f, b) | session key, fold result | 
Arguments
| :: (MonadAsync m, Ord k) | |
| => Double | timer tick in seconds | 
| -> Bool | reset the timer when an event is received | 
| -> (Int -> m Bool) | predicate to eject sessions based on session count | 
| -> Double | session timeout in seconds | 
| -> Fold m a b | Fold to be applied to session data | 
| -> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) | 
| -> Stream m (k, b) | session key, fold result | 
classifySessionsBy tick keepalive predicate timeout fold stream
 classifies an input event stream consisting of  (timestamp, (key,
 value)) into sessions based on the key, folding all the values
 corresponding to the same key into a session using the supplied fold.
When the fold terminates or a timeout occurs, a tuple consisting of the
 session key and the folded value is emitted in the output stream. The
 timeout is measured from the first event in the session.  If the keepalive
 option is set to True the timeout is reset to 0 whenever an event is
 received.
The timestamp in the input stream is an absolute time from some epoch,
 characterizing the time when the input event was generated.  The notion of
 current time is maintained by a monotonic event time clock using the
 timestamps seen in the input stream. The latest timestamp seen till now is
 used as the base for the current time.  When no new events are seen, a timer
 is started with a clock resolution of tick seconds. This timer is used to
 detect session timeouts in the absence of new events.
To bound the memory used, the number of sessions can be limited. If the
 ejection predicate returns True, the oldest session is ejected before
 inserting a new session.
When the stream ends any buffered sessions are ejected immediately.
If a session key is received even after a session has finished, another session is created for that key.
>>> :{
Stream.fold (Fold.drainMapM print)
    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
    $ Stream.timestamped
    $ Stream.delay 0.1
    $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
:}
(1,"abc")
(2,"abc")
(3,"abc")
Pre-release
Arguments
| :: (MonadAsync m, Ord k) | |
| => (Int -> m Bool) | predicate to eject sessions on session count | 
| -> Double | time window size | 
| -> Fold m a b | Fold to be applied to session data | 
| -> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) | 
| -> Stream m (k, b) | 
Same as classifySessionsBy with a timer tick of 1 second and keepalive
 option set to False.
>>>classifySessionsOf = Stream.classifySessionsBy 1 False
Pre-release
classifyKeepAliveSessions Source #
Arguments
| :: (MonadAsync m, Ord k) | |
| => (Int -> m Bool) | predicate to eject sessions on session count | 
| -> Double | session inactive timeout | 
| -> Fold m a b | Fold to be applied to session payload data | 
| -> Stream m (AbsTime, (k, a)) | timestamp, (session key, session data) | 
| -> Stream m (k, b) | 
Same as classifySessionsBy with a timer tick of 1 second and keepalive
 option set to True.
classifyKeepAliveSessions = classifySessionsBy 1 True
Pre-release
Buffering
Evaluate strictly using a buffer of results. When the buffer becomes full, we can block, drop the new elements, or drop the oldest element and insert the new one at the end.
bufferLatest :: Stream m a -> Stream m (Maybe a) Source #
Always produce the latest available element from the stream without any delay. The stream is continuously evaluated at the highest possible rate and only the latest element is retained for sampling.
Unimplemented
bufferLatestN :: Int -> Stream m a -> Stream m a Source #
Evaluate the input stream continuously and keep only the latest n
 elements in a ring buffer, keep discarding the older ones to make space for
 the new ones.  When the output stream is evaluated the buffer collected till
 now is streamed and it starts filling again.
Unimplemented
bufferOldestN :: Int -> Stream m a -> Stream m a Source #
Evaluate the input stream continuously and keep only the oldest n
 elements in the buffer, discard the new ones when the buffer is full.  When
 the output stream is evaluated the collected buffer is streamed and the
 buffer starts filling again.
Unimplemented
after :: (MonadIO m, MonadBaseControl IO m) => m b -> Stream m a -> Stream m a Source #
Run the action m b whenever the stream Stream m a stops normally, or
 if it is garbage collected after a partial lazy evaluation.
The semantics of the action m b are similar to the semantics of cleanup
 action in bracket.
See also after_
bracket :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a Source #
Run the alloc action m b with async exceptions disabled but keeping
 blocking operations interruptible (see mask).  Use the
 output b of the alloc action as input to the function b -> Stream m a to
 generate an output stream.
b is usually a resource under the IO monad, e.g. a file handle, that
 requires a cleanup after use. The cleanup action b -> m c, runs whenever
 (1) the stream ends normally, (2) due to a sync or async exception, or (3)
 if it gets garbage collected after a partial lazy evaluation. The exception
 is not caught, it is rethrown.
bracket only guarantees that the cleanup action runs, and it runs with
 async exceptions enabled. The action must ensure that it can successfully
 cleanup the resource in the face of sync or async exceptions.
When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.
See also: bracket_
Inhibits stream fusion
bracket3 :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a Source #
Like bracket but can use 3 separate cleanup actions depending on the
 mode of termination:
- When the stream stops normally
- When the stream is garbage collected
- When the stream encounters an exception
bracket3 before onStop onGC onException action runs action using the
 result of before. If the stream stops, onStop action is executed, if the
 stream is abandoned onGC is executed, if the stream encounters an
 exception onException is executed.
The exception is not caught, it is rethrown.
Pre-release
finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a Source #
Run the action m b whenever the stream Stream m a stops normally,
 aborts due to an exception or if it is garbage collected after a partial
 lazy evaluation.
The semantics of running the action m b are similar to the cleanup action
 semantics described in bracket.
>>>finally action xs = Stream.bracket (return ()) (const action) (const xs)
See also finally_
Inhibits stream fusion
Arguments
| :: (MonadCatch m, Exception e, Ord e) | |
| => Map e Int | map from exception to retry count | 
| -> (e -> Stream m a) | default handler for those exceptions that are not in the map | 
| -> Stream m a | |
| -> Stream m a | 
retry takes 3 arguments:
- A map m whose keys are exceptions and values are the number of times to retry the action given that the exception occurs.
- A handler han that decides how to handle an exception when the exception cannot be retried.
- The stream itself that we want to run this mechanism on.
When evaluating a stream if an exception occurs,
- The stream evaluation aborts
- The exception is looked up in m
a. If the exception exists and the mapped value is > 0 then,
i. The value is decreased by 1.
ii. The stream is resumed from where the exception was raised, retrying the action.
b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.
c. If the exception does not exist then we handle the exception using
    han.
Internal
afterD :: MonadRunInIO m => m b -> Stream m a -> Stream m a Source #
bracket3D :: (MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> m d) -> (b -> m e) -> (b -> Stream m a) -> Stream m a Source #