Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
| This module contains different stream types and combinators to interconvert between them.
Synopsis
- data SerialT m a
- type Serial = SerialT IO
- data WSerialT m a
- type WSerial = WSerialT IO
- data AheadT m a
- type Ahead = AheadT IO
- data AsyncT m a
- type Async = AsyncT IO
- data WAsyncT m a
- type WAsync = WAsyncT IO
- data ParallelT m a
- type Parallel = ParallelT IO
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- data ZipSerialM m a
- type ZipSerial = ZipSerialM IO
- data ZipAsyncM m a
- type ZipAsync = ZipAsyncM IO
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- fromSerial :: IsStream t => SerialT m a -> t m a
- fromWSerial :: IsStream t => WSerialT m a -> t m a
- fromAsync :: IsStream t => AsyncT m a -> t m a
- fromAhead :: IsStream t => AheadT m a -> t m a
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- fromParallel :: IsStream t => ParallelT m a -> t m a
- fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
- fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
Stream Types
Serial Streams
For SerialT
streams:
(<>) =serial
--Semigroup
(>>=) = flip .concatMapWith
serial
--Monad
A single Monad
bind behaves like a for
loop:
>>>
:{
Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like nested for
loops:
>>>
:{
Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
For WSerialT
streams:
(<>) =wSerial
--Semigroup
(>>=) = flip .concatMapWith
wSerial
--Monad
Note that <>
is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad
bind behaves like a for
loop:
>>>
:{
Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like interleaved nested for
loops:
>>>
:{
Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1
in the first stream with all the nested iterations of element
2
:
>>>
import Streamly.Prelude (wSerial)
>>>
Stream.toList $ Stream.fromList [(1,3),(1,4)] `wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
Speculative Streams
For AheadT
streams:
(<>) =ahead
(>>=) = flip .concatMapWith
ahead
A single Monad
bind behaves like a for
loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>>
:{
Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [2,1]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, ahead of time:
>>>
:{
Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using ahead
.
Since: 0.3.0 (Streamly)
Since: 0.8.0
Instances
MonadTrans AheadT Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
IsStream AheadT Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # | |
MonadAsync m => Monad (AheadT m) Source # | |
Monad m => Functor (AheadT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AheadT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
MonadAsync m => Semigroup (AheadT m a) Source # | |
MonadAsync m => Monoid (AheadT m a) Source # | |
Asynchronous Streams
For AsyncT
streams:
(<>) =async
(>>=) = flip .concatMapWith
async
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the async
combinator, producing results and
side effects of iterations out of order:
>>>
:{
Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the async
combinator:
>>>
:{
Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using async
.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
MonadTrans AsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # | |
MonadAsync m => Monad (AsyncT m) Source # | |
Monad m => Functor (AsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (AsyncT m a) Source # | |
MonadAsync m => Monoid (AsyncT m a) Source # | |
For WAsyncT
streams:
(<>) =wAsync
(>>=) = flip .concatMapWith
wAsync
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the wAsync
combinator, producing results and
side effects of iterations out of order:
>>>
:{
Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the wAsync
combinator:
>>>
:{
Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one WAsyncT
output
stream and all the iterations corresponding to 2
constitute another
WAsyncT
output stream and these two output streams are merged using
wAsync
.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
MonadTrans WAsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream WAsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # | |
MonadAsync m => Monad (WAsyncT m) Source # | |
Monad m => Functor (WAsyncT m) Source # | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (WAsyncT m a) Source # | |
MonadAsync m => Monoid (WAsyncT m a) Source # | |
Parallel Streams
Ahead, Async and WAsync schedule actions concurrently on demand.
Unlike those Parallel
streams schedule all actions concurrently
upfront.
For ParallelT
streams:
(<>) =parallel
(>>=) = flip .concatMapWith
parallel
See AsyncT
, ParallelT
is similar except that all
iterations are strictly concurrent while in AsyncT
it depends on the
consumer demand and available threads. See parallel
for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
Instances
MonadTrans ParallelT Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
IsStream ParallelT Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # | |
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # | |
MonadAsync m => Monad (ParallelT m) Source # | |
Monad m => Functor (ParallelT m) Source # | |
(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
MonadAsync m => Semigroup (ParallelT m a) Source # | |
MonadAsync m => Monoid (ParallelT m a) Source # | |
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Zipping Streams
data ZipSerialM m a Source #
For ZipSerialM
streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipWith' id
Applicative evaluates the streams being zipped serially:
>>>
s1 = Stream.fromFoldable [1, 2]
>>>
s2 = Stream.fromFoldable [3, 4]
>>>
s3 = Stream.fromFoldable [5, 6]
>>>
Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
type ZipSerial = ZipSerialM IO Source #
For ZipAsyncM
streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipAsyncWith' id
Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:
>>>
s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>>
Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
... [(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Stream Type Adapters
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t Source #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
IsStream Stream Source # | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type | |
IsStream ParallelT Source # | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
IsStream WAsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream AsyncT Source # | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream AheadT Source # | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
IsStream ZipAsyncM Source # | |
Defined in Streamly.Internal.Data.Stream.Zip | |
IsStream ZipSerialM Source # | |
Defined in Streamly.Internal.Data.Stream.Zip toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a Source # fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a Source # consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # | |
IsStream WSerialT Source # | |
Defined in Streamly.Internal.Data.Stream.Serial | |
IsStream SerialT Source # | |
Defined in Streamly.Internal.Data.Stream.Serial |
fromSerial :: IsStream t => SerialT m a -> t m a Source #
fromWSerial :: IsStream t => WSerialT m a -> t m a Source #
fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #
fromParallel :: IsStream t => ParallelT m a -> t m a Source #
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: 0.1.0 (Streamly)
Since: 0.8.0