Copyright | (c) Adam Conner-Sax 2019 |
---|---|
License | BSD-3-Clause |
Maintainer | adam_conner_sax@yahoo.com |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
Map-reduce engine (fold builder) using Streamly streams as its intermediate and return type.
Notes:
1. These are polymorphic in the return stream type, though the streams do have to be serial when groupBy is called. So you have to specify the stream type in the call, or it has to be inferable from the use of the result.
2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis
- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> Type) a
- data WSerialT (m :: Type -> Type) a
- data AheadT (m :: Type -> Type) a
- data AsyncT (m :: Type -> Type) a
- data WAsyncT (m :: Type -> Type) a
- data ParallelT (m :: Type -> Type) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class IsStream (t :: (Type -> Type) -> Type -> Type)
Engines
streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
Map-reduce-fold builder returning a SerialT Identity d result.
streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
Effectful map-reduce-fold engine returning an (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.
concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
Possibly concurrent (depending on the chosen stream types) map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d result.
Result Extraction
resultToList :: (Monad m, IsStream t) => t m a -> m [a]
Make a stream into an (effectful) [].
concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
mappend everything in a pure Streamly fold.
concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
mappend everything in an effectful Streamly fold.
concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
mappend everything in a concurrent Streamly fold.
groupBy Functions
groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k,c) by hashable key.
NB: this function uses the fact that SerialT m is a monad.
groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k,c) by ordered key.
NB: this function uses the fact that SerialT m is a monad.
groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k,c) by hashable key. Uses mutable hashtables running in the ST monad.
NB: this function uses the fact that SerialT m is a monad.
groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
Group a Streamly stream of (k,c) by a key with an instance of Grouping from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that SerialT m is a monad.
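The grouping step that these functions perform can be pictured on plain lists. The following is only a sketch under stated assumptions: it uses an ordered Map from the containers package rather than Streamly streams, the name groupByOrderedKeyList is hypothetical, and it is not how this module implements its groupBy functions.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as M
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq

-- Hypothetical list-based analogue of grouping (k, c) pairs by key.
-- Values for the same key are collected in a Seq, in arrival order.
groupByOrderedKeyList :: Ord k => [(k, c)] -> [(k, Seq c)]
groupByOrderedKeyList = M.toList . foldl' step M.empty
  where
    -- insertWith passes the new value first and the existing one second;
    -- we ignore the new singleton and append c to the existing Seq.
    step acc (k, c) =
      M.insertWith (\_new old -> old |> c) k (Seq.singleton c) acc

main :: IO ()
main = mapM_ print (groupByOrderedKeyList [("a", 1 :: Int), ("b", 2), ("a", 3)])
-- prints:
-- ("a",fromList [1,3])
-- ("b",fromList [2])
```

The result has one entry per distinct key, which is the shape (k, Seq c) that the signatures above describe.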
Re-Exports
data SerialT (m :: Type -> Type) a
Deep serial composition or serial composition with depth first traversal. The Semigroup instance of SerialT appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream.
    import Streamly
    import qualified Streamly.Prelude as S

    main = (S.toList . serially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
The Monad instance runs the monadic continuation for each element of the stream, serially.

    main = runStream . serially $ do
        x <- return 1 <> return 2
        S.yieldM $ print x

    1
    2
SerialT nests streams serially in a depth first manner.

    main = runStream . serially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (1,4)
    (2,3)
    (2,4)
This behavior of SerialT is exactly like a list transformer. We call the monadic code being run for each element of the stream a monadic continuation. In the imperative paradigm we can think of this composition as nested for loops, where the monadic continuation is the body of the loop. The loop iterates over all elements of the stream.

The serially combinator can be omitted as the default stream type is SerialT.

Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.
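The list-transformer analogy can be checked with ordinary lists, whose Monad instance nests in the same depth first order. This sketch uses only base and is an illustration rather than Streamly code; the name pairsDepthFirst is hypothetical.

```haskell
-- The list monad runs the rest of the do block once per element,
-- finishing all inner iterations before advancing the outer one.
pairsDepthFirst :: [(Int, Int)]
pairsDepthFirst = do
  x <- [1, 2]      -- outer loop
  y <- [3, 4]      -- inner loop, restarted for every x
  return (x, y)

main :: IO ()
main = print pairsDepthFirst
-- prints [(1,3),(1,4),(2,3),(2,4)]
```

The order of the pairs matches the nested SerialT example, with the outer variable changing slowest.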
Since: streamly-0.2.0
data WSerialT (m :: Type -> Type) a
Wide serial composition or serial composition with a breadth first traversal. The Semigroup instance of WSerialT traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately.
    import Streamly
    import qualified Streamly.Prelude as S

    main = (S.toList . wSerially $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

    main = runStream . wSerially $ do
        x <- return 1 <> return 2
        y <- return 3 <> return 4
        S.yieldM $ print (x, y)

    (1,3)
    (2,3)
    (1,4)
    (2,4)
Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
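The alternating behaviour of the WSerialT Semigroup instance can be pictured with a pure list function. This is a sketch on plain lists using only base, not Streamly's implementation, and interleave is a hypothetical helper name.

```haskell
-- Take one element from the first list, then swap the roles of the
-- two lists, so elements are drawn from each side alternately.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x : xs) ys = x : interleave ys xs

main :: IO ()
main = print (interleave [1, 2] [3, 4 :: Int])
-- prints [1,3,2,4]
```

When one list runs out, the remainder of the other is emitted unchanged.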
Since: streamly-0.2.0
data AheadT (m :: Type -> Type) a
Deep ahead composition or ahead composition with depth first traversal. The semigroup composition of AheadT appends streams in a depth first manner just like SerialT except that it can produce elements concurrently ahead of time. It is like AsyncT except that AsyncT produces the output as it arrives whereas AheadT orders the output in the traversal order.
    main = (S.toList . aheadly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
Similarly, the monad instance of AheadT may run each iteration concurrently ahead of time but presents the results in the same order as SerialT.

    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . aheadly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that ahead composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.
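The idea of running work concurrently while still presenting results in traversal order can be sketched with base's MVar primitives. This is a simplified illustration, not how AheadT is implemented: it starts a thread for every action with no bound on concurrency, it does not handle exceptions thrown by an action, and runAheadOrdered is a hypothetical name.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Start every action in its own thread, then read the result slots
-- in the original order, so completion order does not affect output.
runAheadOrdered :: [IO a] -> IO [a]
runAheadOrdered actions = do
  slots <- mapM spawn actions
  mapM takeMVar slots
  where
    spawn act = do
      slot <- newEmptyMVar
      _ <- forkIO (act >>= putMVar slot)
      return slot

main :: IO ()
main = do
  -- The task with the longest delay comes first in the list.
  let task n = threadDelay (n * 10000) >> return n
  results <- runAheadOrdered (map task [3, 2, 1])
  print results
  -- prints [3,2,1]: input order, although the tasks finish in reverse
```

Reading the slots in list order is what preserves the ordering, while the total running time is governed by the slowest task rather than by the sum of all delays.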
Since: streamly-0.3.0
Instances
- MonadTrans AheadT (defined in Streamly.Streams.Ahead)
- IsStream AheadT (defined in Streamly.Streams.Ahead)
  - toStream :: AheadT m a -> Stream m a
  - fromStream :: Stream m a -> AheadT m a
  - consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
  - (|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) (defined in Streamly.Streams.Ahead)
- (MonadState s m, MonadAsync m) => MonadState s (AheadT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (AheadT m)
- MonadAsync m => Monad (AheadT m)
- Monad m => Functor (AheadT m)
- (Monad m, MonadAsync m) => Applicative (AheadT m)
- (MonadIO m, MonadAsync m) => MonadIO (AheadT m) (defined in Streamly.Streams.Ahead)
- (MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) (defined in Streamly.Streams.Ahead)
- MonadAsync m => Semigroup (AheadT m a)
- MonadAsync m => Monoid (AheadT m a)
data AsyncT (m :: Type -> Type) a
Deep async composition or async composition with depth first traversal. In a left to right Semigroup composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer.
    main = (S.toList . asyncly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . asyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
All iterations may run in the same thread if they do not block.
Note that async composition with depth first traversal can be used to combine an infinite number of streams as it explores only a bounded number of streams at a time.
Since: streamly-0.1.0
Instances
- MonadTrans AsyncT (defined in Streamly.Streams.Async)
- IsStream AsyncT (defined in Streamly.Streams.Async)
  - toStream :: AsyncT m a -> Stream m a
  - fromStream :: Stream m a -> AsyncT m a
  - consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
  - (|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) (defined in Streamly.Streams.Async)
- (MonadState s m, MonadAsync m) => MonadState s (AsyncT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)
- MonadAsync m => Monad (AsyncT m)
- Monad m => Functor (AsyncT m)
- (Monad m, MonadAsync m) => Applicative (AsyncT m)
- (MonadIO m, MonadAsync m) => MonadIO (AsyncT m) (defined in Streamly.Streams.Async)
- (MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) (defined in Streamly.Streams.Async)
- MonadAsync m => Semigroup (AsyncT m a)
- MonadAsync m => Monoid (AsyncT m a)
data WAsyncT (m :: Type -> Type) a
Wide async composition or async composition with breadth first traversal. The Semigroup instance of WAsyncT concurrently traverses the composed streams using a breadth first traversal, that is, in a round robin fashion, yielding elements from both streams alternately.
    main = (S.toList . wAsyncly $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using round robin scheduling.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . wAsyncly $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
Unlike AsyncT, all iterations are guaranteed to run fairly concurrently, unconditionally.
Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: streamly-0.2.0
Instances
- MonadTrans WAsyncT (defined in Streamly.Streams.Async)
- IsStream WAsyncT (defined in Streamly.Streams.Async)
  - toStream :: WAsyncT m a -> Stream m a
  - fromStream :: Stream m a -> WAsyncT m a
  - consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
  - (|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) (defined in Streamly.Streams.Async)
- (MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)
- MonadAsync m => Monad (WAsyncT m)
- Monad m => Functor (WAsyncT m)
- (Monad m, MonadAsync m) => Applicative (WAsyncT m)
- (MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) (defined in Streamly.Streams.Async)
- (MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) (defined in Streamly.Streams.Async)
- MonadAsync m => Semigroup (WAsyncT m a)
- MonadAsync m => Monoid (WAsyncT m a)
data ParallelT (m :: Type -> Type) a
Async composition with simultaneous traversal of all streams. The Semigroup instance of ParallelT concurrently merges two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using ParallelT, each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come, first served basis.
AsyncT and WAsyncT are concurrent lookahead streams, each with a specific type of consumption pattern (depth first or breadth first). Since they are lookahead, they may introduce certain default latency in starting more concurrent tasks for efficiency reasons or may put a default limitation on the resource consumption (e.g. number of concurrent threads for lookahead). If we look at the implementation detail, they both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. However, ParallelT uses a separate runtime thread to evaluate each stream.
WAsyncT is similar to ParallelT, as both of them evaluate the constituent streams fairly in a round robin fashion. However, the key difference is that WAsyncT is lazy or pull driven whereas ParallelT is strict or push driven. ParallelT immediately starts concurrent evaluation of both the streams (in separate threads) and later picks the results, whereas WAsyncT may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. The concurrent scheduling of the next stream or the degree of concurrency is driven by the feedback from the consumer. In the case of ParallelT, each stream is evaluated in a separate thread and results are pushed to a shared output buffer; the evaluation rate is controlled by blocking when the buffer is full.
Concurrent lookahead streams are generally more efficient than ParallelT and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over ParallelT, especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them, e.g. when we want to race two tasks and want to start both strictly at the same time, or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that ParallelT is almost the same (modulo some implementation differences) as WAsyncT when the latter is used with unlimited lookahead and zero latency in initiating lookahead.
    main = (S.toList . parallely $ (S.fromFoldable [1,2]) <> (S.fromFoldable [3,4])) >>= print

    [1,3,2,4]
When streams with more than one element are merged, it yields whichever stream yields first without any bias, unlike the Async style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.
    import Streamly
    import qualified Streamly.Prelude as S
    import Control.Concurrent

    main = runStream . parallely $ do
        n <- return 3 <> return 2 <> return 1
        S.yieldM $ do
            threadDelay (n * 1000000)
            myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)

    ThreadId 40: Delay 1
    ThreadId 39: Delay 2
    ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
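The push-driven, first come, first served behaviour described above can be approximated with base's Chan. This is a rough sketch rather than ParallelT's implementation: the channel here is unbounded, so there is no back-pressure from a full buffer, exceptions are not handled, and runParallelFCFS is a hypothetical name.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Start every action immediately in its own thread; each thread pushes
-- its result to a shared channel, and results are read in arrival order.
runParallelFCFS :: [IO a] -> IO [a]
runParallelFCFS actions = do
  out <- newChan
  forM_ actions $ \act -> forkIO (act >>= writeChan out)
  replicateM (length actions) (readChan out)

main :: IO ()
main = do
  let task n = threadDelay (n * 100000) >> return n
  results <- runParallelFCFS (map task [3, 2, 1])
  -- Arrival order depends on scheduling; with these delays the
  -- shortest task is expected, but not guaranteed, to arrive first.
  print results
```

Unlike an approach that reads results back in input order, nothing here restores the original ordering, which reflects the "as they arrive" semantics.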
Since: streamly-0.1.0
Instances
- MonadTrans ParallelT (defined in Streamly.Streams.Parallel)
- IsStream ParallelT (defined in Streamly.Streams.Parallel)
  - toStream :: ParallelT m a -> Stream m a
  - fromStream :: Stream m a -> ParallelT m a
  - consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
  - (|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) (defined in Streamly.Streams.Parallel)
- (MonadState s m, MonadAsync m) => MonadState s (ParallelT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m)
- MonadAsync m => Monad (ParallelT m)
- Monad m => Functor (ParallelT m)
- (Monad m, MonadAsync m) => Applicative (ParallelT m) (defined in Streamly.Streams.Parallel)
- (MonadIO m, MonadAsync m) => MonadIO (ParallelT m) (defined in Streamly.Streams.Parallel)
- (MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) (defined in Streamly.Streams.Parallel)
- MonadAsync m => Semigroup (ParallelT m a)
- MonadAsync m => Monoid (ParallelT m a)
type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.
Since: streamly-0.1.0
class IsStream (t :: (Type -> Type) -> Type -> Type)
Class of types that can represent a stream of elements of some type a in some monad m.
Since: streamly-0.2.0