map-reduce-folds-0.1.0.0: foldl wrappers for map-reduce

Copyright(c) Adam Conner-Sax 2019
LicenseBSD-3-Clause
Maintaineradam_conner_sax@yahoo.com
Stabilityexperimental
Safe HaskellNone
LanguageHaskell2010

Control.MapReduce.Engines.Streamly

Contents

Description

map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.

Notes: 1. These are polymorphic in the return stream type. Thought the streams do have to be serial when groupBy is called So you have to specify the stream type in the call or it has to be inferrable from the use of the result.

  1. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis

Engines

streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #

map-reduce-fold builder returning a SerialT Identity d result

streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #

effectful map-reduce-fold engine returning a (Istream t => t m d) result The MonadAsync constraint here more or less requires us to run in IO, or something IO like.

concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #

possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (Istream t, MonadAsync m) => t m d result

Result Extraction

resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #

make a stream into an (effectful) []

concatStream :: (Monad m, Monoid a) => SerialT m a -> m a Source #

mappend all in a monoidal stream

concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #

mappend everything in a pure Streamly fold

concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #

mappend everything in an effectful Streamly fold.

concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #

mappend everything in a concurrent Streamly fold.

groupBy Functions

groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by hashable key. NB: this function uses the fact that SerialT m is a monad

groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by ordered key. NB: this function uses the fact that SerialT m is a monad

groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by hashable key. Uses mutable hashtables running in the ST monad. NB: this function uses the fact that SerialT m is a monad

groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by key with instance of Grouping from http://hackage.haskell.org/package/discrimination. NB: this function uses the fact that SerialT m is a monad

Re-Exports

data SerialT (m :: Type -> Type) a #

Deep serial composition or serial composition with depth first traversal. The Semigroup instance of SerialT appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream.

import Streamly
import qualified Streamly.Prelude as S

main = (toList . serially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]

The Monad instance runs the monadic continuation for each element of the stream, serially.

main = runStream . serially $ do
    x <- return 1 <> return 2
    S.yieldM $ print x
1
2

SerialT nests streams serially in a depth first manner.

main = runStream . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(1,4)
(2,3)
(2,4)

This behavior of SerialT is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In imperative paradigm we can think of this composition as nested for loops and the monadic continuation is the body of the loop. The loop iterates for all elements of the stream.

The serially combinator can be omitted as the default stream type is SerialT. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.

Since: streamly-0.2.0

Instances
MonadTrans SerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

fail :: String -> SerialT m a #

Monad m => Functor (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) 
Instance details

Defined in Streamly.Streams.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Associated Types

type Item (SerialT Identity a) :: Type #

Eq a => Eq (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Ord a => Ord (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Read a => Read (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Show a => Show (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

a ~ Char => IsString (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Semigroup (SerialT m a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

type Item (SerialT Identity a) = a

data WSerialT (m :: Type -> Type) a #

Wide serial composition or serial composition with a breadth first traversal. The Semigroup instance of WSerialT traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately.

import Streamly
import qualified Streamly.Prelude as S

main = (toList . wSerially $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

main = runStream . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(2,3)
(1,4)
(2,4)

Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: streamly-0.2.0

Instances
MonadTrans WSerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

fail :: String -> WSerialT m a #

Monad m => Functor (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) 
Instance details

Defined in Streamly.Streams.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) 
Instance details

Defined in Streamly.Streams.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Associated Types

type Item (WSerialT Identity a) :: Type #

Eq a => Eq (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Ord a => Ord (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Read a => Read (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Show a => Show (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

a ~ Char => IsString (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Semigroup (WSerialT m a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) 
Instance details

Defined in Streamly.Streams.Serial

type Item (WSerialT Identity a) = a

data AheadT (m :: Type -> Type) a #

Deep ahead composition or ahead composition with depth first traversal. The semigroup composition of AheadT appends streams in a depth first manner just like SerialT except that it can produce elements concurrently ahead of time. It is like AsyncT except that AsyncT produces the output as it arrives whereas AheadT orders the output in the traversal order.

main = (toList . aheadly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream.

Similarly, the monad instance of AheadT may run each iteration concurrently ahead of time but presents the results in the same order as SerialT.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that ahead composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.

Since: streamly-0.3.0

Instances
MonadTrans AheadT 
Instance details

Defined in Streamly.Streams.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT 
Instance details

Defined in Streamly.Streams.Ahead

Methods

toStream :: AheadT m a -> Stream m a

fromStream :: Stream m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

fail :: String -> AheadT m a #

Monad m => Functor (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) 
Instance details

Defined in Streamly.Streams.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

data AsyncT (m :: Type -> Type) a #

Deep async composition or async composition with depth first traversal. In a left to right Semigroup composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer.

main = (toList . asyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that async composition with depth first traversal can be used to combine infinite number of streams as it explores only a bounded number of streams at a time.

Since: streamly-0.1.0

Instances
MonadTrans AsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

fail :: String -> AsyncT m a #

Monad m => Functor (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) 
Instance details

Defined in Streamly.Streams.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

data WAsyncT (m :: Type -> Type) a #

Wide async composition or async composition with breadth first traversal. The Semigroup instance of WAsyncT concurrently traverses the composed streams using a depth first travesal or in a round robin fashion, yielding elements from both streams alternately.

main = (toList . wAsyncly $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Unlike AsyncT all iterations are guaranteed to run fairly concurrently, unconditionally.

Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: streamly-0.2.0

Instances
MonadTrans WAsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

fail :: String -> WAsyncT m a #

Monad m => Functor (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) 
Instance details

Defined in Streamly.Streams.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) 
Instance details

Defined in Streamly.Streams.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

data ParallelT (m :: Type -> Type) a #

Async composition with simultaneous traversal of all streams.

The Semigroup instance of ParallelT concurrently merges two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using ParallelT each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come first serve basis.

AsyncT and WAsyncT are concurrent lookahead streams each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, ParallelT uses a separate runtime thread to evaluate each stream.

WAsyncT is similar to ParallelT, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that WAsyncT is lazy or pull driven whereas ParallelT is strict or push driven. ParallelT immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results whereas WAsyncT may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. The concurrent scheduling of the next stream or the degree of concurrency is driven by the feedback from the consumer. In case of ParallelT each stream is evaluated in a separate thread and results are pushed to a shared output buffer, the evaluation rate is controlled by blocking when the buffer is full.

Concurrent lookahead streams are generally more efficient than ParallelT and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over ParallelT especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that ParallelT is almost the same (modulo some implementation differences) as WAsyncT when the latter is used with unlimited lookahead and zero latency in initiating lookahead.

main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

When streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: streamly-0.1.0

Instances
MonadTrans ParallelT 
Instance details

Defined in Streamly.Streams.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT 
Instance details

Defined in Streamly.Streams.Parallel

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

fail :: String -> ParallelT m a #

Monad m => Functor (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) 
Instance details

Defined in Streamly.Streams.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: streamly-0.1.0

class IsStream (t :: (Type -> Type) -> Type -> Type) #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: streamly-0.2.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Instances
IsStream Stream 
Instance details

Defined in Streamly.Streams.StreamK.Type

Methods

toStream :: Stream m a -> Stream m a

fromStream :: Stream m a -> Stream m a

consM :: MonadAsync m => m a -> Stream m a -> Stream m a #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a #

IsStream AheadT 
Instance details

Defined in Streamly.Streams.Ahead

Methods

toStream :: AheadT m a -> Stream m a

fromStream :: Stream m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

IsStream ZipSerialM 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipSerialM m a -> Stream m a

fromStream :: Stream m a -> ZipSerialM m a

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

IsStream ZipAsyncM 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipAsyncM m a -> Stream m a

fromStream :: Stream m a -> ZipAsyncM m a

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #

IsStream AsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

IsStream WAsyncT 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

IsStream ParallelT 
Instance details

Defined in Streamly.Streams.Parallel

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

IsStream SerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

IsStream WSerialT 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #