| Copyright | (c) 2017 Harendra Kumar | 
|---|---|
| License | BSD3 | 
| Maintainer | streamly@composewell.com | 
| Stability | experimental | 
| Portability | GHC | 
| Safe Haskell | None | 
| Language | Haskell2010 | 
Streamly.Internal.Data.Stream.Parallel
Description
Synopsis
- data ParallelT m a
- type Parallel = ParallelT IO
- parallely :: IsStream t => ParallelT m a -> t m a
- parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
- distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
Parallel Stream Type
Async composition with strict concurrent execution of all streams.
The Semigroup instance of ParallelT executes both the streams
 concurrently without any delay or without waiting for the consumer demand
 and merges the results as they arrive. If the consumer does not consume
 the results, they are buffered upto a configured maximum, controlled by the
 maxBuffer primitive. If the buffer becomes full the concurrent tasks will
 block until there is space in the buffer.
Both WAsyncT and ParallelT, evaluate the constituent streams fairly in a
 round robin fashion. The key difference is that WAsyncT might wait for the
 consumer demand before it executes the tasks whereas ParallelT starts
 executing all the tasks immediately without waiting for the consumer demand.
 For WAsyncT the maxThreads limit applies whereas for ParallelT it does
 not apply. In other words, WAsyncT can be lazy whereas ParallelT is
 strict.
ParallelT is useful for cases when the streams are required to be
 evaluated simultaneously irrespective of how the consumer consumes them e.g.
 when we want to race two tasks and want to start both strictly at the same
 time or if we have timers in the parallel tasks and our results depend on
 the timers being started at the same time. If we do not have such
 requirements then AsyncT or AheadT are recommended as they can be more
 efficient than ParallelT.
main = (toList.parallely$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, it yields whichever
 stream yields first without any bias, unlike the Async style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of ParallelT runs all iterations
 of the loop concurrently.
import Streamly import qualified Streamly.Prelude as S import Control.Concurrent main =drain.parallely$ do n <- return 3 <> return 2 <> return 1 S.yieldM $ do threadDelay (n * 1000000) myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1 ThreadId 39: Delay 2 ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.1.0
Instances
type Parallel = ParallelT IO Source #
A parallely composing IO stream of elements of type a.
 See ParallelT documentation for more details.
Since: 0.2.0
parallely :: IsStream t => ParallelT m a -> t m a Source #
Fix the type of a polymorphic stream as ParallelT.
Since: 0.1.0
Merge Concurrently
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel but stops the output as soon as the first stream stops.
Internal
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel but stops the output as soon as any of the two streams
 stops.
Internal
Evaluate Concurrently
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.
Internal
Tap Concurrently
tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
 in an independent thread. The fold may buffer some elements. The buffer size
 is determined by the prevailing maxBuffer setting.
              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap.
Internal
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #
Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.
>>>S.drain $ distributeAsync_ [S.mapM_ print, S.mapM_ print] (S.enumerateFromTo 1 2)
distributeAsync_ = flip (foldr tapAsync)
Internal