streamly-0.7.2: Beautiful Streaming, Concurrent and Reactive Composition

Copyright(c) 2017 Harendra Kumar
LicenseBSD3
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.Combinators

Description

 
Synopsis

Documentation

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0

maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a Source #

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Since: 0.5.0

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

Since: 0.5.0

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

Internal

printState :: MonadIO m => State Stream m a -> m () Source #