Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- data ParallelT m a
- type Parallel = ParallelT IO
- parallely :: IsStream t => ParallelT m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
- distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
Parallel Stream Type
Async composition with strict concurrent execution of all streams.
The Semigroup instance of ParallelT executes both streams concurrently, without any delay and without waiting for consumer demand, and merges the results as they arrive. If the consumer does not consume the results, they are buffered up to a configured maximum, controlled by the maxBuffer primitive. If the buffer becomes full, the concurrent tasks block until there is space in the buffer.
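As a minimal sketch of the buffering behaviour described above, the following merges two streams through the Semigroup instance while capping the buffer with maxBuffer. The buffer size of 2 and the name boundedMerge are illustrative assumptions, not values taken from the library documentation.

```haskell
import Data.List (sort)
import Streamly (maxBuffer, parallely)
import qualified Streamly.Prelude as S

-- Merge two streams with ParallelT's Semigroup instance while capping the
-- output buffer at 2 elements (an arbitrary value chosen for illustration).
boundedMerge :: IO [Int]
boundedMerge =
    S.toList . parallely . maxBuffer 2 $
        S.fromList [1 .. 5] <> S.fromList [6 .. 10]

main :: IO ()
main = do
    xs <- boundedMerge
    -- Arrival order depends on scheduling, so compare after sorting.
    print (sort xs == [1 .. 10])
```

With a small buffer the producers block more often, but every element should still reach the consumer.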
Both WAsyncT and ParallelT evaluate the constituent streams fairly in a round robin fashion. The key difference is that WAsyncT might wait for consumer demand before it executes the tasks, whereas ParallelT starts executing all the tasks immediately without waiting for consumer demand. The maxThreads limit applies to WAsyncT but not to ParallelT. In other words, WAsyncT can be lazy whereas ParallelT is strict.
ParallelT is useful when the streams must be evaluated simultaneously irrespective of how the consumer consumes them, e.g. when we want to race two tasks and start both strictly at the same time, or when we have timers in the parallel tasks and our results depend on the timers being started at the same time. If we do not have such requirements then AsyncT or AheadT are recommended as they can be more efficient than ParallelT.
main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, the result yields from whichever stream produces an element first, without any bias, unlike the Async style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
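The ordering guarantee can be illustrated with a small sketch. It assumes the parallel combinator exported by the Streamly module, and the name mergeBoth is hypothetical. Odd numbers come from one stream and even numbers from the other, so filtering the merged output recovers each input stream.

```haskell
import Streamly (parallel)
import qualified Streamly.Prelude as S

-- Merge two finite streams and collect the elements in arrival order.
mergeBoth :: IO [Int]
mergeBoth = S.toList $ S.fromList [1, 3, 5] `parallel` S.fromList [2, 4, 6]

main :: IO ()
main = do
    xs <- mergeBoth
    -- The interleaving of odd and even elements may vary between runs.
    print xs
    -- Per the guarantee above, each input stream keeps its own order,
    -- so these are expected to be [1,3,5] and [2,4,6] respectively.
    print (filter odd xs)
    print (filter even xs)
```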
Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = S.drain . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
        threadDelay (n * 1000000)
        myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
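A finite collection of streams can be combined in one step. The sketch below assumes foldMapWith and parallel from the Streamly module; the name doubledAll and the doubling function are purely illustrative.

```haskell
import Data.List (sort)
import Streamly (foldMapWith, parallel)
import qualified Streamly.Prelude as S

-- Build one singleton stream per list element and merge them all with
-- 'parallel'. The input list is finite, as parallel composition requires.
doubledAll :: IO [Int]
doubledAll =
    S.toList $ foldMapWith parallel (\n -> S.yieldM (pure (n * 2))) [1 .. 5]

main :: IO ()
main = do
    xs <- doubledAll
    -- Arrival order is not fixed, so sort before printing.
    print (sort xs)
```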
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.1.0
Instances
- MonadTrans ParallelT (defined in Streamly.Internal.Data.Stream.Parallel)
- IsStream ParallelT
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadState s m, MonadAsync m) => MonadState s (ParallelT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)
- MonadAsync m => Monad (ParallelT m)
- Monad m => Functor (ParallelT m)
- (Monad m, MonadAsync m) => Applicative (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) (defined in Streamly.Internal.Data.Stream.Parallel)
- MonadAsync m => Semigroup (ParallelT m a)
- MonadAsync m => Monoid (ParallelT m a)
type Parallel = ParallelT IO
An IO stream of elements of type a that composes in parallel.
See the ParallelT documentation for more details.
Since: 0.2.0
parallely :: IsStream t => ParallelT m a -> t m a
Fix the type of a polymorphic stream as ParallelT.
Since: 0.1.0
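As a hedged illustration of fixing the stream type, the sketch below writes a polymorphic do-block and pins it to ParallelT with parallely, so that the Monad instance runs each iteration concurrently. The name squares is an assumption made for this example.

```haskell
import Data.List (sort)
import Streamly (parallely)
import qualified Streamly.Prelude as S

-- 'parallely' fixes the do-block to ParallelT, so each iteration of the
-- loop over [1, 2, 3] runs concurrently.
squares :: IO [Int]
squares = S.toList . parallely $ do
    x <- S.fromList [1, 2, 3]
    S.yieldM (pure (x * x))

main :: IO ()
main = do
    xs <- squares
    -- Results arrive in scheduling order, so sort before printing.
    print (sort xs)
```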
Merge Concurrently
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel but stops the output as soon as the first stream stops.
Internal
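A rough sketch of parallelFst follows. It assumes the function can be imported from Streamly.Internal.Data.Stream.Parallel, the module named in the instance list on this page. Being internal, it may move or change between releases. The names firstWins and slowZeros are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as S
import Streamly.Internal.Data.Stream.Parallel (parallelFst)

-- The first stream is short and fast. The second would need about a second
-- to finish, but the output should stop once the first stream stops.
firstWins :: IO [Int]
firstWins = S.toList $ S.fromList [1, 2, 3] `parallelFst` slowZeros
  where
    slowZeros = S.replicateM 100 (threadDelay 10000 >> pure 0)

main :: IO ()
main = do
    xs <- firstWins
    -- How many zeros appear depends on scheduling.
    print xs
```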
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
Like parallel but stops the output as soon as either of the two streams stops.
Internal
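For comparison, a sketch of parallelMin under the same assumption about the internal module path. Here the short stream is the second argument, which should still end the output. The names shortestWins and slowZeros are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import qualified Streamly.Prelude as S
import Streamly.Internal.Data.Stream.Parallel (parallelMin)

-- Either stream stopping ends the output, so the argument order of the
-- short and the slow stream should not matter here.
shortestWins :: IO [Int]
shortestWins = S.toList $ slowZeros `parallelMin` S.fromList [1, 2, 3]
  where
    slowZeros = S.replicateM 100 (threadDelay 10000 >> pure 0)

main :: IO ()
main = shortestWins >>= print
```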
Evaluate Concurrently
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
Generate a stream asynchronously to keep it buffered, and consume lazily from the buffer.
Internal
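A minimal sketch of mkParallel, assuming the signature shown above and the internal module path Streamly.Internal.Data.Stream.Parallel. Other releases may expose a different type. The name buffered is hypothetical.

```haskell
import qualified Streamly.Prelude as S
import Streamly.Internal.Data.Stream.Parallel (mkParallel)

-- Evaluate the producer in a separate thread and read from its buffer.
-- Only one stream is involved, so element order should be unchanged.
buffered :: IO [Int]
buffered = S.toList $ mkParallel (S.fromList [1 .. 5])

main :: IO ()
main = buffered >>= print
```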
Tap Concurrently
tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.
              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2
Exceptions from the concurrently running fold are propagated to the current computation. Because of buffering in the fold, exceptions may be delayed and may not correspond to the element currently being processed in the parent stream. However, we guarantee that the tap finishes and all exceptions from it are drained before the parent stream stops.
Compare with tap.
Internal
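The following sketch taps a stream with a side-effecting fold that accumulates a sum in an IORef. It assumes tapAsync is importable from Streamly.Internal.Data.Stream.Parallel; the name tapSum and the use of an IORef are illustrative choices, not part of the library.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Streamly.Prelude as S
import Streamly.Internal.Data.Stream.Parallel (tapAsync)

-- Collect the stream normally while a concurrent tap adds every element
-- to a running total.
tapSum :: IO ([Int], Int)
tapSum = do
    total <- newIORef 0
    xs <- S.toList $
        tapAsync (S.mapM_ (\x -> modifyIORef' total (+ x))) (S.fromList [1 .. 10])
    -- Relies on the guarantee above that the tap finishes before the
    -- parent stream stops, so the total is complete when read here.
    s <- readIORef total
    pure (xs, s)

main :: IO ()
main = tapSum >>= print
```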
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.
>>> S.drain $ distributeAsync_ [S.mapM_ print, S.mapM_ print] (S.enumerateFromTo 1 2)
distributeAsync_ = flip (foldr tapAsync)
Internal