map-reduce-folds-0.1.0.7: foldl wrappers for map-reduce
Copyright: (c) Adam Conner-Sax 2019
License: BSD-3-Clause
Maintainer: adam_conner_sax@yahoo.com
Stability: experimental
Safe Haskell: None
Language: Haskell2010

Control.MapReduce.Engines.Streamly

Description

map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.

Notes:

  1. These engines are polymorphic in the return stream type, though the streams do have to be serial when groupBy is called. So you have to specify the stream type in the call, or it has to be inferable from the use of the result.

  2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.

Engines

streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d

map-reduce-fold builder returning a SerialT Identity d result

streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d

effectful map-reduce-fold engine returning a (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.

concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d

possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d result

Streamly Combinators

toStreamlyFold :: Monad m => Fold a b -> Fold m a b

convert a Control.Foldl Fold into a Streamly.Data.Fold fold

toStreamlyFoldM :: FoldM m a b -> Fold m a b

convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
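
As a rough illustration of what such a conversion involves, the sketch below lifts a pure left fold into a monadic one by wrapping each component in return. PureFold, MonadicFold, generalizeFold, runMonadicFold and sumFold are hypothetical local stand-ins written only against base. They are assumed to mirror the general step/initial-state/extract shape of these fold types and are not the definitions used by Control.Foldl, Streamly.Data.Fold or this module.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
module Main where

import Control.Monad (foldM)
import Data.Functor.Identity (Identity (..))

-- Hypothetical stand-in for a pure left fold: step, initial state, extractor.
data PureFold a b = forall x. PureFold (x -> a -> x) x (x -> b)

-- Hypothetical stand-in for a monadic left fold: the same three parts in m.
data MonadicFold m a b = forall x. MonadicFold (x -> a -> m x) (m x) (x -> m b)

-- Lift a pure fold into any monad by wrapping each component with 'return'.
generalizeFold :: Monad m => PureFold a b -> MonadicFold m a b
generalizeFold (PureFold step begin done) =
  MonadicFold (\x a -> return (step x a)) (return begin) (return . done)

-- Run a monadic fold over a list, threading the state through the monad.
runMonadicFold :: Monad m => MonadicFold m a b -> [a] -> m b
runMonadicFold (MonadicFold step begin done) as = do
  x0 <- begin
  xN <- foldM step x0 as
  done xN

-- A pure fold that sums its inputs.
sumFold :: PureFold Int Int
sumFold = PureFold (+) 0 id

main :: IO ()
main = do
  -- The lifted fold can run in IO ...
  runMonadicFold (generalizeFold sumFold) [1 .. 10] >>= print
  -- ... or in Identity, where it gives the same answer purely.
  print (runIdentity (runMonadicFold (generalizeFold sumFold) [1 .. 10]))
```

Both lines print 55. The point of the sketch is only that a pure fold carries strictly less structure than a monadic one, so the conversion in that direction needs nothing beyond a Monad constraint.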

Result Extraction

resultToList :: (Monad m, IsStream t) => t m a -> m [a]

make a stream into an (effectful) []

concatStream :: (Monad m, Monoid a) => SerialT m a -> m a

mappend all in a monoidal stream

concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b

mappend everything in a pure Streamly fold

concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b

mappend everything in an effectful Streamly fold.

concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b

mappend everything in a concurrent Streamly fold.

groupBy Functions

groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)

Group streamly stream of (k,c) by hashable key. NB: this function uses the fact that SerialT m is a monad.

groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)

Group streamly stream of (k,c) by ordered key. NB: this function uses the fact that SerialT m is a monad.

groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)

Group streamly stream of (k,c) by hashable key. Uses mutable hashtables running in the ST monad. NB: this function uses the fact that SerialT m is a monad.

groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)

Group streamly stream of (k,c) by key with an instance of Grouping from http://hackage.haskell.org/package/discrimination. NB: this function uses the fact that SerialT m is a monad.
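
To make the shape of these grouping functions concrete, here is a small list-based model of grouping (k, c) pairs into (k, Seq c) by an ordered key. It uses Data.Map.Strict and Data.Sequence from containers rather than Streamly. groupByOrderedKeyList is a hypothetical name, and both the ascending-key order of the groups and the arrival order of values within a group are assumptions of this sketch, not documented guarantees of the functions above.

```haskell
module Main where

import           Data.Foldable   (toList)
import qualified Data.Map.Strict as M
import           Data.Sequence   (Seq)
import qualified Data.Sequence   as Seq

-- Group (key, value) pairs by key. 'M.fromListWith' calls its combining
-- function as (f new old), so 'flip (<>)' appends each new value after the
-- ones already collected, keeping arrival order within a key.
groupByOrderedKeyList :: Ord k => [(k, c)] -> [(k, Seq c)]
groupByOrderedKeyList =
  M.toList . M.fromListWith (flip (<>)) . map (\(k, c) -> (k, Seq.singleton c))

sample :: [(String, Int)]
sample = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]

main :: IO ()
main =
  -- Convert each Seq to a list only for display.
  print (map (fmap toList) (groupByOrderedKeyList sample))
  -- prints [("a",[1,3,5]),("b",[2,4])]
```

A function of this shape, lifted to SerialT m, is the kind of argument the engines above take as their (forall z. SerialT m (k, z) -> SerialT m (k, g z)) parameter, with g instantiated to Seq.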

Re-Exports

data SerialT (m :: Type -> Type) a #

The Semigroup operation for SerialT behaves like a regular append operation. Therefore, when a <> b is evaluated, stream a is evaluated first until it exhausts and then stream b is evaluated. In other words, the elements of stream b are appended to the elements of stream a. This operation can be used to fold an infinite lazy container of streams.

import Streamly
import qualified Streamly.Prelude as S

main = (S.toList . serially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]

The Monad instance runs the monadic continuation for each element of the stream, serially.

main = S.drain . serially $ do
    x <- return 1 <> return 2
    S.yieldM $ print x
1
2

SerialT nests streams serially in a depth first manner.

main = S.drain . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(1,4)
(2,3)
(2,4)

We call the monadic code being run for each element of the stream a monadic continuation. In the imperative paradigm, we can think of this composition as nested for loops, with the monadic continuation as the body of the loop. The loop iterates over all elements of the stream.

Note that the behavior and semantics of SerialT, including its Semigroup and Monad instances, are exactly like those of Haskell lists, except that SerialT can contain effectful actions while lists are pure.

In the code above, the serially combinator can be omitted as the default stream type is SerialT.
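
The list analogy can be checked with nothing but base. The sketch below repeats the append and nested-loop examples using ordinary lists in place of SerialT; appended and pairs are hypothetical names introduced for illustration.

```haskell
module Main where

-- Semigroup append on lists: all of the first list, then all of the second.
appended :: [Int]
appended = [1, 2] <> [3, 4]

-- The list monad nests depth first: the inner loop over y runs to
-- completion for each x before the outer loop advances.
pairs :: [(Int, Int)]
pairs = do
  x <- [1, 2]
  y <- [3, 4]
  return (x, y)

main :: IO ()
main = do
  print appended     -- prints [1,2,3,4]
  mapM_ print pairs  -- prints (1,3), (1,4), (2,3), (2,4), one per line
```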

Since: streamly-0.2.0

Instances

Instances details
MonadTrans SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

Monad m => Functor (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Eq a => Eq (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (SerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

data WSerialT (m :: Type -> Type) a #

The Semigroup operation for WSerialT interleaves the elements from the two streams. Therefore, when a <> b is evaluated, stream a is evaluated first to produce the first element of the combined stream and then stream b is evaluated to produce the next element of the combined stream, and then we go back to evaluating stream a and so on. In other words, the elements of stream a are interleaved with the elements of stream b.

Note that evaluation of a <> b <> c does not schedule a, b and c with equal priority. This expression is equivalent to a <> (b <> c), therefore, it fairly interleaves a with the result of b <> c. For example, S.fromList [1,2] <> S.fromList [3,4] <> S.fromList [5,6] :: WSerialT Identity Int would result in [1,3,2,5,4,6]. In other words, the leftmost stream gets the same scheduling priority as the rest of the streams taken together. The same is true for each subexpression on the right.
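
The scheduling described above can be reproduced with a pure list model. The sketch below defines a fair two-way interleave and a right-associative operator for it. interleave and (<+>) are hypothetical names, and the code models only the ordering of elements, not how WSerialT is implemented.

```haskell
module Main where

-- Take one element from the first list, then swap the roles of the two
-- lists, so that the two arguments alternate fairly.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- Right-associative, so a <+> b <+> c parses as a <+> (b <+> c).
infixr 6 <+>
(<+>) :: [a] -> [a] -> [a]
(<+>) = interleave

main :: IO ()
main = do
  print ([1, 2] <+> [3, 4] :: [Int])             -- prints [1,3,2,4]
  print ([1, 2] <+> [3, 4] <+> [5, 6] :: [Int])  -- prints [1,3,2,5,4,6]
```

In the second result the leftmost list supplies every other element, while the remaining two lists share the other half between them.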

Note that this operation cannot be used to fold a container of infinite streams as the state that it needs to maintain is proportional to the number of streams.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

import Streamly
import qualified Streamly.Prelude as S

main = (S.toList . wSerially $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]

Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

main = S.drain . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(2,3)
(1,4)
(2,4)

Since: streamly-0.2.0

Instances

Instances details
MonadTrans WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

Monad m => Functor (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Eq a => Eq (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Read a => Read (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

a ~ Char => IsString (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Semigroup (WSerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

data AheadT (m :: Type -> Type) a #

The Semigroup operation for AheadT appends two streams. The combined stream behaves like a single stream with the actions from the second stream appended to the first stream. The combined stream is evaluated in the speculative style. This operation can be used to fold an infinite lazy container of streams.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = do
 xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
 print xs
 where p n = threadDelay 1000000 >> return n
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream.

The monad instance of AheadT may run each monadic continuation (bind) concurrently in a speculative manner, performing side effects in a partially ordered manner but producing the outputs in an ordered manner like SerialT.

main = S.drain . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: streamly-0.3.0

Instances

Instances details
MonadTrans AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

Monad m => Functor (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

data AsyncT (m :: Type -> Type) a #

The Semigroup operation (<>) for AsyncT merges two streams concurrently with priority given to the first stream. In s1 <> s2 <> s3 ... the streams s1, s2 and s3 are scheduled for execution in that order. Multiple scheduled streams may be executed concurrently and the elements generated by them are served to the consumer as and when they become available. This behavior is similar to the scheduling and execution behavior of actions in a single async stream.

Since only a finite number of streams are executed concurrently, this operation can be used to fold an infinite lazy container of streams.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = (S.toList . asyncly $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

main = S.drain . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: streamly-0.1.0

Instances

Instances details
MonadTrans AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

Monad m => Functor (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

data WAsyncT (m :: Type -> Type) a #

WAsyncT is similar to WSerialT but with concurrent execution. The Semigroup operation (<>) for WAsyncT merges two streams concurrently, interleaving the actions from both streams. In s1 <> s2 <> s3 ..., the individual actions from streams s1, s2 and s3 are scheduled for execution in a round-robin fashion. Multiple scheduled actions may be executed concurrently; the results from concurrent executions are consumed in the order in which they become available.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]

For this example, we are using maxThreads 1 so that concurrent thread scheduling does not affect the results and make them unpredictable. Let's now take a more general example:

main = (S.toList . wAsyncly . maxThreads 1 $ (S.fromList [1,2,3]) <> (S.fromList [4,5,6]) <> (S.fromList [7,8,9])) >>= print
[1,4,2,7,5,3,8,6,9]

This is how the execution of the above stream proceeds:

  1. The scheduler queue is initialized with [S.fromList [1,2,3], (S.fromList [4,5,6]) <> (S.fromList [7,8,9])] assuming the head of the queue is represented by the leftmost item.
  2. S.fromList [1,2,3] is executed, yielding the element 1 and putting [2,3] at the back of the scheduler queue. The scheduler queue now looks like [(S.fromList [4,5,6]) <> (S.fromList [7,8,9]), S.fromList [2,3]].
  3. Now (S.fromList [4,5,6]) <> (S.fromList [7,8,9]) is picked up for execution, S.fromList [7,8,9] is added at the back of the queue and S.fromList [4,5,6] is executed, yielding the element 4 and adding S.fromList [5,6] at the back of the queue. The queue now looks like [S.fromList [2,3], S.fromList [7,8,9], S.fromList [5,6]].
  4. Note that the scheduler queue expands by one more stream component in every pass because one more <> is broken down into two components. At this point there are no more <> operations to be broken down further and the queue has reached its maximum size. Now these streams are scheduled in round-robin fashion yielding [2,7,5,3,8,6,9].

As we see above, in a right-associated expression composed with <>, only one <> operation is broken down into two components in one execution. Therefore, if we have n streams composed using <>, it will take n scheduler passes to expand the whole expression. By the time the n-th component is added to the scheduler queue, the first component would have received n scheduler passes.
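
The queue walkthrough above can be replayed with a small pure simulation. The sketch below models a single-threaded scheduler, corresponding to the maxThreads 1 setting used in the examples. Expr, (:<>:), schedule and example are hypothetical names, the head of the queue is the first element of the list, and nothing here reflects the actual WAsyncT implementation.

```haskell
module Main where

-- A stream expression: either a plain list of elements or a composition.
infixr 6 :<>:
data Expr a = Leaf [a] | Expr a :<>: Expr a

-- Replay the scheduler passes. The queue head is the first list element
-- and new work is always added at the back.
schedule :: Expr a -> [a]
schedule top = go [top]
  where
    go []             = []
    go (node : queue) = run node queue

    -- Breaking down one (<>): queue the right part, then run the left part.
    run (l :<>: r)      queue = run l (queue ++ [r])
    -- An exhausted stream is dropped from the queue.
    run (Leaf [])       queue = go queue
    -- Yield one element and put the rest of the stream at the back.
    run (Leaf (x : xs)) queue = x : go (queue ++ [Leaf xs])

example :: Expr Int
example = Leaf [1, 2, 3] :<>: Leaf [4, 5, 6] :<>: Leaf [7, 8, 9]

main :: IO ()
main = do
  print (schedule (Leaf [1, 2] :<>: Leaf [3, 4 :: Int]))  -- prints [1,3,2,4]
  print (schedule example)  -- prints [1,4,2,7,5,3,8,6,9]
```

The first two outputs, 1 and 4, come from the passes that break down a composition. The remaining seven come from plain round-robin over the three leftover streams.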

Since all streams get interleaved, this operation is not suitable for folding an infinite lazy container of infinite size streams. However, if the streams are small, the streams on the left may get finished before more streams are added to the scheduler queue from the right side of the expression, so it may be possible to fold an infinite lazy container of streams. For example, if the streams are of size n then at most n streams would be in the scheduler queue at a time.

Note that WSerialT and WAsyncT differ in their scheduling behavior; therefore, the output of WAsyncT, even with a single thread of execution, is not the same as that of WSerialT. See the notes in WSerialT for details about its scheduling behavior.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

main = S.drain . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Since: streamly-0.2.0

Instances

Instances details
MonadTrans WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

Monad m => Functor (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

data ParallelT (m :: Type -> Type) a #

Async composition with strict concurrent execution of all streams.

The Semigroup instance of ParallelT executes both streams concurrently, without delay and without waiting for consumer demand, and merges the results as they arrive. If the consumer does not consume the results, they are buffered up to a configured maximum, controlled by the maxBuffer primitive. If the buffer becomes full, the concurrent tasks block until space becomes available.

Both WAsyncT and ParallelT evaluate the constituent streams fairly in a round-robin fashion. The key difference is that WAsyncT might wait for consumer demand before it executes the tasks, whereas ParallelT starts executing all the tasks immediately. The maxThreads limit applies to WAsyncT but not to ParallelT. In other words, WAsyncT can be lazy whereas ParallelT is strict.

ParallelT is useful when the streams must be evaluated simultaneously irrespective of how the consumer consumes them, e.g. when we want to race two tasks and start both at the same time, or when the parallel tasks contain timers and our results depend on those timers being started together. Without such requirements, AsyncT or AheadT are recommended, as they can be more efficient than ParallelT.

main = (toList . parallely $ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]

When streams with more than one element are merged, the result yields from whichever stream produces an element first, without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
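The two guarantees above (exception propagation and per-stream ordering) can be illustrated with a minimal sketch. It assumes streamly 0.7.x, where Streamly.Prelude provides fromList, yieldM and toList; the names good, bad, mergedFails and singleStreamOrder are hypothetical and exist only for this illustration.

```haskell
import Control.Exception (SomeException, throwIO, try)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical streams for illustration: one yields values, the other fails.
good :: ParallelT IO Int
good = S.fromList [1, 2, 3]

bad :: ParallelT IO Int
bad = S.yieldM (throwIO (userError "boom"))

-- Merge the two streams and report whether evaluating the result raised
-- an exception that originated in one of the constituent streams.
mergedFails :: IO Bool
mergedFails = do
  r <- try (S.toList (parallely (good <> bad))) :: IO (Either SomeException [Int])
  pure (either (const True) (const False) r)

-- Keep only the elements that came from 'good'; their relative order is
-- the order in which 'good' generated them, whatever the interleaving.
singleStreamOrder :: IO [Int]
singleStreamOrder =
  filter (< 10) <$> S.toList (parallely (good <> S.fromList [10, 20]))

main :: IO ()
main = do
  mergedFails >>= print
  singleStreamOrder >>= print
```

The interleaving of elements from good with 10 and 20 may differ between runs, which is why the sketch inspects only the elements of a single constituent stream.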

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = S.drain . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
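The racing use case mentioned earlier can be sketched as follows. This is only an illustration under the assumption of streamly 0.7.x (Streamly.Prelude exporting head and yieldM); the names task and raceTwo and the chosen delays are hypothetical.

```haskell
import Control.Concurrent (threadDelay)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical task: wait for the given number of milliseconds,
-- then return a label identifying the task.
task :: Int -> String -> IO String
task ms label = threadDelay (ms * 1000) >> pure label

-- Start both tasks immediately (ParallelT is strict) and keep
-- whichever result arrives first.
raceTwo :: IO (Maybe String)
raceTwo =
  S.head . parallely $ S.yieldM (task 500 "slow") <> S.yieldM (task 50 "fast")

main :: IO ()
main = raceTwo >>= print
```

Because both tasks are started without waiting for consumer demand, the result is decided by the tasks' own running times rather than by the order in which they appear in the composition.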

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: 0.1.0

Instances

MonadTrans ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

Monad m => Functor (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: streamly-0.1.0
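As a rough illustration of what the constraint allows, the sketch below runs the same concurrent code in IO and in ReaderT over IO, both of which satisfy the three component constraints. It assumes streamly 0.7.x together with the transformers package; greetAll is a hypothetical helper, and FlexibleContexts is enabled as a precaution because the synonym expands to MonadBaseControl IO m.

```haskell
{-# LANGUAGE FlexibleContexts #-}
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: apply an effectful function to each name. With
-- AheadT the effects may run ahead of the consumer, while the results
-- are delivered in the order of the input.
greetAll :: MonadAsync m => (String -> m String) -> [String] -> m [String]
greetAll greet names = S.toList . aheadly $ S.mapM greet (S.fromList names)

main :: IO ()
main = do
  -- Directly in IO.
  inIO <- greetAll (\n -> pure ("hello " ++ n)) ["ada", "bob"]
  print inIO
  -- In ReaderT over IO, where the greeting prefix comes from the environment.
  inReader <- runReaderT (greetAll (\n -> (++ n) <$> ask) ["ada", "bob"]) "hi "
  print inReader
```

A pure monad such as Identity would be rejected by the type checker here, since it has no MonadIO instance.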

class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type) #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: streamly-0.2.0

Minimal complete definition

toStream, fromStream, consM, (|:)
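In practice the class is mostly used as a constraint that lets a producer stay polymorphic in the stream type, with the caller choosing the concrete type through an adapter such as serially or parallely. The following is a minimal sketch assuming streamly 0.7.x; squares is a hypothetical function introduced for illustration.

```haskell
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical producer, polymorphic in the stream type t.
squares :: (IsStream t, Monad m) => [Int] -> t m Int
squares = S.map (\x -> x * x) . S.fromList

main :: IO ()
main = do
  -- The adapter fixes the stream type at the call site.
  xs <- S.toList (serially (squares [1, 2, 3]))
  ys <- S.toList (parallely (squares [1, 2, 3]))
  print xs
  -- Sorted so the comparison does not rely on any ordering assumption.
  print (sort ys)
```

The same definition of squares serves both calls; only the adapter applied by the caller differs.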

Instances

IsStream AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

toStream :: forall (m :: Type -> Type) a. AheadT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AheadT m a #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

IsStream AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. AsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> AsyncT m a #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

IsStream WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

toStream :: forall (m :: Type -> Type) a. WAsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WAsyncT m a #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

IsStream ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

toStream :: forall (m :: Type -> Type) a. ParallelT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ParallelT m a #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

IsStream SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. SerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> SerialT m a #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

IsStream WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

toStream :: forall (m :: Type -> Type) a. WSerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> WSerialT m a #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

IsStream ZipSerialM 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

IsStream ZipAsyncM 
Instance details

Defined in Streamly.Internal.Data.Stream.Zip

Methods

toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #

IsStream Stream 
Instance details

Defined in Streamly.Internal.Data.Stream.StreamK.Type

Methods

toStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a #

fromStream :: forall (m :: Type -> Type) a. Stream m a -> Stream m a #

consM :: MonadAsync m => m a -> Stream m a -> Stream m a #

(|:) :: MonadAsync m => m a -> Stream m a -> Stream m a #