Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
To run examples in this module:
>>> import qualified Streamly.Prelude as Stream
>>> import Control.Concurrent (threadDelay)
>>> :{
 delay n = do
     threadDelay (n * 1000000)   -- sleep for n seconds
     putStrLn (show n ++ " sec") -- print "n sec"
     return n                    -- IO Int
:}
Synopsis
- data ParallelT m a
- type Parallel = ParallelT IO
- fromParallel :: IsStream t => ParallelT m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallelD :: MonadAsync m => Stream m a -> Stream m a
- mkParallelK :: (IsStream t, MonadAsync m) => t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
- tapAsyncF :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
- distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
- newCallbackStream :: (IsStream t, MonadAsync m) => m (a -> m (), t m a)
Parallel Stream Type
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT. ParallelT is similar, except that all iterations are strictly concurrent, whereas in AsyncT concurrency depends on consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
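To make the monadic behaviour described above concrete, here is a minimal sketch of a loop run under ParallelT. It assumes the Streamly.Prelude API of the 0.8 series; the name parallelLoop and the 0.1-second delay unit are illustrative choices, not part of the library.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import qualified Streamly.Prelude as Stream

-- One iteration per element of the outer stream. Under ParallelT every
-- iteration is started concurrently instead of waiting for consumer demand.
-- (parallelLoop is a hypothetical name used only for this sketch.)
parallelLoop :: IO [Int]
parallelLoop =
    Stream.toList $ Stream.fromParallel $ do
        n <- Stream.fromList [3, 2, 1]   -- outer "loop" variable
        Stream.fromEffect $ do
            threadDelay (n * 100000)     -- sleep for n * 0.1 seconds
            return n

main :: IO ()
main = do
    xs <- parallelLoop
    -- Arrival order depends on scheduling, so compare after sorting.
    print (sort xs)
```

Because all three iterations start together, the elements usually arrive in order of their delays rather than in list order, but only the set of results should be relied upon.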
Instances
- MonadTrans ParallelT (defined in Streamly.Internal.Data.Stream.Parallel)
- IsStream ParallelT (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadState s m, MonadAsync m) => MonadState s (ParallelT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)
- MonadAsync m => Monad (ParallelT m)
- Monad m => Functor (ParallelT m)
- (Monad m, MonadAsync m) => Applicative (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- MonadAsync m => Semigroup (ParallelT m a)
- MonadAsync m => Monoid (ParallelT m a)
fromParallel :: IsStream t => ParallelT m a -> t m a
Merge Concurrently
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a infixr 6
Like async, except that the execution is much more strict. There is no limit on the number of threads. While async may not schedule a stream if there is no demand from the consumer, parallel always evaluates both streams immediately. The only limit that applies to parallel is maxBuffer. Evaluation may block if the output buffer becomes full.
>>> import Streamly.Prelude (parallel)
>>> stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1)
>>> Stream.toList stream -- IO [Int]
1 sec
2 sec
[1,2]
parallel guarantees that all the streams are scheduled for execution immediately. We can therefore, for example, start timers inside the streams and rely on all of them having been started at the same time.
Unlike async, this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.
Since: 0.2.0 (Streamly)
Since: 0.8.0
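The interaction with maxBuffer mentioned above can be sketched as follows. This assumes Stream.maxBuffer from Streamly.Prelude (0.8 series); the buffer size of 2, the element ranges, and the name mergedSmallBuffer are arbitrary illustrative choices.

```haskell
import Data.List (sort)
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (parallel)

-- Merge two finite streams with a deliberately tiny output buffer.
-- Producers block whenever the buffer is full, but no element is dropped.
-- (mergedSmallBuffer is a hypothetical name used only for this sketch.)
mergedSmallBuffer :: IO [Int]
mergedSmallBuffer =
    Stream.toList
        $ Stream.maxBuffer 2
        $ Stream.fromList [1 .. 50] `parallel` Stream.fromList [51 .. 100]

main :: IO ()
main = do
    xs <- mergedSmallBuffer
    -- The interleaving is nondeterministic; the set of elements is not.
    print (sort xs == [1 .. 100])
```

A small buffer only throttles the producers; it does not change which elements are delivered.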
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel, but stops the output as soon as the first stream stops.
Pre-release
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel, but stops the output as soon as either of the two streams stops.
Pre-release
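The difference between parallelFst and parallelMin shows up when one of the two streams never ends. The sketch below assumes that both functions can be imported from Streamly.Internal.Data.Stream.Parallel, the module named in the instance list on this page; finite, ticker, fstDemo and minDemo are hypothetical names.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Prelude as Stream
import Streamly.Prelude (SerialT)

-- A finite stream and an endless "ticker" that emits 0 every millisecond.
finite :: SerialT IO Int
finite = Stream.fromList [1, 2, 3]

ticker :: SerialT IO Int
ticker = Stream.repeatM (threadDelay 1000 >> return 0)

-- Ends when the first argument (finite) ends, despite the endless ticker.
fstDemo :: IO [Int]
fstDemo = Stream.toList (finite `Par.parallelFst` ticker)

-- Ends when either argument ends; here the second one is finite.
minDemo :: IO [Int]
minDemo = Stream.toList (ticker `Par.parallelMin` finite)

main :: IO ()
main = do
    xs <- fstDemo
    ys <- minDemo
    -- How many ticks get through before the stop is timing dependent.
    print (all (`elem` [0, 1, 2, 3]) xs, all (`elem` [0, 1, 2, 3]) ys)
```

Swapping the arguments of parallelFst so that ticker comes first would produce an endless stream, which is the case parallelMin is meant to cover.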
Evaluate Concurrently
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer is full, then blocks until space becomes available. The consumer consumes the stream lazily from the buffer.
mkParallel = D.fromStreamD . mkParallelD . D.toStreamD
Pre-release
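As an illustration of the producer/consumer decoupling described above, the following sketch places mkParallel between two slow stages. It assumes mkParallel is importable from Streamly.Internal.Data.Stream.Parallel; produce, consume, pipeline and the 50 ms delays are made up for the example.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Prelude as Stream

-- Simulated slow stages, 50 ms per element each (hypothetical helpers).
produce :: Int -> IO Int
produce n = threadDelay 50000 >> return n

consume :: Int -> IO Int
consume n = threadDelay 50000 >> return (n * 10)

pipeline :: IO [Int]
pipeline =
    Stream.toList
        $ Stream.mapM consume        -- runs on the consumer side
        $ Par.mkParallel             -- buffer between the two stages
        $ Stream.mapM produce        -- runs in the producer thread
        $ Stream.fromList [1 .. 5]

main :: IO ()
main = pipeline >>= print
```

With a single producer the element order is preserved, so this prints [10,20,30,40,50]. The two stages overlap in time, so the run should take roughly the duration of one stage plus one element rather than the sum of both.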
mkParallelD :: MonadAsync m => Stream m a -> Stream m a
Same as mkParallel, but for StreamD streams.
mkParallelK :: (IsStream t, MonadAsync m) => t m a -> t m a
Like mkParallel, but uses StreamK internally.
Pre-release
Tap Concurrently
tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.
                  Stream m a -> m b
                          |
 -----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2
Exceptions from the concurrently running fold are propagated to the current computation. Because of buffering in the fold, exceptions may be delayed and may not correspond to the element currently being processed in the parent stream. However, the tap is guaranteed to finish, and all exceptions from it to be drained, before the parent stream stops.
Compare with tap.
Pre-release
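Since the result of the tapped fold is discarded, one way to get a value out of it is a side channel such as an IORef. The sketch below assumes tapAsync is importable from Streamly.Internal.Data.Stream.Parallel and relies on the guarantee stated above that the tap finishes before the parent stream stops; tapSum is a hypothetical name.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Prelude as Stream

-- Pass the stream through unchanged while a concurrent tap sums a copy of it.
tapSum :: IO ([Int], Int)
tapSum = do
    ref <- newIORef 0
    -- The tap runs in its own thread and stores its result in the IORef.
    let tap s = Stream.sum s >>= writeIORef ref
    xs <- Stream.toList $ Par.tapAsync tap $ Stream.fromList [1 .. 10]
    -- Read only after the parent stream has ended, so the tap has finished.
    total <- readIORef ref
    return (xs, total)

main :: IO ()
main = tapSum >>= print
```

The parent stream yields the original elements, and the sum of 1 to 10 (55) is available in the IORef once the stream has been fully consumed.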
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.
> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2)
1
2
1
2
distributeAsync_ = flip (foldr tapAsync)
Pre-release
Callbacks
newCallbackStream :: (IsStream t, MonadAsync m) => m (a -> m (), t m a)
Generates a callback and stream pair. The returned callback is used to queue values to the stream. The stream is infinite: the callback has no way to signal that it is done.
Pre-release