| Copyright | (c) 2017 Composewell Technologies |
|---|---|
| License | BSD3 |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Streamly
Description
Deprecated: Please use Streamly.Prelude instead.
Streamly is a general purpose programming framework using cocnurrent data
flow programming paradigm. It can be considered as a generalization of
Haskell lists to monadic streaming with concurrent composition capability.
The serial stream type in streamly SerialT m a is like the list type [a]
parameterized by the monad m. For example, SerialT IO a is a moral
equivalent of [a] in the IO monad. Streams are constructed very much like
lists, except that they use nil and cons instead of '[]' and :.
> import Streamly > import Streamly.Prelude (cons, consM) > import qualified Streamly.Prelude as S > > S.toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Unlike lists, streams can be constructed from monadic effects:
> S.toList$getLine`consM`getLine`consM` S.nilhello world ["hello","world"]
Streams are processed just like lists, with list like combinators, except that they are monadic and work in a streaming fashion. Here is a simple console echo program example:
> S.drain $ S.repeatM getLine & S.mapM putStrLn
SerialT Identity a is a moral equivalent of pure lists. Streamly utilizes
fusion for high performance, therefore, we can represent and process strings
as streams of Char, encode and decode the streams to/from UTF8 and
serialize them to Array Word8 obviating the need for special purpose
libraries like bytestring and text.
For more details please see the Streamly.Tutorial module and the examples directory in this package.
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SerialT m a
- data WSerialT m a
- data AheadT m a
- data AsyncT m a
- data WAsyncT m a
- data ParallelT m a
- data ZipSerialM m a
- data ZipAsyncM m a
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
- serial :: IsStream t => t m a -> t m a -> t m a
- wSerial :: IsStream t => t m a -> t m a -> t m a
- ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- serially :: IsStream t => SerialT m a -> t m a
- wSerially :: IsStream t => WSerialT m a -> t m a
- asyncly :: IsStream t => AsyncT m a -> t m a
- aheadly :: IsStream t => AheadT m a -> t m a
- wAsyncly :: IsStream t => WAsyncT m a -> t m a
- parallely :: IsStream t => ParallelT m a -> t m a
- zipSerially :: IsStream t => ZipSerialM m a -> t m a
- zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type Serial = SerialT IO
- type WSerial = WSerialT IO
- type Ahead = AheadT IO
- type Async = AsyncT IO
- type WAsync = WAsyncT IO
- type Parallel = ParallelT IO
- type ZipSerial = ZipSerialM IO
- type ZipAsync = ZipAsyncM IO
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- class Semigroup a where
- type Streaming = IsStream
- runStream :: Monad m => SerialT m a -> m ()
- runStreaming :: (Monad m, IsStream t) => t m a -> m ()
- runStreamT :: Monad m => SerialT m a -> m ()
- runInterleavedT :: Monad m => WSerialT m a -> m ()
- runAsyncT :: Monad m => AsyncT m a -> m ()
- runParallelT :: Monad m => ParallelT m a -> m ()
- runZipStream :: Monad m => ZipSerialM m a -> m ()
- runZipAsync :: Monad m => ZipAsyncM m a -> m ()
- type StreamT = SerialT
- type InterleavedT = WSerialT
- type ZipStream = ZipSerialM
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
- (<=>) :: IsStream t => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Module Overview
The basic stream type is Serial, it represents a sequence of IO actions,
and is a Monad. The type SerialT is a monad transformer that can
represent a sequence of actions in an arbitrary monad. The type Serial is
in fact a synonym for SerialT IO. There are a few more types similar to
SerialT, all of them represent a stream and differ only in the
Semigroup, Applicative and Monad compositions of the stream. Serial
and WSerial types compose serially whereas Async and WAsync
types compose concurrently. All these types can be freely inter-converted
using type combinators without any cost. You can freely switch to any type
of composition at any point in the program. When no type annotation or
explicit stream type combinators are used, the default stream type is
inferred as Serial.
This module exports stream types, instances and combinators for:
- converting between different stream types
- appending and concurrently merging streams
- Concurrency control
- Concurrent function application
- Stream rate control
This module is designed to be imported unqualified:
import Streamly
See the Streamly.Prelude module for APIs for construction, generation, elimination and transformation of streams.
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Stream transformers
A stream represents a sequence of pure or effectful actions. The
cons and IsStream operations and the corresponding operators .: and
|: can be used to join pure values or effectful actions in a sequence.
The effects in the stream can be executed in many different ways
depending on the type of stream. In other words, the behavior of IsStream
depends on the type of the stream.
There are three high level categories of streams, spatially ordered
streams, speculative streams and time ordered streams. Spatially
ordered streams, SerialT and WSerialT, execute the effects in serial
order i.e. one at a time and present the outputs of those effects to the
consumer in the same order. Speculative streams, AheadT, may execute
many effects concurrently but present the outputs to the consumer in the
specified spatial order. Time ordered streams, AsyncT, WAsyncT and
ParallelT, may execute many effects concurrently and present the
outputs of those effects to the consumer in time order i.e. as soon as
the output is generated.
We described above how the effects in a sequence are executed for
different types of streams. The behvavior of the Semigroup and Monad
instances follow the behavior of IsStream. Stream generation operations
like repeatM also execute the effects differently for different
streams, providing a concurrent generation capability when used with
stream types that execute effects concurrently. Similarly, effectful
transformation operations like mapM also execute the transforming
effects differently for different types of streams.
Serial Streams
When a stream consumer demands an element from a serial stream constructed
as a `consM` b `consM` ... nil, the action a at the head of the stream
sequence is executed and the result is supplied to the consumer. When the
next element is demanded, the action b is executed and its result is
supplied. Thus, the effects are performed and results are consumed strictly
in a serial order. Serial streams can be considered as spatially ordered
streams as the order of execution and consumption is the same as the spatial
order in which the actions are composed by the programmer.
Serial streams enforce the side effects as well as the results of the actions to be in the same order in which the actions are added to the stream. Therefore, the semigroup operation for serial streams is not commutative:
a <> b is not the same as b <> a
There are two serial stream types SerialT and WSerialT. The stream
evaluation of both the variants works in the same way as described above,
they differ only in the Semigroup and Monad implementaitons.
For SerialT streams:
(<>) =serial--Semigroup(>>=) = flip .concatMapWithserial--Monad
A single Monad bind behaves like a for loop:
>>>:{Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like nested for loops:
>>>:{Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For WSerialT streams:
(<>) =wSerial--Semigroup(>>=) = flip .concatMapWithwSerial--Monad
Note that <> is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad bind behaves like a for loop:
>>>:{Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like interleaved nested for loops:
>>>:{Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1 in the first stream with all the nested iterations of element
2:
>>>import Streamly.Prelude (wSerial)>>>Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)][(1,3),(2,3),(1,4),(2,4)]
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
Speculative Streams
When a stream consumer demands an element from a speculative stream
constructed as a `consM` b `consM` ... nil, the action a at the head
of the stream is executed and the output of the action is supplied to the
consumer. However, in addition to the action at the head multiple actions
following it may also be executed concurrently and the results buffered.
When the next element is demanded it may be served from the buffer and we
may execute the next action in the sequence to keep the buffer adequately
filled. Thus, the actions are executed concurrently but results consumed in
serial order just like serial streams. IsStream can be used to fold an
infinite lazy container of effects, as the number of concurrent executions
is limited.
Similar to IsStream, the monadic stream generation (e.g. replicateM) and
transformation operations (e.g. mapM) on speculative streams can execute
multiple effects concurrently in a speculative manner.
How many effects can be executed concurrently and how many results can be
buffered are controlled by maxThreads and maxBuffer combinators
respectively. The actual number of concurrent threads is adjusted according
to the rate at which the consumer is consuming the stream. It may even
execute actions serially in a single thread if that is enough to match the
consumer's speed.
Speculative streams enforce ordering of the results of actions in the stream but the side effects are only partially ordered. Therefore, the semigroup operation for speculative streams is not commutative from the pure outputs perspective but commutative from side effects perspective.
For AheadT streams:
(<>) =ahead(>>=) = flip .concatMapWithahead
A single Monad bind behaves like a for loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>>:{Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [2,1]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, ahead of time:
>>>:{Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using ahead.
Since: 0.3.0 (Streamly)
Since: 0.8.0
Instances
Asynchronous Streams
Scheduling and execution: In an asynchronous stream a `consM` b `consM`
c ..., the actions a, b, and c are executed concurrently with the
consumer of the stream. The actions are scheduled for execution in the
same order as they are specified in the stream. Multiple scheduled actions
may be executed concurrently in parallel threads of execution. The
actions may be executed out of order and they may complete at arbitrary
times. Therefore, the effects of the actions may be observed out of
order.
Buffering: The results from multiple threads of execution are queued in a buffer as soon as they become available. The consumer of the stream is served from this buffer. Therefore, the consumer may observe the results to be out of order. In other words, an asynchronous stream is an unordered stream i.e. order does not matter.
Concurrency control: Threads are suspended if the maxBuffer limit is
reached, and resumed when the consumer makes space in the buffer. The
maximum number of concurrent threads depends on maxThreads. Number of
threads is increased or decreased based on the speed of the consumer.
Generation operations: Concurrent stream generation operations e.g.
replicateM when used in async style schedule and execute
the stream generating actions in the manner described above. The generation
actions run concurrently, effects and results of the actions as observed by
the consumer of the stream may be out of order.
Transformation operations: Concurrent stream transformation operations
e.g. mapM, when used in async style, schedule and
execute transformation actions in the manner described above. Transformation
actions run concurrently, effects and results of the actions may be
observed by the consumer out of order.
Variants: There are two asynchronous stream types AsyncT and WAsyncT.
They are identical with respect to single stream evaluation behavior. Their
behaviors differ in how they combine multiple streams using Semigroup or
Monad composition. Since the order of elements does not matter in
asynchronous streams the Semigroup operation is effectively commutative.
For AsyncT streams:
(<>) =async(>>=) = flip .concatMapWithasync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the async combinator, producing results and
side effects of iterations out of order:
>>>:{Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the async combinator:
>>>:{Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one output stream and all
the iterations corresponding to 2 constitute another output stream and
these two output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
For WAsyncT streams:
(<>) =wAsync(>>=) = flip .concatMapWithwAsync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the wAsync combinator, producing results and
side effects of iterations out of order:
>>>:{Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the wAsync combinator:
>>>:{Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1 in the first stream constitute one WAsyncT output
stream and all the iterations corresponding to 2 constitute another
WAsyncT output stream and these two output streams are merged using
wAsync.
The W in the name stands for wide or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For ParallelT streams:
(<>) =parallel(>>=) = flip .concatMapWithparallel
See AsyncT, ParallelT is similar except that all
iterations are strictly concurrent while in AsyncT it depends on the
consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
Instances
Zipping Streams
ZipSerialM and ZipAsyncM, provide Applicative instances for zipping the
corresponding elements of two streams together. Note that these types are
not monads.
data ZipSerialM m a Source #
For ZipSerialM streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipWith' id
Applicative evaluates the streams being zipped serially:
>>>s1 = Stream.fromFoldable [1, 2]>>>s2 = Stream.fromFoldable [3, 4]>>>s3 = Stream.fromFoldable [5, 6]>>>Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3[(1,3,5),(2,4,6)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For ZipAsyncM streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipAsyncWith' id
Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:
>>>s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]>>>Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s... [(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
| IsStream ZipAsyncM Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type Methods toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> Stream m a Source # fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipAsyncM m a Source # consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # | |
| MonadAsync m => Applicative (ZipAsyncM m) Source # | |
Defined in Streamly.Internal.Data.Stream.ZipAsync | |
| Monad m => Functor (ZipAsyncM m) Source # | |
| Monoid (ZipAsyncM m a) Source # | |
| Semigroup (ZipAsyncM m a) Source # | |
Parallel Function Application
Stream processing functions can be composed in a chain using function
application with or without the $ operator, or with reverse function
application operator &. Streamly provides concurrent versions of these
operators applying stream processing functions such that each stage of the
stream can run in parallel. The operators start with a |; we can read |$
as "parallel dollar" to remember that | comes before $.
Imports for the code snippets below:
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel transform application operator; applies a stream transformation
function t m a -> t m b to a stream t m a concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. |$ is just like regular function application
operator $ except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
>>>:{Stream.drain $ Stream.mapM (\x -> threadDelay 1000000 >> print x) |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1) :} 1 1 1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a concurrently; The the input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The . at the end of the operator is a mnemonic for termination of the
stream.
In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.
>>>import Control.Concurrent (threadDelay)>>>import Streamly.Prelude ((|$.))>>>:{Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ()) |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1) :} 1 1 1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #
Make a stream asynchronous, triggers the computation and returns a stream
in the underlying monad representing the output generated by the original
computation. The returned action is exhaustible and must be drained once. If
not drained fully we may have a thread blocked forever and once exhausted it
will always return empty.
Since: 0.2.0
Merging Streams
The Semigroup operation <> of each stream type combines two streams in a
type specific manner. This section provides polymorphic versions of <>
which can be used to combine two streams in a predetermined way irrespective
of the type.
serial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #
Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>>import Streamly.Prelude (serial)>>>stream1 = Stream.fromList [1,2]>>>stream2 = Stream.fromList [3,4]>>>Stream.toList $ stream1 `serial` stream2[1,2,3,4]
This operation can be used to fold an infinite lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wSerial :: IsStream t => t m a -> t m a -> t m a infixr 6 Source #
Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.
>>>import Streamly.Prelude (wSerial)>>>stream1 = Stream.fromList [1,2]>>>stream2 = Stream.fromList [3,4]>>>Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2[1,3,2,4]
Note, for singleton streams wSerial and serial are identical.
Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #
Appends two streams, both the streams may be evaluated concurrently but the outputs are used in the same order as the corresponding actions in the original streams, side effects will happen in the order in which the streams are evaluated:
>>>import Streamly.Prelude (ahead, SerialT)>>>stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int>>>stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int>>>Stream.toList $ stream1 `ahead` stream2 :: IO [Int]2 sec 4 sec [4,2]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>>stream3 = Stream.fromEffect (delay 1)>>>Stream.toList $ stream1 `ahead` stream2 `ahead` stream31 sec 2 sec 4 sec [4,2,1]
With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:
>>>Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream32 sec 1 sec 4 sec [4,2,1]
Only streams are scheduled for ahead evaluation, how actions within a stream
are evaluated depends on the stream type. If it is a concurrent stream they
will be evaluated concurrently. It may not make much sense combining serial
streams using ahead.
ahead can be safely used to fold an infinite lazy container of streams.
Since: 0.3.0 (Streamly)
Since: 0.8.0
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #
Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:
>>>import Streamly.Prelude (async)>>>stream1 = Stream.fromEffect (delay 4)>>>stream2 = Stream.fromEffect (delay 2)>>>Stream.toList $ stream1 `async` stream22 sec 4 sec [2,4]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>>stream3 = Stream.fromEffect (delay 1)>>>Stream.toList $ stream1 `async` stream2 `async` stream3... [1,2,4]
With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:
>>>Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3... [2,1,4]
With a single thread, it becomes serial:
>>>Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3... [4,2,1]
Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.
In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:
>>>stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int>>>stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int>>>Stream.toList $ stream1 `async` stream2 -- IO [Int]... [1,1,3,3]
If total threads are 2, the third stream is scheduled only after one of the first two has finished:
stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
... [1,1,3,2,3,2]
Thus async goes deep in first few streams rather than going wide in all
streams. It prefers to evaluate the leftmost streams as much as possible.
Because of this behavior, async can be safely used to fold an infinite
lazy container of streams.
Since: 0.2.0 (Streamly)
Since: 0.8.0
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #
For singleton streams, wAsync is the same as async. See async for
singleton stream behavior. For multi-element streams, while async is left
biased i.e. it tries to evaluate the left side stream as much as possible,
wAsync tries to schedule them both fairly. In other words, async goes
deep while wAsync goes wide. However, outputs are always used as they
arrive.
With a single thread, async starts behaving like serial while wAsync
starts behaving like wSerial.
>>>import Streamly.Prelude (async, wAsync)>>>stream1 = Stream.fromList [1,2,3]>>>stream2 = Stream.fromList [4,5,6]>>>Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2[1,2,3,4,5,6]
>>>Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2[1,4,2,5,3,6]
With two threads available, and combining three streams:
>>>stream3 = Stream.fromList [7,8,9]>>>Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3[1,2,3,4,5,6,7,8,9]
>>>Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3[1,4,2,7,5,3,8,6,9]
This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.
Note that WSerialT and single threaded WAsyncT both interleave streams
but the exact scheduling is slightly different in both cases.
Since: 0.2.0 (Streamly)
Since: 0.8.0
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6 Source #
Like async except that the execution is much more
strict. There is no limit on the number of threads. While
async may not schedule a stream if there is no demand
from the consumer, parallel always evaluates both the streams immediately.
The only limit that applies to parallel is maxBuffer.
Evaluation may block if the output buffer becomes full.
>>>import Streamly.Prelude (parallel)>>>stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)>>>Stream.toList stream -- IO [Int]1 sec 2 sec [1,2]
parallel guarantees that all the streams are scheduled for execution
immediately, therefore, we could use things like starting timers inside the
streams and relying on the fact that all timers were started at the same
time.
Unlike async this operation cannot be used to fold an infinite lazy
container of streams, because it schedules all the streams strictly
concurrently.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Concurrency Control
These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the argument stream. A control parameter set at any point remains effective for any concurrent combinators used in the argument stream until it is reset by using the combinator again. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams.
Pitfall: Remember that maxBuffer in the following example applies to
mapM and any other combinators that may follow it, and it does not apply
to the combinators before it:
... $ maxBuffer 10 $ S.mapM ... ...
If we use & instead of $ the situation will reverse, in the following
example, maxBuffer does not apply to mapM, it applies to combinators
that come before it, because those are the arguments to maxBuffer:
... & maxBuffer 10 & S.mapM ... ...
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default, a negative value means
there is no limit. The default value is 1500. maxThreads does not affect
ParallelT streams as they can use unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0 (Streamly)
Since: 0.8.0
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer value (i.e. a negative value)
coupled with an unbounded maxThreads value is a recipe for disaster in
presence of infinite streams, or very large streams. Especially, it must
not be used when pure is used in ZipAsyncM streams as pure in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
Since: 0.4.0 (Streamly)
Since: 0.8.0
Rate Limiting
Specifies the stream yield rate in yields per second (Hertz).
We keep accumulating yield credits at rateGoal. At any point of time we
allow only as many yields as we have accumulated as per rateGoal since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer we try to recover only as
much as rateBuffer.
rateLow puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal is 0 or negative the stream never yields a value.
If the rateBuffer is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of Rate specifications is documented under Rate. The
effective maximum production rate achieved by a stream is governed by:
- The
maxThreadslimit - The
maxBufferlimit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0 (Streamly)
Since: 0.8.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
Since: 0.5.0 (Streamly)
Since: 0.8.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0 (Streamly)
Since: 0.8.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
Stream Type Adapters
You may want to use different stream composition styles at different points
in your program. Stream types can be freely converted or adapted from one
type to another. The IsStream type class facilitates type conversion of
one stream type to another. It is not used directly, instead the type
combinators provided below are used for conversions.
To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic
type (e.g. SerialT) use the adapt combinator. To give a polymorphic code
a specific interpretation or to adapt a specific type to a polymorphic type
use the type specific combinators e.g. fromAsync or fromWSerial. You
cannot adapt polymorphic code to polymorphic code, as the compiler would not know
which specific type you are converting from or to. If you see a an
ambiguous type variable error then most likely you are using adapt
unnecessarily on polymorphic code.
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Minimal complete definition
Instances
zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
IO Streams
type ZipSerial = ZipSerialM IO Source #
Folding Containers of Streams
These are variants of standard Foldable fold functions that use a
polymorphic stream sum operation (e.g. async or wSerial) to fold a
finite container of streams. Note that these are just special cases of
the more general concatMapWith operation.
foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #
Same as concatFoldableWith
Since: 0.1.0
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
Same as concatMapFoldableWith
Since: 0.1.0
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Same as concatForFoldableWith
Since: 0.1.0
Re-exports
The class of semigroups (types with an associative binary operation).
Instances should satisfy the following:
Since: base-4.9.0.0
Minimal complete definition
Methods
(<>) :: a -> a -> a infixr 6 #
An associative operation.
>>>[1,2,3] <> [4,5,6][1,2,3,4,5,6]
Reduce a non-empty list with <>
The default definition should be sufficient, but this can be overridden for efficiency.
>>>import Data.List.NonEmpty (NonEmpty (..))>>>sconcat $ "Hello" :| [" ", "Haskell", "!"]"Hello Haskell!"
stimes :: Integral b => b -> a -> a #
Repeat a value n times.
Given that this works on a Semigroup it is allowed to fail if
you request 0 or fewer repetitions, and the default definition
will do so.
By making this a member of the class, idempotent semigroups
and monoids can upgrade this to execute in \(\mathcal{O}(1)\) by
picking stimes = or stimesIdempotentstimes =
respectively.stimesIdempotentMonoid
>>>stimes 4 [1][1,1,1,1]
Instances
| Semigroup All | Since: base-4.9.0.0 |
| Semigroup Any | Since: base-4.9.0.0 |
| Semigroup Void | Since: base-4.9.0.0 |
| Semigroup ByteString | |
Defined in Data.ByteString.Internal Methods (<>) :: ByteString -> ByteString -> ByteString # sconcat :: NonEmpty ByteString -> ByteString # stimes :: Integral b => b -> ByteString -> ByteString # | |
| Semigroup ByteString | |
Defined in Data.ByteString.Lazy.Internal Methods (<>) :: ByteString -> ByteString -> ByteString # sconcat :: NonEmpty ByteString -> ByteString # stimes :: Integral b => b -> ByteString -> ByteString # | |
| Semigroup IntSet | Since: containers-0.5.7 |
| Semigroup Ordering | Since: base-4.9.0.0 |
| Semigroup ByteArray | |
| Semigroup () | Since: base-4.9.0.0 |
| Bits a => Semigroup (And a) | Since: base-4.16 |
| FiniteBits a => Semigroup (Iff a) | This constraint is arguably
too strong. However, as some types (such as Since: base-4.16 |
| Bits a => Semigroup (Ior a) | Since: base-4.16 |
| Bits a => Semigroup (Xor a) | Since: base-4.16 |
| Semigroup a => Semigroup (Identity a) | Since: base-4.9.0.0 |
| Semigroup (First a) | Since: base-4.9.0.0 |
| Semigroup (Last a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Down a) | Since: base-4.11.0.0 |
| Semigroup (First a) | Since: base-4.9.0.0 |
| Semigroup (Last a) | Since: base-4.9.0.0 |
| Ord a => Semigroup (Max a) | Since: base-4.9.0.0 |
| Ord a => Semigroup (Min a) | Since: base-4.9.0.0 |
| Monoid m => Semigroup (WrappedMonoid m) | Since: base-4.9.0.0 |
Defined in Data.Semigroup Methods (<>) :: WrappedMonoid m -> WrappedMonoid m -> WrappedMonoid m # sconcat :: NonEmpty (WrappedMonoid m) -> WrappedMonoid m # stimes :: Integral b => b -> WrappedMonoid m -> WrappedMonoid m # | |
| Semigroup a => Semigroup (Dual a) | Since: base-4.9.0.0 |
| Semigroup (Endo a) | Since: base-4.9.0.0 |
| Num a => Semigroup (Product a) | Since: base-4.9.0.0 |
| Num a => Semigroup (Sum a) | Since: base-4.9.0.0 |
| Semigroup p => Semigroup (Par1 p) | Since: base-4.12.0.0 |
| Semigroup (IntMap a) | Since: containers-0.5.7 |
| Semigroup (MergeSet a) | |
| Ord a => Semigroup (Set a) | Since: containers-0.5.7 |
| Semigroup a => Semigroup (IO a) | Since: base-4.10.0.0 |
| Semigroup (Heap a) | |
| Semigroup (Array a) | Since: primitive-0.6.3.0 |
| Storable a => Semigroup (Array a) Source # | |
| Prim a => Semigroup (Array a) Source # | |
| Prim a => Semigroup (Array a) Source # | |
| Semigroup (List a) Source # | |
| Semigroup (ZipList a) Source # | |
| Semigroup (SmallArray a) Source # | Since: 0.6.3.0 |
Defined in Streamly.Internal.Data.SmallArray.Type Methods (<>) :: SmallArray a -> SmallArray a -> SmallArray a # sconcat :: NonEmpty (SmallArray a) -> SmallArray a # stimes :: Integral b => b -> SmallArray a -> SmallArray a # | |
| Semigroup a => Semigroup (Q a) | Since: template-haskell-2.17.0.0 |
| Semigroup (NonEmpty a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Maybe a) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (a) | Since: base-4.15 |
| Semigroup [a] | Since: base-4.9.0.0 |
| Semigroup (Either a b) | Since: base-4.9.0.0 |
| Semigroup (Proxy s) | Since: base-4.9.0.0 |
| Semigroup (U1 p) | Since: base-4.12.0.0 |
| Semigroup (V1 p) | Since: base-4.12.0.0 |
| Semigroup a => Semigroup (ST s a) | Since: base-4.11.0.0 |
| Ord k => Semigroup (Map k v) | |
| MonadAsync m => Semigroup (AheadT m a) Source # | |
| MonadAsync m => Semigroup (AsyncT m a) Source # | |
| MonadAsync m => Semigroup (WAsyncT m a) Source # | |
| MonadAsync m => Semigroup (ParallelT m a) Source # | |
| Semigroup (SerialT m a) Source # | |
| Semigroup (WSerialT m a) Source # | |
| Semigroup (Stream m a) Source # | |
| Semigroup (ZipSerialM m a) Source # | |
Defined in Streamly.Internal.Data.Stream.Zip Methods (<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a # sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a # stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a # | |
| Semigroup (ZipAsyncM m a) Source # | |
| Semigroup b => Semigroup (a -> b) | Since: base-4.9.0.0 |
| (Semigroup a, Semigroup b) => Semigroup (a, b) | Since: base-4.9.0.0 |
| Semigroup a => Semigroup (Const a b) | Since: base-4.9.0.0 |
| (Applicative f, Semigroup a) => Semigroup (Ap f a) | Since: base-4.12.0.0 |
| Alternative f => Semigroup (Alt f a) | Since: base-4.9.0.0 |
| Semigroup (f p) => Semigroup (Rec1 f p) | Since: base-4.12.0.0 |
| (Semigroup b, Monad m) => Semigroup (Tee m a b) Source # |
|
| Monad m => Semigroup (Pipe m a b) Source # | |
| (Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c) | Since: base-4.9.0.0 |
| (Semigroup (f a), Semigroup (g a)) => Semigroup (Product f g a) | Since: base-4.16.0.0 |
| (Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p) | Since: base-4.12.0.0 |
| Semigroup c => Semigroup (K1 i c p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d) | Since: base-4.9.0.0 |
| Semigroup (f (g a)) => Semigroup (Compose f g a) | Since: base-4.16.0.0 |
| Semigroup (f (g p)) => Semigroup ((f :.: g) p) | Since: base-4.12.0.0 |
| Semigroup (f p) => Semigroup (M1 i c f p) | Since: base-4.12.0.0 |
| (Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e) | Since: base-4.9.0.0 |
Deprecated
runStreamT :: Monad m => SerialT m a -> m () Source #
Same as runStream.
Since: 0.1.0
runInterleavedT :: Monad m => WSerialT m a -> m () Source #
Same as drain . fromWSerial.
Since: 0.1.0
runParallelT :: Monad m => ParallelT m a -> m () Source #
Same as drain . fromParallel.
Since: 0.1.0
runZipStream :: Monad m => ZipSerialM m a -> m () Source #
Same as drain . zipping.
Since: 0.1.0
runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #
Same as drain . zippingAsync.
Since: 0.1.0
type InterleavedT = WSerialT Source #
Deprecated: Please use WSerialT instead.
Since: 0.1.0
type ZipStream = ZipSerialM Source #
Deprecated: Please use ZipSerialM instead.
Since: 0.1.0
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #