| Copyright | (c) 2017 Harendra Kumar |
|---|---|
| License | BSD3 |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | None |
| Language | Haskell2010 |
Streamly
Description
Streamly is a general purpose programming framework using the concurrent data
flow programming paradigm. It can be considered as a generalization of
Haskell lists to monadic streaming with concurrent composition capability.
The serial stream type in streamly SerialT m a is like the list type [a]
parameterized by the monad m. For example, SerialT IO a is a moral
equivalent of [a] in the IO monad. Streams are constructed very much like
lists, except that they use nil and cons instead of '[]' and :.
> import Streamly
> import Streamly.Prelude (cons, consM)
> import qualified Streamly.Prelude as S
>
> S.toList $ 1 `cons` 2 `cons` 3 `cons` S.nil
[1,2,3]
Unlike lists, streams can be constructed from monadic effects:
> S.toList $ getLine `consM` getLine `consM` S.nil
hello
world
["hello","world"]
Streams are processed just like lists, with list like combinators, except that they are monadic and work in a streaming fashion. Here is a simple console echo program example:
> S.drain $ S.repeatM getLine & S.mapM putStrLn
SerialT Identity a is a moral equivalent of pure lists. Streamly utilizes
fusion for high performance, therefore, we can represent and process strings
as streams of Char, encode and decode the streams to/from UTF8 and
serialize them to Array Word8 obviating the need for special purpose
libraries like bytestring and text.
For more details please see the Streamly.Tutorial module and the examples directory in this package.
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SerialT m a
- data WSerialT m a
- data AheadT m a
- data AsyncT m a
- data WAsyncT m a
- data ParallelT m a
- data ZipSerialM m a
- data ZipAsyncM m a
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
- serial :: IsStream t => t m a -> t m a -> t m a
- wSerial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- serially :: IsStream t => SerialT m a -> t m a
- wSerially :: IsStream t => WSerialT m a -> t m a
- asyncly :: IsStream t => AsyncT m a -> t m a
- aheadly :: IsStream t => AheadT m a -> t m a
- wAsyncly :: IsStream t => WAsyncT m a -> t m a
- parallely :: IsStream t => ParallelT m a -> t m a
- zipSerially :: IsStream t => ZipSerialM m a -> t m a
- zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type Serial = SerialT IO
- type WSerial = WSerialT IO
- type Ahead = AheadT IO
- type Async = AsyncT IO
- type WAsync = WAsyncT IO
- type Parallel = ParallelT IO
- type ZipSerial = ZipSerialM IO
- type ZipAsync = ZipAsyncM IO
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- class Semigroup a where
- type Streaming = IsStream
- runStream :: Monad m => SerialT m a -> m ()
- runStreaming :: (Monad m, IsStream t) => t m a -> m ()
- runStreamT :: Monad m => SerialT m a -> m ()
- runInterleavedT :: Monad m => WSerialT m a -> m ()
- runAsyncT :: Monad m => AsyncT m a -> m ()
- runParallelT :: Monad m => ParallelT m a -> m ()
- runZipStream :: Monad m => ZipSerialM m a -> m ()
- runZipAsync :: Monad m => ZipAsyncM m a -> m ()
- type StreamT = SerialT
- type InterleavedT = WSerialT
- type ZipStream = ZipSerialM
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
- (<=>) :: IsStream t => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Module Overview
The basic stream type is Serial; it represents a sequence of IO actions,
and is a Monad. The type SerialT is a monad transformer that can
represent a sequence of actions in an arbitrary monad. The type Serial is
in fact a synonym for SerialT IO. There are a few more types similar to
SerialT, all of them represent a stream and differ only in the
Semigroup, Applicative and Monad compositions of the stream. Serial
and WSerial types compose serially whereas Async and WAsync
types compose concurrently. All these types can be freely inter-converted
using type combinators without any cost. You can freely switch to any type
of composition at any point in the program. When no type annotation or
explicit stream type combinators are used, the default stream type is
inferred as Serial.
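As a minimal sketch of switching composition styles within one expression (assuming the 0.7-era API listed in the synopsis), the snippet below interleaves two streams using the WSerialT semigroup, converts the result with wSerially, and then appends a third stream using the default serial composition. The expected output follows from the append and interleave semantics described for SerialT and WSerialT.

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- The parenthesized part is a WSerialT stream, so its (<>) interleaves.
    -- wSerially converts it to the stream type demanded by the context
    -- (SerialT here), whose (<>) appends.
    xs <- S.toList $
        wSerially (S.fromList [1, 2] <> S.fromList [3, 4]) <> S.fromList [5, 6]
    print (xs :: [Int])   -- [1,3,2,4,5,6]
```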
This module exports stream types, instances and combinators for:
- converting between different stream types
- appending and concurrently merging streams
- concurrency control
- concurrent function application
- stream rate control
This module is designed to be imported unqualified:
import Streamly
See the Streamly.Prelude module for APIs for construction, generation, elimination and transformation of streams.
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: 0.1.0
Stream transformers
A stream represents a sequence of pure or effectful actions. The
cons and consM operations and the corresponding operators .: and
|: can be used to join pure values or effectful actions in a sequence.
The effects in the stream can be executed in many different ways
depending on the type of stream. In other words, the behavior of consM
depends on the type of the stream.
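A small sketch of the two operators, assuming .: and |: are imported from Streamly.Prelude as in the 0.7-era API. The labels printed by the hypothetical `step` helper are only there to make the order of effects visible; because the stream is consumed as a serial stream, the effects run one at a time in the written order.

```haskell
import Streamly.Prelude ((.:), (|:))
import qualified Streamly.Prelude as S

-- Hypothetical helper: an effectful action that announces itself
-- before returning its value.
step :: String -> Int -> IO Int
step label n = putStrLn label >> return n

main :: IO ()
main = do
    -- Pure values joined with (.:), the operator form of cons.
    xs <- S.toList $ 1 .: 2 .: 3 .: S.nil
    print (xs :: [Int])           -- [1,2,3]
    -- Effectful actions joined with (|:), the operator form of consM.
    ys <- S.toList $ step "first" 1 |: step "second" 2 |: S.nil
    print ys                      -- [1,2], after "first" and "second"
```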
There are three high level categories of streams, spatially ordered
streams, speculative streams and time ordered streams. Spatially
ordered streams, SerialT and WSerialT, execute the effects in serial
order i.e. one at a time and present the outputs of those effects to the
consumer in the same order. Speculative streams, AheadT, may execute
many effects concurrently but present the outputs to the consumer in the
specified spatial order. Time ordered streams, AsyncT, WAsyncT and
ParallelT, may execute many effects concurrently and present the
outputs of those effects to the consumer in time order i.e. as soon as
the output is generated.
We described above how the effects in a sequence are executed for
different types of streams. The behavior of the Semigroup and Monad
instances follows the behavior of consM. Stream generation operations
like repeatM also execute the effects differently for different
streams, providing a concurrent generation capability when used with
stream types that execute effects concurrently. Similarly, effectful
transformation operations like mapM also execute the transforming
effects differently for different types of streams.
Serial Streams
When a stream consumer demands an element from a serial stream constructed
as a `consM` b `consM` ... nil, the action a at the head of the stream
sequence is executed and the result is supplied to the consumer. When the
next element is demanded, the action b is executed and its result is
supplied. Thus, the effects are performed and results are consumed strictly
in a serial order. Serial streams can be considered as spatially ordered
streams as the order of execution and consumption is the same as the spatial
order in which the actions are composed by the programmer.
Serial streams enforce the side effects as well as the results of the actions to be in the same order in which the actions are added to the stream. Therefore, the semigroup operation for serial streams is not commutative:
a <> b is not the same as b <> a
There are two serial stream types SerialT and WSerialT. The stream
evaluation of both the variants works in the same way as described above,
they differ only in the Semigroup and Monad implementations.
The Semigroup operation for SerialT behaves like a regular append
operation. Therefore, when a <> b is evaluated, stream a is evaluated
first until it exhausts and then stream b is evaluated. In other words,
the elements of stream b are appended to the elements of stream a. This
operation can be used to fold an infinite lazy container of streams.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . serially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
The Monad instance runs the monadic continuation for each
element of the stream, serially.
main = S.drain . serially $ do
    x <- return 1 <> return 2
    S.yieldM $ print x
1
2
SerialT nests streams serially in a depth first manner.
main = S.drain . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(1,4)
(2,3)
(2,4)
We call the monadic code being run for each element of the stream a monadic
continuation. In the imperative paradigm we can think of this composition as
nested for loops and the monadic continuation is the body of the loop. The
loop iterates for all elements of the stream.
Note that the behavior and semantics of SerialT, including Semigroup
and Monad instances are exactly like Haskell lists except that SerialT
can contain effectful actions while lists are pure.
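For comparison, here is the same append and nested-loop behavior written with plain Haskell lists, using only base. This is an illustration of the list analogy, not streamly code; the outputs match the SerialT examples above.

```haskell
-- The list monad nests "loops" depth first, just like SerialT.
pairs :: [(Int, Int)]
pairs = do
    x <- [1, 2]
    y <- [3, 4]
    return (x, y)

main :: IO ()
main = do
    -- List (<>) appends, like the SerialT Semigroup.
    print ([1, 2] <> [3, 4] :: [Int])   -- [1,2,3,4]
    print pairs                         -- [(1,3),(1,4),(2,3),(2,4)]
```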
In the code above, the serially combinator can be omitted as the default
stream type is SerialT.
Since: 0.2.0
The Semigroup operation for WSerialT interleaves the elements from the
two streams. Therefore, when a <> b is evaluated, stream a is evaluated
first to produce the first element of the combined stream and then stream
b is evaluated to produce the next element of the combined stream, and
then we go back to evaluating stream a and so on. In other words, the
elements of stream a are interleaved with the elements of stream b.
Note that evaluation of a <> b <> c does not schedule a, b and c
with equal priority. This expression is equivalent to a <> (b <> c),
therefore, it fairly interleaves a with the result of b <> c. For
example, S.fromList [1,2] <> S.fromList [3,4] <> S.fromList [5,6] ::
WSerialT Identity Int would result in [1,3,2,5,4,6]. In other words, the
leftmost stream gets the same scheduling priority as the rest of the
streams taken together. The same is true for each subexpression on the right.
Note that this operation cannot be used to fold a container of infinite streams as the state that it needs to maintain is proportional to the number of streams.
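The scheduling priority described above can be modelled with plain lists. The `interleave` function below is a hypothetical pure stand-in for the WSerialT `<>` (it is not the library's implementation); it shows why the right-associated expression gives the leftmost stream half of the turns, and how explicit left association changes the result.

```haskell
-- A pure list model of fair two-way interleaving: take one element from
-- the first list, then swap the roles of the two lists.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x : xs) ys = x : interleave ys xs

main :: IO ()
main = do
    let a = [1, 2] :: [Int]
        b = [3, 4]
        c = [5, 6]
    -- Right associated, like a <> b <> c: 'a' gets every other slot.
    print (a `interleave` (b `interleave` c))   -- [1,3,2,5,4,6]
    -- Left associated: now 'c' gets every other slot instead.
    print ((a `interleave` b) `interleave` c)   -- [1,5,3,6,2,4]
```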
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . wSerially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
Similarly, the Monad instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth first manner.
main = S.drain . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(2,3)
(1,4)
(2,4)
Since: 0.2.0
Speculative Streams
When a stream consumer demands an element from a speculative stream
constructed as a `consM` b `consM` ... nil, the action a at the head
of the stream is executed and the output of the action is supplied to the
consumer. However, in addition to the action at the head multiple actions
following it may also be executed concurrently and the results buffered.
When the next element is demanded it may be served from the buffer and we
may execute the next action in the sequence to keep the buffer adequately
filled. Thus, the actions are executed concurrently but results consumed in
serial order just like serial streams. consM can be used to fold an
infinite lazy container of effects, as the number of concurrent executions
is limited.
Similar to consM, the monadic stream generation (e.g. replicateM) and
transformation operations (e.g. mapM) on speculative streams can execute
multiple effects concurrently in a speculative manner.
How many effects can be executed concurrently and how many results can be
buffered are controlled by maxThreads and maxBuffer combinators
respectively. The actual number of concurrent threads is adjusted according
to the rate at which the consumer is consuming the stream. It may even
execute actions serially in a single thread if that is enough to match the
consumer's speed.
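A minimal sketch of these controls on a speculative stream, assuming the 0.7-era combinators listed in the synopsis. `slowDouble` is a hypothetical stand-in for real work. The limits chosen here (2 threads, 4 buffered results) are arbitrary; whatever the degree of concurrency, an ahead-style stream presents results in the original order.

```haskell
import Control.Concurrent (threadDelay)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical slow action: about 100 ms per element.
slowDouble :: Int -> IO Int
slowDouble n = threadDelay 100000 >> return (n * 2)

main :: IO ()
main = do
    xs <- S.toList
        . aheadly          -- evaluate the stream speculatively
        . maxThreads 2     -- at most two concurrent effects
        . maxBuffer 4      -- at most four buffered results
        $ S.mapM slowDouble (S.fromList [1 .. 5])
    print xs               -- [2,4,6,8,10]
```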
Speculative streams enforce ordering of the results of actions in the stream but the side effects are only partially ordered. Therefore, the semigroup operation for speculative streams is not commutative from the pure outputs perspective but commutative from side effects perspective.
The Semigroup operation for AheadT appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
speculative style. This operation can be used to fold an infinite lazy
container of streams.
import Streamly
import Streamly.Prelude ((|:), nil)
import qualified Streamly.Prelude as S
import Control.Concurrent
main = do
    xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
    print xs
    where p n = threadDelay 1000000 >> return n
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
The monad instance of AheadT may run each monadic continuation (bind)
concurrently in a speculative manner, performing side effects in a partially
ordered manner but producing the outputs in an ordered manner like
SerialT.
main = S.drain . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: 0.3.0
Instances
| Instance | Defined in |
|---|---|
| MonadTrans AheadT | Streamly.Internal.Data.Stream.Ahead |
| IsStream AheadT | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | Streamly.Internal.Data.Stream.Ahead |
| (MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |
| MonadAsync m => Monad (AheadT m) | |
| Monad m => Functor (AheadT m) | |
| (Monad m, MonadAsync m) => Applicative (AheadT m) | |
| (MonadIO m, MonadAsync m) => MonadIO (AheadT m) | Streamly.Internal.Data.Stream.Ahead |
| (MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | Streamly.Internal.Data.Stream.Ahead |
| MonadAsync m => Semigroup (AheadT m a) | |
| MonadAsync m => Monoid (AheadT m a) | |
Asynchronous Streams
Scheduling and execution: In an asynchronous stream a `consM` b `consM`
c ..., the actions a, b, and c are executed concurrently with the
consumer of the stream. The actions are scheduled for execution in the
same order as they are specified in the stream. Multiple scheduled actions
may be executed concurrently in parallel threads of execution. The
actions may be executed out of order and they may complete at arbitrary
times. Therefore, the effects of the actions may be observed out of
order.
Buffering: The results from multiple threads of execution are queued in a buffer as soon as they become available. The consumer of the stream is served from this buffer. Therefore, the consumer may observe the results to be out of order. In other words, an asynchronous stream is an unordered stream i.e. order does not matter.
Concurrency control: Threads are suspended if the maxBuffer limit is
reached, and resumed when the consumer makes space in the buffer. The
maximum number of concurrent threads depends on maxThreads. Number of
threads is increased or decreased based on the speed of the consumer.
Generation operations: Concurrent stream generation operations e.g.
replicateM when used in async style schedule and execute
the stream generating actions in the manner described above. The generation
actions run concurrently, effects and results of the actions as observed by
the consumer of the stream may be out of order.
Transformation operations: Concurrent stream transformation operations
e.g. mapM, when used in async style, schedule and
execute transformation actions in the manner described above. Transformation
actions run concurrently, effects and results of the actions may be
observed by the consumer out of order.
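The sketch below illustrates that an async-style transformation gives no ordering guarantee, assuming the 0.7-era API. `delayBy` is a hypothetical action whose delay grows with its argument, so later elements may finish first. Only the sorted result is stated here, because the arrival order depends on scheduling.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical action: sleeps n * 100 ms, then returns n.
delayBy :: Int -> IO Int
delayBy n = threadDelay (n * 100000) >> return n

main :: IO ()
main = do
    xs <- S.toList . asyncly $ S.mapM delayBy (S.fromList [3, 2, 1])
    -- Arrival order is not guaranteed; it depends on thread scheduling.
    putStrLn ("arrival order: " ++ show xs)
    -- The set of results is the same regardless of arrival order.
    putStrLn ("sorted:        " ++ show (sort xs))   -- sorted: [1,2,3]
```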
Variants: There are two asynchronous stream types AsyncT and WAsyncT.
They are identical with respect to single stream evaluation behavior. Their
behaviors differ in how they combine multiple streams using Semigroup or
Monad composition. Since the order of elements does not matter in
asynchronous streams the Semigroup operation is effectively commutative.
The Semigroup operation (<>) for AsyncT merges two streams
concurrently with priority given to the first stream. In s1 <> s2 <> s3
... the streams s1, s2 and s3 are scheduled for execution in that order.
Multiple scheduled streams may be executed concurrently and the elements
generated by them are served to the consumer as and when they become
available. This behavior is similar to the scheduling and execution behavior
of actions in a single async stream.
Since only a finite number of streams are executed concurrently, this operation can be used to fold an infinite lazy container of streams.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . asyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT may run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.
main = S.drain . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: 0.1.0
Instances
| Instance | Defined in |
|---|---|
| MonadTrans AsyncT | Streamly.Internal.Data.Stream.Async |
| IsStream AsyncT | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | Streamly.Internal.Data.Stream.Async |
| (MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |
| MonadAsync m => Monad (AsyncT m) | |
| Monad m => Functor (AsyncT m) | |
| (Monad m, MonadAsync m) => Applicative (AsyncT m) | |
| (MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | Streamly.Internal.Data.Stream.Async |
| (MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | Streamly.Internal.Data.Stream.Async |
| MonadAsync m => Semigroup (AsyncT m a) | |
| MonadAsync m => Monoid (AsyncT m a) | |
WAsyncT is similar to WSerialT but with concurrent execution.
The Semigroup operation (<>) for WAsyncT merges two streams
concurrently interleaving the actions from both the streams. In s1
<> s2 <> s3 ..., the individual actions from streams s1, s2 and s3
are scheduled for execution in a round-robin fashion. Multiple scheduled
actions may be executed concurrently, the results from concurrent executions
are consumed in the order in which they become available.
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
For this example, we are using maxThreads 1 so that concurrent thread
scheduling does not affect the results and make them unpredictable. Let's
now take a more general example:
main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2,3]) <> (S.fromList [4,5,6]) <> (S.fromList [7,8,9])) >>= print
[1,4,2,7,5,3,8,6,9]
This is how the execution of the above stream proceeds:
- The scheduler queue is initialized with [S.fromList [1,2,3], (S.fromList [4,5,6]) <> (S.fromList [7,8,9])], assuming the head of the queue is represented by the leftmost item.
- S.fromList [1,2,3] is executed, yielding the element 1 and putting [2,3] at the back of the scheduler queue. The scheduler queue now looks like [(S.fromList [4,5,6]) <> (S.fromList [7,8,9]), S.fromList [2,3]].
- Now (S.fromList [4,5,6]) <> (S.fromList [7,8,9]) is picked up for execution: S.fromList [7,8,9] is added at the back of the queue and S.fromList [4,5,6] is executed, yielding the element 4 and adding S.fromList [5,6] at the back of the queue. The queue now looks like [S.fromList [2,3], S.fromList [7,8,9], S.fromList [5,6]].
- Note that the scheduler queue expands by one more stream component in every pass because one more <> is broken down into two components. At this point there are no more <> operations to be broken down further and the queue has reached its maximum size. Now these streams are scheduled in round-robin fashion, yielding [2,7,5,3,8,6,9].
As we see above, in a right associated expression composed with <>, only
one <> operation is broken down into two components in one execution,
therefore, if we have n streams composed using <> it will take n
scheduler passes to expand the whole expression. By the time n-th
component is added to the scheduler queue, the first component would have
received n scheduler passes.
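The walk-through above can be reproduced with a small pure simulation using only base. This is a simplified model under stated assumptions, not the library's scheduler: the hypothetical `Expr` type stands for a stream expression, the queue is a plain list with its head on the left, and a single worker is assumed (as with maxThreads 1).

```haskell
-- A stream expression: a leaf stream, or two expressions joined by <>.
data Expr = Leaf [Int] | Expr :<>: Expr
infixr 6 :<>:

-- Pick the expression at the head of the queue and execute it.
schedule :: [Expr] -> [Int]
schedule [] = []
schedule (e : queue) = exec e queue

-- Execute one expression against the rest of the queue.
exec :: Expr -> [Expr] -> [Int]
-- An exhausted stream is dropped from the queue.
exec (Leaf []) queue = schedule queue
-- Yield one element, then put the remainder at the back of the queue.
exec (Leaf (x : xs)) queue = x : schedule (queue ++ [Leaf xs])
-- Break one <> down: queue the right side at the back, run the left side now.
exec (l :<>: r) queue = exec l (queue ++ [r])

main :: IO ()
main =
    print (schedule [Leaf [1, 2, 3] :<>: Leaf [4, 5, 6] :<>: Leaf [7, 8, 9]])
    -- [1,4,2,7,5,3,8,6,9]
```

Each pass breaks down at most one `:<>:`, which is why the last component of a long right-associated expression enters the queue only after the earlier components have already had several turns.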
Since all streams get interleaved, this operation is not suitable for
folding an infinite lazy container of infinite size streams. However, if
the streams are small, the streams on the left may get finished before more
streams are added to the scheduler queue from the right side of the
expression, so it may be possible to fold an infinite lazy container of
streams. For example, if the streams are of size n then at most n
streams would be in the scheduler queue at a time.
Note that WSerialT and WAsyncT differ in their scheduling behavior,
therefore the output of WAsyncT even with a single thread of execution is
not the same as that of WSerialT. See notes in WSerialT for details about
its scheduling behavior.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of WAsyncT runs all iterations fairly
concurrently using a round robin scheduling.
main = S.drain . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: 0.2.0
Instances
| Instance | Defined in |
|---|---|
| MonadTrans WAsyncT | Streamly.Internal.Data.Stream.Async |
| IsStream WAsyncT | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | Streamly.Internal.Data.Stream.Async |
| (MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |
| MonadAsync m => Monad (WAsyncT m) | |
| Monad m => Functor (WAsyncT m) | |
| (Monad m, MonadAsync m) => Applicative (WAsyncT m) | Streamly.Internal.Data.Stream.Async |
| (MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | Streamly.Internal.Data.Stream.Async |
| (MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | Streamly.Internal.Data.Stream.Async |
| MonadAsync m => Semigroup (WAsyncT m a) | |
| MonadAsync m => Monoid (WAsyncT m a) | |
Async composition with strict concurrent execution of all streams.
The Semigroup instance of ParallelT executes both the streams
concurrently without any delay or without waiting for the consumer demand
and merges the results as they arrive. If the consumer does not consume
the results, they are buffered up to a configured maximum, controlled by the
maxBuffer primitive. If the buffer becomes full the concurrent tasks will
block until there is space in the buffer.
Both WAsyncT and ParallelT evaluate the constituent streams fairly in a
round robin fashion. The key difference is that WAsyncT might wait for the
consumer demand before it executes the tasks whereas ParallelT starts
executing all the tasks immediately without waiting for the consumer demand.
For WAsyncT the maxThreads limit applies whereas for ParallelT it does
not apply. In other words, WAsyncT can be lazy whereas ParallelT is
strict.
ParallelT is useful for cases when the streams are required to be
evaluated simultaneously irrespective of how the consumer consumes them e.g.
when we want to race two tasks and want to start both strictly at the same
time or if we have timers in the parallel tasks and our results depend on
the timers being started at the same time. If we do not have such
requirements then AsyncT or AheadT are recommended as they can be more
efficient than ParallelT.
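As a rough sketch of starting two tasks at the same time, the snippet below merges two single-element streams with the polymorphic `parallel` combinator from the synopsis (assuming the 0.7-era API). `delayed` is a hypothetical helper. Since both actions start immediately and results are merged as they arrive, the shorter delay is expected to come out first even though it is written second.

```haskell
import Control.Concurrent (threadDelay)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: a one-element stream whose element is produced
-- after the given number of milliseconds.
delayed :: Int -> String -> SerialT IO String
delayed ms label = S.yieldM (threadDelay (ms * 1000) >> return label)

main :: IO ()
main = do
    -- Both tasks are started right away; results arrive in time order.
    xs <- S.toList (delayed 300 "slow" `parallel` delayed 100 "fast")
    print xs   -- expected: ["fast","slow"]
```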
main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the Async style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of ParallelT runs all iterations
of the loop concurrently.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = S.drain . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.1.0
Instances
| Instance | Defined in |
|---|---|
| MonadTrans ParallelT | Streamly.Internal.Data.Stream.Parallel |
| IsStream ParallelT | |
| (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | Streamly.Internal.Data.Stream.Parallel |
| (MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |
| (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |
| MonadAsync m => Monad (ParallelT m) | |
| Monad m => Functor (ParallelT m) | |
| (Monad m, MonadAsync m) => Applicative (ParallelT m) | Streamly.Internal.Data.Stream.Parallel |
| (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | Streamly.Internal.Data.Stream.Parallel |
| (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | Streamly.Internal.Data.Stream.Parallel |
| MonadAsync m => Semigroup (ParallelT m a) | |
| MonadAsync m => Monoid (ParallelT m a) | |
Zipping Streams
ZipSerialM and ZipAsyncM provide Applicative instances for zipping the
corresponding elements of two streams together. Note that these types are
not monads.
data ZipSerialM m a
The applicative instance of ZipSerialM zips a number of streams serially
i.e. it produces one element from each stream serially and then zips all
those elements.
main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
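For readers familiar with base, the pure counterpart of this zipping Applicative is ZipList from Control.Applicative. The sketch below is only an analogy using plain lists, not streamly code; it produces the same tuples as the zipSerially example.

```haskell
import Control.Applicative (ZipList (..))

-- Zip three lists element-wise using the ZipList Applicative.
zipped :: [(Int, Int, Int)]
zipped = getZipList $
    (,,) <$> ZipList [1, 2] <*> ZipList [3, 4] <*> ZipList [5, 6]

main :: IO ()
main = print zipped   -- [(1,3,5),(2,4,6)]
```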
The Semigroup instance of this type works the same way as that of
SerialT.
Since: 0.2.0
Like ZipSerialM but zips in parallel, it generates all the elements to
be zipped concurrently.
main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]
The Semigroup instance of this type works the same way as that of
SerialT.
Since: 0.2.0
Parallel Function Application
Stream processing functions can be composed in a chain using function
application with or without the $ operator, or with reverse function
application operator &. Streamly provides concurrent versions of these
operators applying stream processing functions such that each stage of the
stream can run in parallel. The operators start with a |; we can read |$
as "parallel dollar" to remember that | comes before $.
Imports for the code snippets below:
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0
Parallel transform application operator; applies a stream transformation
function t m a -> t m b to a stream t m a concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. |$ is just like regular function application
operator $ except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
S.drain $
    S.mapM (\x -> threadDelay 1000000 >> print x)
    |$ S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
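To see the effect of running stages concurrently, the following sketch times a two-stage pipeline composed with `$` and with `|$`. It assumes the 0.7-era API plus getMonotonicTime from GHC.Clock in base; `producer`, `consumer` and `timed` are hypothetical names. With five elements and about 100 ms per stage, the serial version should take roughly the sum of both stages, while the concurrent version should take noticeably less because the stages overlap. Exact timings depend on the machine, so none are shown.

```haskell
import Control.Concurrent (threadDelay)
import GHC.Clock (getMonotonicTime)
import Streamly
import qualified Streamly.Prelude as S

-- Producer stage: five elements, each taking about 100 ms to generate.
producer :: SerialT IO Int
producer = S.replicateM 5 (threadDelay 100000 >> return 1)

-- Consumer stage: each element takes about 100 ms to process.
consumer :: SerialT IO Int -> SerialT IO Int
consumer = S.mapM (\x -> threadDelay 100000 >> return (x + 1))

-- Run an action and return its wall-clock duration in seconds.
timed :: IO a -> IO Double
timed act = do
    start <- getMonotonicTime
    _ <- act
    end <- getMonotonicTime
    return (end - start)

main :: IO ()
main = do
    serialSecs <- timed (S.drain $ consumer $ producer)
    concurrentSecs <- timed (S.drain $ consumer |$ producer)
    putStrLn ("serial stages:     " ++ show serialSecs ++ " s")
    putStrLn ("concurrent stages: " ++ show concurrentSecs ++ " s")
```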
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1
Parallel reverse function application operator for streams; just like the
regular reverse function application operator & except that it is
concurrent.
S.drain $
    S.repeatM (threadDelay 1000000 >> return 1)
    |& S.mapM (\x -> threadDelay 1000000 >> print x)
Concurrent
Since: 0.3.0
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a concurrently; the input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The . at the end of the operator is a mnemonic for termination of the
stream.
S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
|$. S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Parallel reverse function application operator for applying a run or fold
function to a stream. Just like |$. except that the operands are reversed.
S.repeatM (threadDelay 1000000 >> return 1)
|&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.3.0
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #
Make a stream asynchronous: triggers the computation and returns, in the
underlying monad, a stream representing the output generated by the original
computation. The returned stream is exhaustible and must be drained exactly
once. If it is not drained fully, a thread may remain blocked forever; once
exhausted, it always returns empty.
Since: 0.2.0
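As a rough illustration of the description above, the following sketch starts a slow producer with mkAsync and drains the returned stream exactly once. The producer and its 0.1 second delay are hypothetical and exist only for this example; the behaviour described in the comments is what the documentation above suggests, not something verified against a particular release.

```haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

-- Hypothetical producer: each element takes about 0.1 s to generate.
producer :: SerialT IO Int
producer = S.mapM (\x -> threadDelay 100000 >> return x)
         $ S.fromList [1, 2, 3]

main :: IO ()
main = do
    -- mkAsync returns a stream in the IO monad; per the documentation the
    -- original computation is triggered at this point.
    results <- mkAsync producer
    putStrLn "producer handed to mkAsync"
    -- Drain the returned stream fully, and only once.
    xs <- S.toList results
    print xs
```

Draining the result with S.toList consumes it completely, which avoids the blocked-thread situation the documentation warns about.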
Merging Streams
The Semigroup operation <> of each stream type combines two streams in a
type specific manner. This section provides polymorphic versions of <>
which can be used to combine two streams in a predetermined way irrespective
of the type.
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
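A minimal sketch of how the choice of combinator, rather than the stream type, decides how two streams are merged. The helper delayed is hypothetical, and serial is assumed to be exported by Streamly alongside the combinators listed above.

```haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)

-- Hypothetical helper: a one-element stream that yields n after n * 0.1 s.
delayed :: Int -> SerialT IO Int
delayed n = S.yieldM (threadDelay (n * 100000) >> return n)

main :: IO ()
main = do
    -- serial runs the first stream to completion before the second,
    -- so the elements come out in argument order: [3,1].
    S.toList (delayed 3 `serial` delayed 1) >>= print

    -- async may evaluate both streams concurrently even though the type
    -- is SerialT; the order of results depends on scheduling.
    S.toList (delayed 3 `async` delayed 1) >>= print
```

Both expressions have type SerialT IO Int; only the merge combinator differs.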
Concurrency Control
These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.
Pitfall: Remember that maxBuffer in the following example applies to
mapM and any other combinators that may follow it, and it does not apply
to the combinators before it:
... $ maxBuffer 10 $ S.mapM ... ...
If we use & instead of $ the situation is reversed. In the following
example, maxBuffer does not apply to mapM; it applies to the combinators
that come before it, because those are the arguments to maxBuffer:
... & maxBuffer 10 & S.mapM ... ...
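The two shapes above can be written out in full as follows. This is only a sketch: the work action and the element ranges are hypothetical, and maxThreads is used in place of maxBuffer because its effect is easier to picture; the scoping rule is the same for both.

```haskell
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent (threadDelay)
import Data.Function ((&))

-- Hypothetical slow action, used only for illustration.
work :: Int -> IO ()
work x = threadDelay 100000 >> print x

main :: IO ()
main = do
    -- With ($), the argument of maxThreads is everything to its right,
    -- so the limit covers the concurrent mapM.
    S.drain $ asyncly $ maxThreads 2 $ S.mapM work $ S.fromList [1 .. 4]

    -- With (&), the argument of maxThreads is everything to its left;
    -- the mapM that follows is outside the limited stream.
    S.drain $ asyncly (S.fromList [5 .. 8] & maxThreads 2 & S.mapM work)
```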
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to the default; a negative value means
there is no limit. The default value is 1500. maxThreads does not affect
ParallelT streams as they can use an unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.
CAUTION! Using an unbounded maxBuffer value (i.e. a negative value)
coupled with an unbounded maxThreads value is a recipe for disaster in the
presence of infinite or very large streams. In particular, it must
not be used when pure is used in ZipAsyncM streams, as pure in
applicative zip streams generates an infinite stream, causing unbounded
concurrent generation with no limit on the buffer or threads.
Since: 0.4.0
Rate Limiting
Specifies the stream yield rate in yields per second (Hertz).
We keep accumulating yield credits at rateGoal. At any point of time we
allow only as many yields as we have accumulated as per rateGoal since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer we try to recover only as
much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value.
If the rateBuffer is 0 or negative we do not attempt to recover.
Since: 0.5.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of Rate specifications is documented under Rate. The
effective maximum production rate achieved by a stream is governed by:
- The maxThreads limit
- The maxBuffer limit
- The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and up to double
the specified rate on the higher side.
Since: 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate is never allowed to go below the specified rate, though it may go above it at times; the upper limit is double the specified rate.
Since: 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate is never allowed to go above the specified rate, though it may go below it at times; the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0
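A small sketch of the rate combinators in use, assuming the Rate constructor is exported by Streamly with its fields in the order used in the "Same as" lines above (low, goal, high, buffer). The messages and the element count are arbitrary choices for the example.

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- Aim for an average of about 2 yields per second.
    S.drain $ asyncly $ S.take 6 $ avgRate 2 $ S.repeatM (putStrLn "tick")

    -- The same target spelled out with an explicit Rate, following the
    -- documented equivalence avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound).
    S.drain $ asyncly $ S.take 6
        $ rate (Just (Rate 1 2 4 maxBound))
        $ S.repeatM (putStrLn "tock")
```

The rate is applied to a concurrent stream type (AsyncT via asyncly) because, as noted for the concurrency control combinators, these settings act on concurrent combinators.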
Stream Type Adapters
You may want to use different stream composition styles at different points
in your program. Stream types can be freely converted or adapted from one
type to another. The IsStream type class facilitates type conversion of
one stream type to another. It is not used directly, instead the type
combinators provided below are used for conversions.
To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic
type (e.g. SerialT) use the adapt combinator. To give polymorphic code
a specific interpretation or to adapt a specific type to a polymorphic type
use the type specific combinators e.g. asyncly or wSerially. You
cannot adapt polymorphic code to polymorphic code, as the compiler would not know
which specific type you are converting from or to. If you see an
ambiguous type variable error then most likely you are using adapt
unnecessarily on polymorphic code.
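The distinction between the type specific combinators and adapt can be sketched as below. The names numbers and asyncNumbers are hypothetical and exist only for this illustration.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical polymorphic stream; the caller decides the stream type.
numbers :: (IsStream t, Monad m) => t m Int
numbers = S.fromList [1, 2, 3]

-- The same stream pinned to one monomorphic type.
asyncNumbers :: AsyncT IO Int
asyncNumbers = numbers

main :: IO ()
main = do
    -- asyncly gives the polymorphic stream a specific interpretation
    -- (AsyncT); its result is polymorphic again and is used here as the
    -- SerialT that S.toList expects.
    S.toList (asyncly numbers) >>= print

    -- adapt converts from one monomorphic type (AsyncT) to another
    -- (SerialT, fixed by S.toList).
    S.toList (adapt asyncNumbers) >>= print
```

Writing adapt numbers instead would leave the source type undetermined, which is the ambiguous type variable situation described above.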
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: 0.2.0
Minimal complete definition
Instances
| IsStream Stream Source # | |
| IsStream ZipAsyncM Source # | |
| IsStream ZipSerialM Source # | |
| IsStream WSerialT Source # | |
| IsStream SerialT Source # | |
| IsStream ParallelT Source # | |
| IsStream WAsyncT Source # | |
| IsStream AsyncT Source # | |
| IsStream AheadT Source # | |
serially :: IsStream t => SerialT m a -> t m a Source #
Fix the type of a polymorphic stream as SerialT.
Since: 0.1.0
wSerially :: IsStream t => WSerialT m a -> t m a Source #
Fix the type of a polymorphic stream as WSerialT.
Since: 0.2.0
asyncly :: IsStream t => AsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as AsyncT.
Since: 0.1.0
aheadly :: IsStream t => AheadT m a -> t m a Source #
Fix the type of a polymorphic stream as AheadT.
Since: 0.3.0
wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #
Fix the type of a polymorphic stream as WAsyncT.
Since: 0.2.0
parallely :: IsStream t => ParallelT m a -> t m a Source #
Fix the type of a polymorphic stream as ParallelT.
Since: 0.1.0
zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipSerialM.
Since: 0.2.0
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #
Fix the type of a polymorphic stream as ZipAsyncM.
Since: 0.2.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0
IO Streams
type Serial = SerialT IO Source #
A serial IO stream of elements of type a. See SerialT documentation
for more details.
Since: 0.2.0
type WSerial = WSerialT IO Source #
An interleaving serial IO stream of elements of type a. See WSerialT
documentation for more details.
Since: 0.2.0
type Ahead = AheadT IO Source #
A serial IO stream of elements of type a with concurrent lookahead. See
AheadT documentation for more details.
Since: 0.3.0
type Async = AsyncT IO Source #
A demand driven left biased parallely composing IO stream of elements of
type a. See AsyncT documentation for more details.
Since: 0.2.0
type WAsync = WAsyncT IO Source #
A round robin parallely composing IO stream of elements of type a.
See WAsyncT documentation for more details.
Since: 0.2.0
type Parallel = ParallelT IO Source #
A parallely composing IO stream of elements of type a.
See ParallelT documentation for more details.
Since: 0.2.0
type ZipSerial = ZipSerialM IO Source #
An IO stream whose applicative instance zips streams serially.
Since: 0.2.0
type ZipAsync = ZipAsyncM IO Source #
An IO stream whose applicative instance zips streams wAsyncly.
Since: 0.2.0
Folding Containers of Streams
These are variants of standard Foldable fold functions that use a
polymorphic stream sum operation (e.g. async or wSerial) to fold a
finite container of streams. Note that these are just special cases of
the more general concatMapWith operation.
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like foldMapWith but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
forEachWith = flip S.foldMapWith
Since: 0.1.0 (Streamly)
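A brief sketch of both functions, assuming serial is exported by Streamly as one of the polymorphic merge operations. The lambdas are hypothetical examples of stream-generating functions.

```haskell
import Streamly
import qualified Streamly.Prelude as S

main :: IO ()
main = do
    -- Map each element to a small stream and merge the results serially.
    xs <- S.toList
        $ foldMapWith serial (\x -> S.fromList [x, x * 10]) [1, 2, 3 :: Int]
    print xs  -- [1,10,2,20,3,30]

    -- forEachWith takes the container first and the function last, which
    -- reads well with a trailing lambda. With async the order of results
    -- may vary from run to run.
    ys <- S.toList
        $ forEachWith async [1, 2, 3 :: Int] $ \x -> S.yieldM (return (x * 2))
    print ys
```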
Re-exports
The class of semigroups (types with an associative binary operation).
Instances should satisfy the associativity law:
x <> (y <> z) = (x <> y) <> z
Since: base-4.9.0.0
Minimal complete definition
Methods
(<>) :: a -> a -> a infixr 6 #
An associative operation.
sconcat :: NonEmpty a -> a #
Reduce a non-empty list with <>
The default definition should be sufficient, but this can be overridden for efficiency.
stimes :: Integral b => b -> a -> a #
Repeat a value n times.
Given that this works on a Semigroup it is allowed to fail if
you request 0 or fewer repetitions, and the default definition
will do so.
By making this a member of the class, idempotent semigroups
and monoids can upgrade this to execute in O(1) by
picking stimes = stimesIdempotent or stimes =
stimesIdempotentMonoid respectively.
Instances
| Semigroup Ordering | Since: base-4.9.0.0 |
| Semigroup () | Since: base-4.9.0.0 |
| Semigroup Void | Since: base-4.9.0.0 |
| Semigroup All | Since: base-4.9.0.0 |
| Semigroup Any | Since: base-4.9.0.0 |
| Semigroup ByteArray | |
| Semigroup [a] | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Maybe a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (IO a) | Since: base-4.10.0.0 |
| Semigroup p => Semigroup (Par1 p) | Since: base-4.12.0.0 |
| Ord a => Semigroup (Min a) | Since: base-4.9.0.0 |
| Ord a => Semigroup (Max a) | Since: base-4.9.0.0 |
| Semigroup (First a) | Since: base-4.9.0.0 |
| Semigroup (Last a) | Since: base-4.9.0.0 |
| Monoid m => Semigroup (WrappedMonoid m) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Option a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Identity a) | Since: base-4.9.0.0 |
| Semigroup (First a) | Since: base-4.9.0.0 |
| Semigroup (Last a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Dual a) | Since: base-4.9.0.0 |
| Semigroup (Endo a) | Since: base-4.9.0.0 |
| Num a => Semigroup (Sum a) | Since: base-4.9.0.0 |
| Num a => Semigroup (Product a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Down a) | Since: base-4.11.0.0 |
| Semigroup (NonEmpty a) | Since: base-4.9.0.0 |
| Ord a => Semigroup (Set a) | Since: containers-0.5.7 |
| Semigroup (Heap a) | |
| Semigroup (Array a) | Since: primitive-0.6.3.0 |
| Semigroup (SmallArray a) Source # | Since: 0.6.3.0 |
| Semigroup (MergeSet a) | |
| Storable a => Semigroup (Array a) Source # | |
| Semigroup (ZipList a) Source # | |
| Semigroup (List a) Source # | |
| Semigroup b => Semigroup (a -> b) | Since: base-4.9.0.0 |
| Semigroup (Either a b) | Since: base-4.9.0.0 |
| Semigroup (V1 p) | Since: base-4.12.0.0 |
| Semigroup (U1 p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b) => Semigroup (a, b) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (ST s a) | Since: base-4.11.0.0 |
| Semigroup (Proxy s) | Since: base-4.9.0.0 |
| Ord k => Semigroup (Map k v) | |
| Semigroup (Stream m a) Source # | |
| Semigroup (ZipAsyncM m a) Source # | |
| Semigroup (ZipSerialM m a) Source # | |
| Semigroup (WSerialT m a) Source # | |
| Semigroup (SerialT m a) Source # | |
| MonadAsync m => Semigroup (ParallelT m a) Source # | |
| MonadAsync m => Semigroup (WAsyncT m a) Source # | |
| MonadAsync m => Semigroup (AsyncT m a) Source # | |
| MonadAsync m => Semigroup (AheadT m a) Source # | |
| Semigroup (f p) => Semigroup (Rec1 f p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Const a b) | Since: base-4.9.0.0 |
| (Applicative f, Semigroup a) => Semigroup (Ap f a) | Since: base-4.12.0.0 |
| Alternative f => Semigroup (Alt f a) | Since: base-4.9.0.0 |
| Monad m => Semigroup (Pipe m a b) Source # | |
| (Semigroup b, Monad m) => Semigroup (Fold m a b) Source # | Combines the outputs of the folds (the type |
| Semigroup c => Semigroup (K1 i c p) | Since: base-4.12.0.0 |
| (Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d) | Since: base-4.9.0.0 |
| Semigroup (f p) => Semigroup (M1 i c f p) | Since: base-4.12.0.0 |
| Semigroup (f (g p)) => Semigroup ((f :.: g) p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e) | Since: base-4.9.0.0 |
Deprecated
runStream :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use Streamly.Prelude.drain instead.
Same as "Streamly.Prelude.runStream".
runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #
runStreamT :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use runStream instead.
Same as runStream.
Since: 0.1.0
runInterleavedT :: Monad m => WSerialT m a -> m () Source #
Deprecated: Please use 'runStream . interleaving' instead.
Same as runStream . wSerially.
Since: 0.1.0
runAsyncT :: Monad m => AsyncT m a -> m () Source #
Deprecated: Please use 'runStream . asyncly' instead.
Same as runStream . asyncly.
Since: 0.1.0
runParallelT :: Monad m => ParallelT m a -> m () Source #
Deprecated: Please use 'runStream . parallely' instead.
Same as runStream . parallely.
Since: 0.1.0
runZipStream :: Monad m => ZipSerialM m a -> m () Source #
Deprecated: Please use 'runStream . zipSerially' instead.
Same as runStream . zipping.
Since: 0.1.0
runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #
Deprecated: Please use 'runStream . zipAsyncly' instead.
Same as runStream . zippingAsync.
Since: 0.1.0
type InterleavedT = WSerialT Source #
Deprecated: Please use WSerialT instead.
Since: 0.1.0
type ZipStream = ZipSerialM Source #
Deprecated: Please use ZipSerialM instead.
Since: 0.1.0
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #