Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Deprecated: Please use Streamly.Prelude instead.
Streamly is a general purpose programming framework using the concurrent data
flow programming paradigm. It can be considered a generalization of
Haskell lists to monadic streaming with concurrent composition capability.
The serial stream type in streamly, SerialT m a, is like the list type [a]
parameterized by the monad m. For example, SerialT IO a is a moral
equivalent of [a] in the IO monad. Streams are constructed very much like
lists, except that they use nil and cons instead of '[]' and :.
> import Streamly
> import Streamly.Prelude (cons, consM)
> import qualified Streamly.Prelude as S
>
> S.toList $ 1 `cons` 2 `cons` 3 `cons` S.nil
[1,2,3]
Unlike lists, streams can be constructed from monadic effects:
> S.toList $ getLine `consM` getLine `consM` S.nil
hello
world
["hello","world"]
Streams are processed just like lists, with list like combinators, except that they are monadic and work in a streaming fashion. Here is a simple console echo program example:
> S.drain $ S.repeatM getLine & S.mapM putStrLn
SerialT Identity a is a moral equivalent of pure lists. Streamly utilizes
fusion for high performance; therefore, we can represent and process strings
as streams of Char, encode and decode the streams to/from UTF8, and
serialize them to Array Word8, obviating the need for special purpose
libraries like bytestring and text.
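As a minimal sketch of the pure-list analogy, the following processes a stream in the Identity monad with list-like combinators. It assumes streamly 0.8.x, where Streamly.Prelude exports fromList, filter, map and toList; the function name squaresOfEvens is hypothetical.

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streamly.Prelude as Stream

-- A pure stream pipeline: there are no effects, so the final list is
-- extracted from the Identity monad with runIdentity.
-- (squaresOfEvens is an illustrative name, not part of the library.)
squaresOfEvens :: [Int] -> [Int]
squaresOfEvens xs =
    runIdentity
        $ Stream.toList
        $ Stream.map (\x -> x * x)
        $ Stream.filter even
        $ Stream.fromList xs
```

For instance, squaresOfEvens [1 .. 6] evaluates to [4,16,36], just as the equivalent list expression would.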
For more details please see the Streamly.Tutorial module and the examples directory in this package.
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SerialT m a
- data WSerialT m a
- data AheadT m a
- data AsyncT m a
- data WAsyncT m a
- data ParallelT m a
- data ZipSerialM m a
- data ZipAsyncM m a
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
- serial :: IsStream t => t m a -> t m a -> t m a
- wSerial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- serially :: IsStream t => SerialT m a -> t m a
- wSerially :: IsStream t => WSerialT m a -> t m a
- asyncly :: IsStream t => AsyncT m a -> t m a
- aheadly :: IsStream t => AheadT m a -> t m a
- wAsyncly :: IsStream t => WAsyncT m a -> t m a
- parallely :: IsStream t => ParallelT m a -> t m a
- zipSerially :: IsStream t => ZipSerialM m a -> t m a
- zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type Serial = SerialT IO
- type WSerial = WSerialT IO
- type Ahead = AheadT IO
- type Async = AsyncT IO
- type WAsync = WAsyncT IO
- type Parallel = ParallelT IO
- type ZipSerial = ZipSerialM IO
- type ZipAsync = ZipAsyncM IO
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- class Semigroup a where
- type Streaming = IsStream
- runStream :: Monad m => SerialT m a -> m ()
- runStreaming :: (Monad m, IsStream t) => t m a -> m ()
- runStreamT :: Monad m => SerialT m a -> m ()
- runInterleavedT :: Monad m => WSerialT m a -> m ()
- runAsyncT :: Monad m => AsyncT m a -> m ()
- runParallelT :: Monad m => ParallelT m a -> m ()
- runZipStream :: Monad m => ZipSerialM m a -> m ()
- runZipAsync :: Monad m => ZipAsyncM m a -> m ()
- type StreamT = SerialT
- type InterleavedT = WSerialT
- type ZipStream = ZipSerialM
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
- (<=>) :: IsStream t => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Module Overview
The basic stream type is Serial. It represents a sequence of IO actions
and is a Monad. The type SerialT is a monad transformer that can
represent a sequence of actions in an arbitrary monad. The type Serial is
in fact a synonym for SerialT IO. There are a few more types similar to
SerialT; all of them represent a stream and differ only in the
Semigroup, Applicative and Monad compositions of the stream. Serial
and WSerial types compose serially whereas Async and WAsync
types compose concurrently. All these types can be freely inter-converted
using type combinators without any cost. You can freely switch to any type
of composition at any point in the program. When no type annotation or
explicit stream type combinators are used, the default stream type is
inferred as Serial.
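The effect of switching the stream type can be sketched with the Semigroup operation: the same expression s1 <> s2 appends when read as a SerialT stream and interleaves when read as a WSerialT stream. This sketch assumes streamly 0.8.x and uses the fromSerial and fromWSerial combinators from Streamly.Prelude (this module's serially and wSerially play the same role); the names s1, s2, appended and interleaved are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (IsStream)

-- Two streams that are polymorphic in the stream type.
s1, s2 :: IsStream t => t IO Int
s1 = Stream.fromList [1, 2]
s2 = Stream.fromList [3, 4]

-- Interpreted as SerialT, (<>) appends the two streams.
appended :: IO [Int]
appended = Stream.toList $ Stream.fromSerial $ s1 <> s2

-- Interpreted as WSerialT, (<>) interleaves the two streams.
interleaved :: IO [Int]
interleaved = Stream.toList $ Stream.fromWSerial $ s1 <> s2
```

Here appended should yield [1,2,3,4] and interleaved should yield [1,3,2,4], matching the serial and wSerial examples later in this page.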
This module exports stream types, instances and combinators for:
- converting between different stream types
- appending and concurrently merging streams
- Concurrency control
- Concurrent function application
- Stream rate control
This module is designed to be imported unqualified:
import Streamly
See the Streamly.Prelude module for APIs for construction, generation, elimination and transformation of streams.
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Stream transformers
A stream represents a sequence of pure or effectful actions. The
cons and consM operations and the corresponding operators .: and |:
can be used to join pure values or effectful actions in a sequence.
The effects in the stream can be executed in many different ways
depending on the type of stream. In other words, the behavior of consM
depends on the type of the stream.
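A minimal sketch of the two operators, assuming streamly 0.8.x where Streamly.Prelude exports (.:), (|:) and nil; the names pureValues and effects are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude ((.:), (|:))

-- (.:) is the operator form of cons: it prepends a pure value.
pureValues :: IO [Int]
pureValues = Stream.toList $ 1 .: 2 .: 3 .: Stream.nil

-- (|:) is the operator form of consM: it prepends an effectful action.
effects :: IO [String]
effects = Stream.toList $ pure "a" |: pure "b" |: Stream.nil
```

Because toList fixes the stream type to a serial stream, the actions run one at a time in the order written.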
There are three high level categories of streams: spatially ordered
streams, speculative streams and time ordered streams. Spatially
ordered streams, SerialT and WSerialT, execute the effects in serial
order, i.e. one at a time, and present the outputs of those effects to the
consumer in the same order. Speculative streams, AheadT, may execute
many effects concurrently but present the outputs to the consumer in the
specified spatial order. Time ordered streams, AsyncT, WAsyncT and
ParallelT, may execute many effects concurrently and present the
outputs of those effects to the consumer in time order, i.e. as soon as
the output is generated.
We described above how the effects in a sequence are executed for
different types of streams. The behavior of the Semigroup and Monad
instances follows the behavior of consM. Stream generation operations
like repeatM also execute the effects differently for different
streams, providing a concurrent generation capability when used with
stream types that execute effects concurrently. Similarly, effectful
transformation operations like mapM also execute the transforming
effects differently for different types of streams.
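The three categories can be compared by running the same mapM under different stream types. This is a sketch assuming streamly 0.8.x with the fromSerial, fromAhead and fromAsync combinators from Streamly.Prelude; the work function and its delays are hypothetical and chosen only so that later elements tend to finish first when run concurrently.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Hypothetical effect: larger inputs sleep for a shorter time.
work :: Int -> IO Int
work x = threadDelay ((4 - x) * 20000) >> pure x

-- Spatially ordered: effects run one at a time, results in input order.
viaSerial :: IO [Int]
viaSerial =
    Stream.toList $ Stream.fromSerial $ Stream.mapM work $ Stream.fromList [1, 2, 3]

-- Speculative: effects may overlap, results are still in input order.
viaAhead :: IO [Int]
viaAhead =
    Stream.toList $ Stream.fromAhead $ Stream.mapM work $ Stream.fromList [1, 2, 3]

-- Time ordered: effects may overlap, results arrive as they complete.
viaAsync :: IO [Int]
viaAsync =
    Stream.toList $ Stream.fromAsync $ Stream.mapM work $ Stream.fromList [1, 2, 3]
```

viaSerial and viaAhead should both produce [1,2,3]; viaAsync produces the same elements, but their order depends on completion times and is not guaranteed.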
Serial Streams
When a stream consumer demands an element from a serial stream constructed
as a `consM` b `consM` ... nil, the action a at the head of the stream
sequence is executed and the result is supplied to the consumer. When the
next element is demanded, the action b is executed and its result is
supplied. Thus, the effects are performed and results are consumed strictly
in a serial order. Serial streams can be considered as spatially ordered
streams as the order of execution and consumption is the same as the spatial
order in which the actions are composed by the programmer.
Serial streams enforce the side effects as well as the results of the actions to be in the same order in which the actions are added to the stream. Therefore, the semigroup operation for serial streams is not commutative:
a <> b is not the same as b <> a
There are two serial stream types, SerialT and WSerialT. The stream
evaluation of both the variants works in the same way as described above;
they differ only in the Semigroup and Monad implementations.
For SerialT streams:
(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like nested for loops:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
For WSerialT streams:
(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
Note that <> is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]
Nested monad binds behave like interleaved nested for loops:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1 in the first stream with all the nested iterations of element 2:
>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT.
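The depth wise versus breadth wise distinction can also be sketched without the Monad instances, by calling concatMapWith directly with serial or wSerial as the combining function, as in the definitions of (>>=) above. This assumes streamly 0.8.x; the names inner, depthFirst and breadthFirst are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- The inner "loop": for each x, produce the pairs (x,3) and (x,4).
inner :: Int -> SerialT IO (Int, Int)
inner x = Stream.fromList [(x, 3), (x, 4)]

-- Combining the inner streams with serial finishes each inner stream
-- before starting the next one (depth wise).
depthFirst :: IO [(Int, Int)]
depthFirst =
    Stream.toList $ Stream.concatMapWith Stream.serial inner (Stream.fromList [1, 2])

-- Combining the inner streams with wSerial interleaves them (breadth wise).
breadthFirst :: IO [(Int, Int)]
breadthFirst =
    Stream.toList $ Stream.concatMapWith Stream.wSerial inner (Stream.fromList [1, 2])
```

depthFirst should reproduce the SerialT result [(1,3),(1,4),(2,3),(2,4)] and breadthFirst the WSerialT result [(1,3),(2,3),(1,4),(2,4)] shown in the examples above.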
Since: 0.2.0 (Streamly)
Since: 0.8.0
Speculative Streams
When a stream consumer demands an element from a speculative stream
constructed as a `consM` b `consM` ... nil, the action a at the head
of the stream is executed and the output of the action is supplied to the
consumer. However, in addition to the action at the head, multiple actions
following it may also be executed concurrently and the results buffered.
When the next element is demanded it may be served from the buffer and we
may execute the next action in the sequence to keep the buffer adequately
filled. Thus, the actions are executed concurrently but results consumed in
serial order just like serial streams. consM can be used to fold an
infinite lazy container of effects, as the number of concurrent executions
is limited.
Similar to consM, the monadic stream generation (e.g. replicateM) and
transformation operations (e.g. mapM) on speculative streams can execute
multiple effects concurrently in a speculative manner.
How many effects can be executed concurrently and how many results can be
buffered are controlled by the maxThreads and maxBuffer combinators
respectively. The actual number of concurrent threads is adjusted according
to the rate at which the consumer is consuming the stream. It may even
execute actions serially in a single thread if that is enough to match the
consumer's speed.
Speculative streams enforce ordering of the results of actions in the stream but the side effects are only partially ordered. Therefore, the semigroup operation for speculative streams is not commutative from the pure outputs perspective but commutative from side effects perspective.
For AheadT streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad bind behaves like a for loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using ahead.
Since: 0.3.0 (Streamly)
Since: 0.8.0
Asynchronous Streams
Scheduling and execution: In an asynchronous stream a `consM` b `consM`
c ..., the actions a, b, and c are executed concurrently with the
consumer of the stream. The actions are scheduled for execution in the
same order as they are specified in the stream. Multiple scheduled actions
may be executed concurrently in parallel threads of execution. The
actions may be executed out of order and they may complete at arbitrary
times. Therefore, the effects of the actions may be observed out of
order.
Buffering: The results from multiple threads of execution are queued in a buffer as soon as they become available. The consumer of the stream is served from this buffer. Therefore, the consumer may observe the results to be out of order. In other words, an asynchronous stream is an unordered stream i.e. order does not matter.
Concurrency control: Threads are suspended if the maxBuffer limit is
reached, and resumed when the consumer makes space in the buffer. The
maximum number of concurrent threads depends on maxThreads. The number of
threads is increased or decreased based on the speed of the consumer.
Generation operations: Concurrent stream generation operations, e.g.
replicateM, when used in async style, schedule and execute
the stream generating actions in the manner described above. The generation
actions run concurrently; effects and results of the actions as observed by
the consumer of the stream may be out of order.
Transformation operations: Concurrent stream transformation operations,
e.g. mapM, when used in async style, schedule and
execute transformation actions in the manner described above. Transformation
actions run concurrently; effects and results of the actions may be
observed by the consumer out of order.
Variants: There are two asynchronous stream types, AsyncT and WAsyncT.
They are identical with respect to single stream evaluation behavior. Their
behaviors differ in how they combine multiple streams using Semigroup or
Monad composition. Since the order of elements does not matter in
asynchronous streams, the Semigroup operation is effectively commutative.
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the async combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the async combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: 0.8.0
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the wAsync combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [2,1] -- foreach x in stream
    Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the wAsync combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one WAsyncT output
stream and all the iterations corresponding to 2 constitute another
WAsyncT output stream and these two output streams are merged using
wAsync.
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT; ParallelT is similar except that all
iterations are strictly concurrent, while in AsyncT it depends on the
consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
Zipping Streams
ZipSerialM and ZipAsyncM provide Applicative instances for zipping the
corresponding elements of two streams together. Note that these types are
not monads.
data ZipSerialM m a
For ZipSerialM streams:
(<>) = serial
(<*>) = zipWith id
Applicative evaluates the streams being zipped serially:
>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
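As a sketch of how the zipping Applicative relates to an explicit zip, the following compares Stream.zipWith with the ZipSerialM Applicative on streams of unequal length. It assumes streamly 0.8.x, where Streamly.Prelude exports zipWith and fromZipSerial; the names zipped and viaApplicative are hypothetical.

```haskell
import qualified Streamly.Prelude as Stream

-- Explicit zip of two serial streams; stops when the shorter one ends.
zipped :: IO [(Int, Char)]
zipped =
    Stream.toList
        $ Stream.zipWith (,) (Stream.fromList [1, 2, 3]) (Stream.fromList "ab")

-- The same zip expressed through the ZipSerialM Applicative instance.
viaApplicative :: IO [(Int, Char)]
viaApplicative =
    Stream.toList
        $ Stream.fromZipSerial
        $ (,) <$> Stream.fromList [1, 2, 3] <*> Stream.fromList "ab"
```

Both actions should produce [(1,'a'),(2,'b')].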
Since: 0.2.0 (Streamly)
Since: 0.8.0
For ZipAsyncM streams:
(<>) = serial
(<*>) = zipAsyncWith id
Applicative evaluates the streams being zipped concurrently. The following would take half the time that it would take in serial zipping:
>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
- IsStream ZipAsyncM (defined in Streamly.Internal.Data.Stream.IsStream.Type)
  - toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> Stream m a
  - fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipAsyncM m a
  - consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a
  - (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a
- MonadAsync m => Applicative (ZipAsyncM m) (defined in Streamly.Internal.Data.Stream.ZipAsync)
- Monad m => Functor (ZipAsyncM m)
- Monoid (ZipAsyncM m a)
- Semigroup (ZipAsyncM m a)
Parallel Function Application
Stream processing functions can be composed in a chain using function
application with or without the $ operator, or with the reverse function
application operator &. Streamly provides concurrent versions of these
operators applying stream processing functions such that each stage of the
stream can run in parallel. The operators start with a |; we can read |$
as "parallel dollar" to remember that | comes before $.
Imports for the code snippets below:
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0
Parallel transform application operator; applies a stream transformation
function t m a -> t m b to a stream t m a concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. |$ is just like the regular function application
operator $ except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
>>> :{
Stream.drain $
    Stream.mapM (\x -> threadDelay 1000000 >> print x)
        |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a concurrently. The input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The . at the end of the operator is a mnemonic for termination of the
stream.
In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.
>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
    |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1
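Going by their signatures, |& and |&. take the stream as the first argument and the transformation or fold as the second, so a concurrent pipeline can be written left to right. The following is a minimal sketch assuming streamly 0.8.x, where Streamly.Prelude exports both operators along with mapM and sum; the name doubledSum is hypothetical.

```haskell
import qualified Streamly.Prelude as Stream
import Streamly.Prelude ((|&), (|&.))

-- A three stage pipeline: generate, transform, fold. Each stage may run
-- in its own thread, connected to the next stage by a buffer.
doubledSum :: IO Int
doubledSum =
    Stream.fromList [1 .. 5]
        |& Stream.mapM (\x -> pure (x * 2))
        |&. Stream.sum
```

The stream here is serial, so only the stages overlap in time; the elements themselves are still processed in order, and doubledSum should return 30.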
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
Makes a stream asynchronous: triggers the computation and returns a stream
in the underlying monad representing the output generated by the original
computation. The returned action is exhaustible and must be drained once. If
it is not drained fully, a thread may remain blocked forever; once exhausted,
it always returns empty.
Since: 0.2.0
Merging Streams
The Semigroup operation <> of each stream type combines two streams in a
type specific manner. This section provides polymorphic versions of <>
which can be used to combine two streams in a predetermined way irrespective
of the type.
serial :: IsStream t => t m a -> t m a -> t m a infixr 6
Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]
This operation can be used to fold an infinite lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6
Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.
>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]
Note, for singleton streams wSerial and serial are identical.
Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Appends two streams. Both streams may be evaluated concurrently, but the outputs are used in the same order as the corresponding actions in the original streams; side effects happen in the order in which the streams are evaluated:
>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]
With 2 threads, only two can be scheduled at a time. When one of those finishes, the third one gets scheduled:
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]
Only streams are scheduled for ahead evaluation; how actions within a stream
are evaluated depends on the stream type. If it is a concurrent stream they
will be evaluated concurrently. It may not make much sense combining serial
streams using ahead.
ahead can be safely used to fold an infinite lazy container of streams.
Since: 0.3.0 (Streamly)
Since: 0.8.0
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Merges two streams. Both streams may be evaluated concurrently, and outputs from both are used as they arrive:
>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]
With 2 threads, only two can be scheduled at a time. When one of those finishes, the third one gets scheduled:
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]
With a single thread, it becomes serial:
>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]
Only streams are scheduled for async evaluation; how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.
In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:
>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]
If total threads are 2, the third stream is scheduled only after one of the first two has finished:
stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]
Thus async goes deep in the first few streams rather than going wide in all
streams. It prefers to evaluate the leftmost streams as much as possible.
Because of this behavior, async can be safely used to fold an infinite
lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
For singleton streams, wAsync is the same as async. See async for
singleton stream behavior. For multi-element streams, while async is left
biased, i.e. it tries to evaluate the left side stream as much as possible,
wAsync tries to schedule them both fairly. In other words, async goes
deep while wAsync goes wide. However, outputs are always used as they
arrive.
With a single thread, async starts behaving like serial while wAsync
starts behaving like wSerial.
>>> import Streamly.Prelude (async, wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]
With two threads available, and combining three streams:
>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]
This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.
Note that WSerialT and single threaded WAsyncT both interleave streams
but the exact scheduling is slightly different in both cases.
Since: 0.2.0 (Streamly)
Since: 0.8.0
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Like async except that the execution is much more
strict. There is no limit on the number of threads. While
async may not schedule a stream if there is no demand
from the consumer, parallel always evaluates both the streams immediately.
The only limit that applies to parallel is maxBuffer.
Evaluation may block if the output buffer becomes full.
>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]
parallel guarantees that all the streams are scheduled for execution
immediately; therefore, we can do things like starting timers inside the
streams and rely on the fact that all timers were started at the same
time.
Unlike async, this operation cannot be used to fold an infinite lazy
container of streams, because it schedules all the streams strictly
concurrently.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Concurrency Control
These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.
Pitfall: Remember that maxBuffer in the following example applies to
mapM and any other combinators that may follow it, and it does not apply
to the combinators before it:
... $ maxBuffer 10 $ S.mapM ... ...
If we use & instead of $ the situation is reversed. In the following
example, maxBuffer does not apply to mapM; it applies to the combinators
that come before it, because those are the arguments to maxBuffer:
... & maxBuffer 10 & S.mapM ... ...
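The scoping rule can be sketched by writing the same pipeline both ways so that the limits cover mapM in each case. This assumes streamly 0.8.x with fromAhead, maxThreads and maxBuffer exported from Streamly.Prelude; the step function and the names withDollar and withAmpersand are hypothetical.

```haskell
import Data.Function ((&))
import qualified Streamly.Prelude as Stream

-- Hypothetical per-element effect.
step :: Int -> IO Int
step x = pure (x + 1)

-- With ($), the limits apply to their argument: everything to the right,
-- which here includes mapM.
withDollar :: IO [Int]
withDollar =
    Stream.toList
        $ Stream.fromAhead
        $ Stream.maxThreads 2
        $ Stream.maxBuffer 10
        $ Stream.mapM step
        $ Stream.fromList [1 .. 5]

-- With (&), the limits still apply to their argument, which is now
-- everything to the left, so they must come after mapM to cover it.
withAmpersand :: IO [Int]
withAmpersand =
    Stream.fromList [1 .. 5]
        & Stream.mapM step
        & Stream.maxBuffer 10
        & Stream.maxThreads 2
        & Stream.fromAhead
        & Stream.toList
```

Both pipelines use an ahead stream, so the results keep the input order and each should produce [2,3,4,5,6].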
maxThreads :: IsStream t => Int -> t m a -> t m a
Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default; a negative value means
there is no limit. The default value is 1500. maxThreads does not affect
ParallelT streams as they can use an unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0 (Streamly)
Since: 0.8.0
maxBuffer :: IsStream t => Int -> t m a -> t m a
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams, or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.
Since: 0.4.0 (Streamly)
Since: 0.8.0
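The following sketch shows one way to keep both limits finite when the source is infinite. It assumes the 0.8.x Streamly.Prelude API; the limits 8 and 16 and the name firstTen are arbitrary choices for illustration.

```haskell
import qualified Streamly.Prelude as S

-- Evaluate an infinite stream concurrently, but with finite limits on both
-- the worker threads and the result buffer, and consume only ten results.
firstTen :: IO [Int]
firstTen =
    S.toList
        $ S.take 10
        $ S.fromAhead
        $ S.maxThreads 8
        $ S.maxBuffer 16
        $ S.mapM (pure . (+ 1))
        $ S.fromList [1 ..] -- infinite source

main :: IO ()
main = firstTen >>= print
```

Because the buffer is bounded, workers stop being dispatched once 16 results are waiting, so the infinite source cannot run arbitrarily far ahead of the consumer.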
Rate Limiting
Specifies the stream yield rate in yields per second (Hertz).
We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value.
If the rateBuffer is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
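A Rate value can be built with record syntax from the four fields described above and then applied with the rate combinator. This is a sketch assuming the 0.8.x Streamly.Prelude API exports Rate with its fields; the numbers and the names tenHz and ticks are illustrative only.

```haskell
import qualified Streamly.Prelude as S

-- Aim for 10 yields per second, allowing the instantaneous rate to move
-- between 5 and 20 while recovering a gap of at most 100 yields.
tenHz :: S.Rate
tenHz = S.Rate
    { S.rateLow = 5
    , S.rateGoal = 10
    , S.rateHigh = 20
    , S.rateBuffer = 100
    }

-- Take five elements from a rate-limited concurrent stream.
ticks :: IO [Int]
ticks =
    S.toList $ S.take 5 $ S.fromAsync $ S.rate (Just tenHz) $ S.repeatM (pure 1)

main :: IO ()
main = ticks >>= print
```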
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:
- The maxThreads limit
- The maxBuffer limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0 (Streamly)
Since: 0.8.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.
Since: 0.5.0 (Streamly)
Since: 0.8.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times. The upper limit is double the specified rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times. The lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0 (Streamly)
Since: 0.8.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
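The four convenience combinators differ only in the Rate value they pass to rate. The helpers below restate the "Same as" equations from the documentation above as plain functions so the bounds can be compared side by side; the helper names are invented here, and the sketch assumes Rate and its field accessors are exported by Streamly.Prelude 0.8.x.

```haskell
import qualified Streamly.Prelude as S

-- Rate specifications equivalent to avgRate, minRate, maxRate and
-- constRate, taken from the "Same as" lines in their documentation.
avgSpec, minSpec, maxSpec, constSpec :: Double -> S.Rate
avgSpec r = S.Rate (r / 2) r (2 * r) maxBound
minSpec r = S.Rate r r (2 * r) maxBound
maxSpec r = S.Rate (r / 2) r r maxBound
constSpec r = S.Rate r r r 0

-- Show the (low, goal, high) bounds of a specification.
bounds :: S.Rate -> (Double, Double, Double)
bounds spec = (S.rateLow spec, S.rateGoal spec, S.rateHigh spec)

main :: IO ()
main = mapM_ (print . bounds) [avgSpec 10, minSpec 10, maxSpec 10, constSpec 10]
```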
Stream Type Adapters
You may want to use different stream composition styles at different points in your program. Stream types can be freely converted or adapted from one type to another. The IsStream type class facilitates type conversion of one stream type to another. It is not used directly, instead the type combinators provided below are used for conversions.
To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic type (e.g. SerialT) use the adapt combinator. To give polymorphic code a specific interpretation or to adapt a specific type to a polymorphic type use the type specific combinators e.g. fromAsync or fromWSerial. You cannot adapt polymorphic code to polymorphic code, as the compiler would not know which specific type you are converting from or to. If you see an ambiguous type variable error then most likely you are using adapt unnecessarily on polymorphic code.
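The distinction between the type specific combinators and adapt can be sketched as follows. The example assumes that Streamly.Prelude 0.8.x exports IsStream, MonadAsync and AheadT alongside fromAhead and adapt; the stream squares and the other names are hypothetical.

```haskell
import Streamly.Prelude (AheadT, IsStream, MonadAsync)
import qualified Streamly.Prelude as S

-- Polymorphic code: usable as any stream type.
squares :: (IsStream t, MonadAsync m) => t m Int
squares = S.mapM (\x -> pure (x * x)) $ S.fromList [1 .. 5]

-- A type specific combinator fixes the interpretation of polymorphic code.
viaFromAhead :: IO [Int]
viaFromAhead = S.toList $ S.fromAhead squares

-- A monomorphic stream: its type is already fixed to AheadT.
aheadSquares :: AheadT IO Int
aheadSquares = squares

-- adapt converts one specific type (AheadT) to another (the SerialT
-- expected by toList).
viaAdapt :: IO [Int]
viaAdapt = S.toList $ S.adapt aheadSquares

main :: IO ()
main = do
    viaFromAhead >>= print
    viaAdapt >>= print
```

Writing S.toList $ S.adapt squares instead would leave the source type undetermined, which is the ambiguous type variable situation described above.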
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #
Class of types that can represent a stream of elements of some type a in some monad m.
Since: 0.2.0 (Streamly)
Since: 0.8.0
zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
IO Streams
type ZipSerial = ZipSerialM IO Source #
Folding Containers of Streams
These are variants of standard Foldable fold functions that use a polymorphic stream sum operation (e.g. async or wSerial) to fold a finite container of streams. Note that these are just special cases of the more general concatMapWith operation.
foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #
Same as concatFoldableWith
Since: 0.1.0
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
Same as concatMapFoldableWith
Since: 0.1.0
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Same as concatForFoldableWith
Since: 0.1.0
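Since foldMapWith is documented as the same as concatMapFoldableWith, a migration can be sketched with the newer name. This assumes the 0.8.x Streamly.Prelude API (concatMapFoldableWith, serial); the function expand and its input list are made up for the example.

```haskell
import qualified Streamly.Prelude as S

-- Map each element of a list to a small stream and combine the streams
-- with the serial sum operation, which appends them one after another.
expand :: IO [Int]
expand =
    S.toList
        $ S.concatMapFoldableWith S.serial (\n -> S.fromList [n, n * 10]) [1, 2, 3]

main :: IO ()
main = expand >>= print
```

Swapping serial for another sum operation such as async changes how the streams are scheduled, and with it possibly the order of the results, without changing the rest of the code.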
Re-exports
class Semigroup a where #
The class of semigroups (types with an associative binary operation).
Instances should satisfy the following:
- Associativity: x <> (y <> z) = (x <> y) <> z
Since: base-4.9.0.0
(<>) :: a -> a -> a infixr 6 #
An associative operation.
>>> [1,2,3] <> [4,5,6]
[1,2,3,4,5,6]
sconcat :: NonEmpty a -> a #
Reduce a non-empty list with <>
The default definition should be sufficient, but this can be overridden for efficiency.
>>> import Data.List.NonEmpty (NonEmpty (..))
>>> sconcat $ "Hello" :| [" ", "Haskell", "!"]
"Hello Haskell!"
stimes :: Integral b => b -> a -> a #
Repeat a value n times.
Given that this works on a Semigroup it is allowed to fail if you request 0 or fewer repetitions, and the default definition will do so.
By making this a member of the class, idempotent semigroups and monoids can upgrade this to execute in \(\mathcal{O}(1)\) by picking stimes = stimesIdempotent or stimes = stimesIdempotentMonoid respectively.
>>> stimes 4 [1]
[1,1,1,1]
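To make the stimesIdempotent remark concrete, here is a small sketch using only base. The MaxInt type is hypothetical; taking the maximum is idempotent, so repeating a value any positive number of times gives the value back.

```haskell
import Data.List.NonEmpty (NonEmpty (..))
import Data.Semigroup (Semigroup (..), stimesIdempotent)

-- Hypothetical semigroup whose operation takes the larger of two Ints.
newtype MaxInt = MaxInt Int
    deriving (Eq, Show)

instance Semigroup MaxInt where
    MaxInt a <> MaxInt b = MaxInt (max a b)
    -- max is idempotent (x <> x == x), so stimes needs no repeated combining.
    stimes = stimesIdempotent

main :: IO ()
main = do
    print (sconcat (MaxInt 1 :| [MaxInt 5, MaxInt 2]))
    print (stimes (1000000 :: Int) (MaxInt 3))
```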
Instances
Semigroup All | Since: base-4.9.0.0 |
Semigroup Any | Since: base-4.9.0.0 |
Semigroup Void | Since: base-4.9.0.0 |
Semigroup ByteString | |
Defined in Data.ByteString.Internal (<>) :: ByteString -> ByteString -> ByteString # sconcat :: NonEmpty ByteString -> ByteString # stimes :: Integral b => b -> ByteString -> ByteString # | |
Semigroup ByteString | |
Defined in Data.ByteString.Lazy.Internal (<>) :: ByteString -> ByteString -> ByteString # sconcat :: NonEmpty ByteString -> ByteString # stimes :: Integral b => b -> ByteString -> ByteString # | |
Semigroup IntSet | Since: containers-0.5.7 |
Semigroup Ordering | Since: base-4.9.0.0 |
Semigroup ByteArray | |
Semigroup () | Since: base-4.9.0.0 |
Bits a => Semigroup (And a) | Since: base-4.16 |
FiniteBits a => Semigroup (Iff a) | This constraint is arguably
too strong. However, as some types (such as Since: base-4.16 |
Bits a => Semigroup (Ior a) | Since: base-4.16 |
Bits a => Semigroup (Xor a) | Since: base-4.16 |
Semigroup a => Semigroup (Identity a) | Since: base-4.9.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Down a) | Since: base-4.11.0.0 |
Semigroup (First a) | Since: base-4.9.0.0 |
Semigroup (Last a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Max a) | Since: base-4.9.0.0 |
Ord a => Semigroup (Min a) | Since: base-4.9.0.0 |
Monoid m => Semigroup (WrappedMonoid m) | Since: base-4.9.0.0 |
Defined in Data.Semigroup (<>) :: WrappedMonoid m -> WrappedMonoid m -> WrappedMonoid m # sconcat :: NonEmpty (WrappedMonoid m) -> WrappedMonoid m # stimes :: Integral b => b -> WrappedMonoid m -> WrappedMonoid m # | |
Semigroup a => Semigroup (Dual a) | Since: base-4.9.0.0 |
Semigroup (Endo a) | Since: base-4.9.0.0 |
Num a => Semigroup (Product a) | Since: base-4.9.0.0 |
Num a => Semigroup (Sum a) | Since: base-4.9.0.0 |
Semigroup p => Semigroup (Par1 p) | Since: base-4.12.0.0 |
Semigroup (IntMap a) | Since: containers-0.5.7 |
Semigroup (MergeSet a) | |
Ord a => Semigroup (Set a) | Since: containers-0.5.7 |
Semigroup a => Semigroup (IO a) | Since: base-4.10.0.0 |
Semigroup (Heap a) | |
Semigroup (Array a) | Since: primitive-0.6.3.0 |
Storable a => Semigroup (Array a) Source # | |
Prim a => Semigroup (Array a) Source # | |
Prim a => Semigroup (Array a) Source # | |
Semigroup (List a) Source # | |
Semigroup (ZipList a) Source # | |
Semigroup (SmallArray a) Source # | Since: 0.6.3.0 |
Defined in Streamly.Internal.Data.SmallArray.Type (<>) :: SmallArray a -> SmallArray a -> SmallArray a # sconcat :: NonEmpty (SmallArray a) -> SmallArray a # stimes :: Integral b => b -> SmallArray a -> SmallArray a # | |
Semigroup a => Semigroup (Q a) | Since: template-haskell-2.17.0.0 |
Semigroup (NonEmpty a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Maybe a) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (a) | Since: base-4.15 |
Semigroup [a] | Since: base-4.9.0.0 |
Semigroup (Either a b) | Since: base-4.9.0.0 |
Semigroup (Proxy s) | Since: base-4.9.0.0 |
Semigroup (U1 p) | Since: base-4.12.0.0 |
Semigroup (V1 p) | Since: base-4.12.0.0 |
Semigroup a => Semigroup (ST s a) | Since: base-4.11.0.0 |
Ord k => Semigroup (Map k v) | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
Semigroup (SerialT m a) Source # | |
Semigroup (WSerialT m a) Source # | |
Semigroup (Stream m a) Source # | |
Semigroup (ZipSerialM m a) Source # | |
Defined in Streamly.Internal.Data.Stream.Zip (<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a # sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a # stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a # | |
Semigroup (ZipAsyncM m a) Source # | |
Semigroup b => Semigroup (a -> b) | Since: base-4.9.0.0 |
(Semigroup a, Semigroup b) => Semigroup (a, b) | Since: base-4.9.0.0 |
Semigroup a => Semigroup (Const a b) | Since: base-4.9.0.0 |
(Applicative f, Semigroup a) => Semigroup (Ap f a) | Since: base-4.12.0.0 |
Alternative f => Semigroup (Alt f a) | Since: base-4.9.0.0 |
Semigroup (f p) => Semigroup (Rec1 f p) | Since: base-4.12.0.0 |
(Semigroup b, Monad m) => Semigroup (Tee m a b) Source # | |
Monad m => Semigroup (Pipe m a b) Source # | |
(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c) | Since: base-4.9.0.0 |
(Semigroup (f a), Semigroup (g a)) => Semigroup (Product f g a) | Since: base-4.16.0.0 |
(Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p) | Since: base-4.12.0.0 |
Semigroup c => Semigroup (K1 i c p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d) | Since: base-4.9.0.0 |
Semigroup (f (g a)) => Semigroup (Compose f g a) | Since: base-4.16.0.0 |
Semigroup (f (g p)) => Semigroup ((f :.: g) p) | Since: base-4.12.0.0 |
Semigroup (f p) => Semigroup (M1 i c f p) | Since: base-4.12.0.0 |
(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e) | Since: base-4.9.0.0 |
Deprecated
runStreamT :: Monad m => SerialT m a -> m () Source #
Same as runStream.
Since: 0.1.0
runInterleavedT :: Monad m => WSerialT m a -> m () Source #
Same as drain . fromWSerial.
Since: 0.1.0
runParallelT :: Monad m => ParallelT m a -> m () Source #
Same as drain . fromParallel.
Since: 0.1.0
runZipStream :: Monad m => ZipSerialM m a -> m () Source #
Same as drain . zipping.
Since: 0.1.0
runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #
Same as drain . zippingAsync.
Since: 0.1.0
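When migrating away from the deprecated runners, the "Same as" equations above can be used directly. The sketch below does this for runInterleavedT, assuming Streamly.Prelude 0.8.x exports WSerialT, drain and fromWSerial; the name drainInterleaved is invented for the example.

```haskell
import Streamly.Prelude (WSerialT)
import qualified Streamly.Prelude as S

-- Replacement for the deprecated runInterleavedT, following the
-- "Same as drain . fromWSerial" note in its documentation.
drainInterleaved :: WSerialT IO a -> IO ()
drainInterleaved = S.drain . S.fromWSerial

main :: IO ()
main = drainInterleaved $ S.mapM print $ S.fromList [1, 2, 3 :: Int]
```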
type InterleavedT = WSerialT Source #
Deprecated: Please use WSerialT instead.
Since: 0.1.0
type ZipStream = ZipSerialM Source #
Deprecated: Please use ZipSerialM instead.
Since: 0.1.0
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #