map-reduce-folds-0.1.2: foldl wrappers for map-reduce
Copyright: (c) Adam Conner-Sax 2019
License: BSD-3-Clause
Maintainer: adam_conner_sax@yahoo.com
Stability: experimental
Safe Haskell: Safe-Inferred
Language: Haskell2010

Control.MapReduce.Engines.Streamly

Description

map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.

Notes: 1. These are polymorphic in the return stream type, though the streams do have to be serial when groupBy is called. So you have to specify the stream type in the call, or it has to be inferable from the use of the result.

  2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis

Engines

streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #

map-reduce-fold builder returning a SerialT Identity d result

streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #

effectful map-reduce-fold engine returning a (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.

concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #

possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d result

Streamly Combinators

toStreamlyFold :: Monad m => Fold a b -> Fold m a b Source #

convert a Control.Foldl Fold into a Streamly.Data.Fold fold

toStreamlyFoldM :: Functor m => FoldM m a b -> Fold m a b Source #

convert a Control.Foldl FoldM into a Streamly.Data.Fold fold

Result Extraction

resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #

make a stream into an (effectful) []

concatStream :: (Monad m, Monoid a) => SerialT m a -> m a Source #

mappend all in a monoidal stream

concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #

mappend everything in a pure Streamly fold

concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #

mappend everything in an effectful Streamly fold.

concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #

mappend everything in a concurrent Streamly fold.

groupBy Functions

groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by hashable key. NB: this function uses the fact that SerialT m is a monad

groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by ordered key. NB: this function uses the fact that SerialT m is a monad

groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by hashable key. Uses mutable hashtables running in the ST monad. NB: this function uses the fact that SerialT m is a monad

groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #

Group streamly stream of (k,c) by key with instance of Grouping from http://hackage.haskell.org/package/discrimination. NB: this function uses the fact that SerialT m is a monad

Re-Exports

data SerialT (m :: Type -> TYPE LiftedRep) a #

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Since: streamly-0.8.0

Instances

Instances details
IsStream SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. SerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> SerialT m a #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

MonadTrans SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

MonadState s m => MonadState s (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadIO m => MonadIO (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

Monad m => Applicative (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

Monad m => Functor (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Monad (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

NFData1 (SerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

a ~ Char => IsString (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (SerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

Semigroup (SerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

IsList (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (SerialT Identity a) #

Read a => Read (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: SerialT Identity a -> () #

Eq a => Eq (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (SerialT Identity a) = a

data WSerialT (m :: Type -> TYPE LiftedRep) a #

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.

A single Monad bind behaves like a for loop:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

Since: streamly-0.8.0

Instances

Instances details
IsStream WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WSerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> WSerialT m a #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

MonadTrans WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

MonadState s m => MonadState s (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadIO m => MonadIO (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftIO :: IO a -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

Monad m => Applicative (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

Monad m => Functor (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Monad (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

NFData1 (WSerialT Identity) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

a ~ Char => IsString (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Monoid (WSerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

Semigroup (WSerialT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

IsList (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Associated Types

type Item (WSerialT Identity a) #

Read a => Read (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Show a => Show (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

NFData a => NFData (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Methods

rnf :: WSerialT Identity a -> () #

Eq a => Eq (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

Ord a => Ord (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Serial

type Item (WSerialT Identity a) = a

data AheadT (m :: Type -> TYPE LiftedRep) a #

For AheadT streams:

(<>) = ahead
(>>=) = flip . concatMapWith ahead

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.

Since: 0.3.0 (Streamly)

Since: streamly-0.8.0

Instances

Instances details
IsStream AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AheadT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> AheadT m a #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

MonadTrans AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

Monad m => Functor (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

MonadAsync m => Monad (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

data AsyncT (m :: Type -> TYPE LiftedRep) a #

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Since: streamly-0.8.0

Instances

Instances details
IsStream AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> AsyncT m a #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

MonadTrans AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

Monad m => Functor (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

data WAsyncT (m :: Type -> TYPE LiftedRep) a #

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one WAsyncT output stream and all the iterations corresponding to 2 constitute another WAsyncT output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

Since: streamly-0.8.0

Instances

Instances details
IsStream WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WAsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> WAsyncT m a #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

MonadTrans WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

Monad m => Functor (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

data ParallelT (m :: Type -> TYPE LiftedRep) a #

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT. ParallelT is similar, except that all iterations are strictly concurrent, whereas in AsyncT the degree of concurrency depends on the consumer demand and available threads. See parallel for more details.

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

Since: streamly-0.8.0

Instances

Instances details
IsStream ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ParallelT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ParallelT m a #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

MonadTrans ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

Monad m => Functor (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) 
Instance details

Defined in Streamly.Internal.Data.Stream.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0 (Streamly)

Since: streamly-0.8.0

class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> TYPE LiftedRep) -> Type -> TYPE LiftedRep) #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Since: streamly-0.8.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Instances

Instances details
IsStream AheadT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AheadT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> AheadT m a #

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a #

IsStream AsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. AsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> AsyncT m a #

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a #

IsStream WAsyncT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WAsyncT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> WAsyncT m a #

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a #

IsStream ParallelT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ParallelT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ParallelT m a #

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a #

IsStream SerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. SerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> SerialT m a #

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a #

IsStream WSerialT 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. WSerialT m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> WSerialT m a #

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a #

IsStream ZipSerialM 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipSerialM m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipSerialM m a #

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a #

IsStream ZipAsyncM 
Instance details

Defined in Streamly.Internal.Data.Stream.IsStream.Type

Methods

toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> Stream m a #

fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipAsyncM m a #

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a #