Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
To run examples in this module:
>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis
- data AsyncT m a
- type Async = AsyncT IO
- fromAsync :: IsStream t => AsyncT m a -> t m a
- async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- (<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a
- data WAsyncT m a
- type WAsync = WAsyncT IO
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Documentation
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
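The two equations above can be checked with a small program. The following is a minimal sketch, assuming the streamly 0.8 series, where Streamly.Prelude exports Async, fromAsync, async and concatMapWith. The names viaMappend, viaAsync, viaBind and viaConcatMap are hypothetical helpers introduced only for illustration. Because results from concurrent streams may arrive in any order, the outputs are sorted before they are compared.

```haskell
import Data.List (sort)
import Streamly.Prelude (Async)
import qualified Streamly.Prelude as Stream

-- Two small AsyncT streams used by all helpers (illustrative data).
s1, s2 :: Async Int
s1 = Stream.fromList [1, 2, 3]
s2 = Stream.fromList [4, 5, 6]

-- (<>) on AsyncT streams ...
viaMappend :: IO [Int]
viaMappend = Stream.toList $ Stream.fromAsync (s1 <> s2)

-- ... should merge like an explicit `async`.
viaAsync :: IO [Int]
viaAsync = Stream.toList $ Stream.fromAsync (s1 `Stream.async` s2)

-- (>>=) on AsyncT streams ...
viaBind :: IO [Int]
viaBind =
    Stream.toList $ Stream.fromAsync
        (s1 >>= \x -> Stream.fromList [x, x * 10])

-- ... should map and merge like `concatMapWith async`.
viaConcatMap :: IO [Int]
viaConcatMap =
    Stream.toList $ Stream.fromAsync $
        Stream.concatMapWith Stream.async
            (\x -> Stream.fromList [x, x * 10]) s1

main :: IO ()
main = do
    a <- viaMappend
    b <- viaAsync
    print (sort a == sort b) -- same elements, order may differ
    c <- viaBind
    d <- viaConcatMap
    print (sort c == sort d) -- same elements, order may differ
```

Only the set of produced elements is compared here; the sketch makes no claim about the arrival order of either formulation.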
A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:
>>> :{
 Stream.toList $ Stream.fromAsync $ do
      x <- Stream.fromList [2,1] -- foreach x in stream
      Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:
>>> :{
 Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [2,4] -- foreach y in stream
     Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream, and all the iterations corresponding to 2 constitute another output stream. These two output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
MonadTrans AsyncT
Defined in Streamly.Internal.Data.Stream.Async
IsStream AsyncT
Defined in Streamly.Internal.Data.Stream.Async
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m)
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)
MonadAsync m => Monad (AsyncT m)
Monad m => Functor (AsyncT m)
(Monad m, MonadAsync m) => Applicative (AsyncT m)
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
MonadAsync m => Semigroup (AsyncT m a)
MonadAsync m => Monoid (AsyncT m a)
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Merges two streams. Both streams may be evaluated concurrently, and outputs from both are used as they arrive:
>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]
Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:
>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]
With 2 threads, only two streams can be scheduled at a time. When one of those finishes, the third one gets scheduled:
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]
With a single thread, it becomes serial:
>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]
Only streams are scheduled for async evaluation. How the actions within a stream are evaluated depends on the stream type: if it is a concurrent stream, they will be evaluated concurrently.
In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:
>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]
If the total number of threads is 2, the third stream is scheduled only after one of the first two has finished:
>>> stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int
>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]
...
[1,1,3,2,3,2]
Thus async goes deep in the first few streams rather than going wide across all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior, async can be safely used to fold an infinite lazy container of streams.
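As an illustration of folding an infinite lazy container, the following is a minimal sketch, assuming the streamly 0.8 series, where Streamly.Prelude exports concatFoldableWith and take. The helper name firstFive and the input data are hypothetical. Only the number of elements is reported, because the arrival order of concurrently evaluated streams is not guaranteed.

```haskell
import qualified Streamly.Prelude as Stream

-- Fold an infinite lazy list of singleton streams with `async` and
-- keep only the first five results that arrive.
-- `firstFive` is a hypothetical helper name.
firstFive :: IO [Int]
firstFive =
    Stream.toList
        $ Stream.take 5
        $ Stream.concatFoldableWith Stream.async
        $ map (\n -> Stream.fromList [n]) [1 ..]

main :: IO ()
main = do
    xs <- firstFive
    print (length xs)
```

The sketch relies on the left-biased scheduling described above: the tail of the list is demanded only as the leftmost streams are scheduled, so the fold does not need to traverse the whole container up front.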
Since: 0.2.0 (Streamly)
Since: 0.8.0
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills. It terminates when the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream once there is space in the buffer. The consumer consumes the stream lazily from the buffer.
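The buffering described above can be sketched as a two-stage pipeline. This is a minimal sketch, assuming the streamly 0.8 series, where Streamly.Prelude exports mkAsync and mapM. The names produce, consume and pipeline, and the 100 ms delays, are hypothetical and chosen only for illustration.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as Stream

-- Hypothetical slow producer stage: sleeps 100 ms per element.
produce :: Int -> IO Int
produce n = threadDelay 100000 >> return n

-- Hypothetical slow consumer stage: sleeps 100 ms per element.
consume :: Int -> IO Int
consume n = threadDelay 100000 >> return (n * 2)

-- mkAsync places a buffer between the two stages, so the producer can
-- work on the next element while the consumer handles the current one.
pipeline :: IO [Int]
pipeline =
    Stream.toList
        $ Stream.mapM consume
        $ Stream.mkAsync
        $ Stream.mapM produce
        $ Stream.fromList [1, 2, 3]

main :: IO ()
main = pipeline >>= print
```

Without mkAsync the two stages would alternate on a single thread. With it, the elapsed time should be closer to that of the slower stage plus one extra step, although exact timings depend on the runtime.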
Since: 0.2.0 (Streamly)
Since: 0.8.0
mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a
Generate a stream asynchronously to keep it buffered; the consumer reads lazily from the buffer.
Pre-release
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:
>>> :{
 Stream.toList $ Stream.fromWAsync $ do
      x <- Stream.fromList [2,1] -- foreach x in stream
      Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:
>>> :{
 Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     y <- Stream.fromList [2,4] -- foreach y in stream
     Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream, and all the iterations corresponding to 2 constitute another WAsyncT output stream. These two output streams are merged using wAsync.
The W in the name stands for wide, or breadth-wise, scheduling, in contrast to the depth-wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
MonadTrans WAsyncT
Defined in Streamly.Internal.Data.Stream.Async
IsStream WAsyncT
Defined in Streamly.Internal.Data.Stream.Async
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)
MonadAsync m => Monad (WAsyncT m)
Monad m => Functor (WAsyncT m)
(Monad m, MonadAsync m) => Applicative (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m)
Defined in Streamly.Internal.Data.Stream.Async
MonadAsync m => Semigroup (WAsyncT m a)
MonadAsync m => Monoid (WAsyncT m a)
fromWAsync :: IsStream t => WAsyncT m a -> t m a
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
For singleton streams, wAsync is the same as async. See async for singleton stream behavior. For multi-element streams, while async is left biased, i.e. it tries to evaluate the left side stream as much as possible, wAsync tries to schedule them both fairly. In other words, async goes deep while wAsync goes wide. However, outputs are always used as they arrive.
With a single thread, async starts behaving like serial while wAsync starts behaving like wSerial.
>>> import Streamly.Prelude (wAsync)
>>> stream1 = Stream.fromList [1,2,3]
>>> stream2 = Stream.fromList [4,5,6]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2
[1,2,3,4,5,6]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2
[1,4,2,5,3,6]
With two threads available, and combining three streams:
>>> stream3 = Stream.fromList [7,8,9]
>>> Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
[1,2,3,4,5,6,7,8,9]
>>> Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3
[1,4,2,7,5,3,8,6,9]
This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner.
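For contrast, folding a finite container with wAsync is unproblematic. The following is a minimal sketch, assuming the streamly 0.8 series, where Streamly.Prelude exports concatFoldableWith. The helper name mergeAll and the input streams are hypothetical. The result is sorted before printing because the arrival order is not guaranteed.

```haskell
import Data.List (sort)
import qualified Streamly.Prelude as Stream

-- Merge a finite list of streams with wAsync.
-- `mergeAll` is a hypothetical helper name.
mergeAll :: IO [Int]
mergeAll =
    Stream.toList
        $ Stream.concatFoldableWith Stream.wAsync
            [ Stream.fromList [1, 2]
            , Stream.fromList [3, 4]
            , Stream.fromList [5, 6]
            ]

main :: IO ()
main = do
    xs <- mergeAll
    print (sort xs) -- sorted, since arrival order may vary
```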
Note that WSerialT and single-threaded WAsyncT both interleave streams, but the exact scheduling is slightly different in the two cases.
Since: 0.2.0 (Streamly)
Since: 0.8.0