| Copyright | (c) 2017 Composewell Technologies |
|---|---|
| License | BSD-3-Clause |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | None |
| Language | Haskell2010 |
Streamly.Internal.Data.Stream.IsStream.Types
Description
This module contains different stream types and combinators to interconvert between them.
Synopsis
- data SerialT m a
- type Serial = SerialT IO
- data WSerialT m a
- type WSerial = WSerialT IO
- data AheadT m a
- type Ahead = AheadT IO
- data AsyncT m a
- type Async = AsyncT IO
- data WAsyncT m a
- type WAsync = WAsyncT IO
- data ParallelT m a
- type Parallel = ParallelT IO
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- data ZipSerialM m a
- type ZipSerial = ZipSerialM IO
- data ZipAsyncM m a
- type ZipAsync = ZipAsyncM IO
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
- fromSerial :: IsStream t => SerialT m a -> t m a
- fromWSerial :: IsStream t => WSerialT m a -> t m a
- fromAsync :: IsStream t => AsyncT m a -> t m a
- fromAhead :: IsStream t => AheadT m a -> t m a
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- fromParallel :: IsStream t => ParallelT m a -> t m a
- fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
- fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
Stream Types
Serial Streams
For SerialT streams:
(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]
Nested monad binds behave like nested for loops:
>>> :{
Stream.toList $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
For WSerialT streams:
(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad
Note that <> is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad bind behaves like a for loop:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
     x <- Stream.fromList [1,2] -- foreach x in stream
     return x
:}
[1,2]
Nested monad binds behave like interleaved nested for loops:
>>> :{
Stream.toList $ Stream.fromWSerial $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]
This result comes from interleaving all the nested iterations corresponding
to element 1 in the first stream with all the nested iterations of element
2:
>>> import Streamly.Prelude (wSerial)
>>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W in the name stands for wide or breadth-wise scheduling, in
contrast to the depth-wise scheduling behavior of SerialT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
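The difference between depth-wise and breadth-wise scheduling can be modeled with ordinary lists. The following is a minimal sketch using only base, not the library's implementation: `interleave`, `sBind` and `wBind` are hypothetical names, and the sketch assumes the per-element streams are merged in a right-nested fashion.

```haskell
-- A pure-list model of serial vs. wide-serial binds (illustration only).

-- | Alternate elements from two lists, starting with the first list.
interleave :: [a] -> [a] -> [a]
interleave [] ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- | Model of the SerialT bind: map each element, then append the results.
sBind :: [a] -> (a -> [b]) -> [b]
sBind xs f = foldr ((++) . f) [] xs

-- | Model of the WSerialT bind: map each element, then interleave the results.
-- Assumption: the merges nest to the right.
wBind :: [a] -> (a -> [b]) -> [b]
wBind xs f = foldr (interleave . f) [] xs

main :: IO ()
main = do
  let inner x = [(x, y) | y <- [3, 4 :: Int]]
  print (sBind [1, 2 :: Int] inner) -- [(1,3),(1,4),(2,3),(2,4)]
  print (wBind [1, 2 :: Int] inner) -- [(1,3),(2,3),(1,4),(2,4)]
```

For two outer elements the model reproduces the outputs shown in the SerialT and WSerialT examples.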
Speculative Streams
For AheadT streams:
(<>) = ahead
(>>=) = flip . concatMapWith ahead
A single Monad bind behaves like a for loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, ahead of time:
>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]
The behavior can be explained as follows: all the iterations corresponding
to element 1 in the first stream constitute one output stream, all the
iterations corresponding to element 2 constitute another, and the two
output streams are merged using ahead.
Since: 0.3.0 (Streamly)
Since: 0.8.0
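The "side effects out of order, results in order" behavior can be illustrated with plain threads and MVars from base. This is only a model under simplifying assumptions, not how AheadT is implemented: it starts every action upfront instead of on demand, has no thread or buffer limits, and does not handle exceptions. The names `aheadModel` and this `delay` are hypothetical; `delay` here sleeps for tenths of a second to keep the demo short.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- | Hypothetical stand-in for the delay helper used in the examples:
-- sleep for n tenths of a second, report, and return n.
delay :: Int -> IO Int
delay n = do
  threadDelay (n * 100000)
  putStrLn (show n ++ " done")
  return n

-- | Run every action in its own thread, then collect the results
-- in the order of the input list, regardless of completion order.
aheadModel :: [IO a] -> IO [a]
aheadModel actions = do
  slots <- mapM start actions
  mapM takeMVar slots
  where
    start act = do
      slot <- newEmptyMVar
      _ <- forkIO (act >>= putMVar slot)
      return slot

main :: IO ()
main = do
  results <- aheadModel (map delay [2, 1])
  print results -- the result list preserves input order: [2,1]
```

The shorter delay normally reports first, while the collected list still follows the input order.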
Instances
- MonadTrans AheadT (defined in Streamly.Internal.Data.Stream.Ahead)
- IsStream AheadT (defined in Streamly.Internal.Data.Stream.Ahead)
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) (defined in Streamly.Internal.Data.Stream.Ahead)
- (MonadState s m, MonadAsync m) => MonadState s (AheadT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (AheadT m)
- MonadAsync m => Monad (AheadT m)
- Monad m => Functor (AheadT m)
- (Monad m, MonadAsync m) => Applicative (AheadT m)
- (MonadIO m, MonadAsync m) => MonadIO (AheadT m) (defined in Streamly.Internal.Data.Stream.Ahead)
- (MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) (defined in Streamly.Internal.Data.Stream.Ahead)
- MonadAsync m => Semigroup (AheadT m a)
- MonadAsync m => Monoid (AheadT m a)
Asynchronous Streams
For AsyncT streams:
(<>) = async
(>>=) = flip . concatMapWith async
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the async combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the async combinator:
>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows: all the iterations corresponding
to element 1 in the first stream constitute one output stream, all the
iterations corresponding to element 2 constitute another, and the two
output streams are merged using async.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
- MonadTrans AsyncT (defined in Streamly.Internal.Data.Stream.Async)
- IsStream AsyncT (defined in Streamly.Internal.Data.Stream.Async)
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- (MonadState s m, MonadAsync m) => MonadState s (AsyncT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m)
- MonadAsync m => Monad (AsyncT m)
- Monad m => Functor (AsyncT m)
- (Monad m, MonadAsync m) => Applicative (AsyncT m)
- (MonadIO m, MonadAsync m) => MonadIO (AsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- (MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- MonadAsync m => Semigroup (AsyncT m a)
- MonadAsync m => Monoid (AsyncT m a)
For WAsyncT streams:
(<>) = wAsync
(>>=) = flip . concatMapWith wAsync
A single Monad bind behaves like a for loop with iterations of the loop
executed concurrently a la the wAsync combinator, producing results and
side effects of iterations out of order:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]
Nested monad binds behave like nested for loops with nested iterations
executed concurrently, a la the wAsync combinator:
>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]
The behavior can be explained as follows: all the iterations corresponding
to element 1 in the first stream constitute one WAsyncT output stream,
all the iterations corresponding to element 2 constitute another
WAsyncT output stream, and the two output streams are merged using
wAsync.
The W in the name stands for wide or breadth-wise scheduling, in
contrast to the depth-wise scheduling behavior of AsyncT.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
- MonadTrans WAsyncT (defined in Streamly.Internal.Data.Stream.Async)
- IsStream WAsyncT (defined in Streamly.Internal.Data.Stream.Async)
- (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- (MonadState s m, MonadAsync m) => MonadState s (WAsyncT m)
- (MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m)
- MonadAsync m => Monad (WAsyncT m)
- Monad m => Functor (WAsyncT m)
- (Monad m, MonadAsync m) => Applicative (WAsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- (MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- (MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) (defined in Streamly.Internal.Data.Stream.Async)
- MonadAsync m => Semigroup (WAsyncT m a)
- MonadAsync m => Monoid (WAsyncT m a)
Parallel Streams
Ahead, Async and WAsync schedule actions concurrently on demand.
Unlike those, Parallel streams schedule all actions concurrently
upfront.
For ParallelT streams:
(<>) = parallel
(>>=) = flip . concatMapWith parallel
See AsyncT. ParallelT is similar, except that all iterations are
strictly concurrent, whereas in AsyncT concurrency depends on the
consumer demand and available threads. See parallel for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills. It terminates when the buffer is full, and a worker thread is started again to evaluate the remaining stream once there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Since: 0.2.0 (Streamly)
Since: 0.8.0
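The idea of decoupling a producer from a consumer through a buffer can be sketched with base alone. This is a rough illustration, not the mkAsync implementation: the buffer is a single-slot MVar, the producer blocks when the slot is full rather than terminating and being restarted, and exceptions are not handled. The names `startProducer` and `consumeAll` are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- | Hypothetical helper: run the producer in its own thread. It writes each
-- result into a one-slot buffer and marks the end of input with Nothing.
startProducer :: [IO a] -> IO (MVar (Maybe a))
startProducer actions = do
  buffer <- newEmptyMVar
  _ <- forkIO $ do
    mapM_ (\act -> act >>= putMVar buffer . Just) actions
    putMVar buffer Nothing
  return buffer

-- | Hypothetical helper: take items from the buffer until the end marker.
consumeAll :: MVar (Maybe a) -> IO [a]
consumeAll buffer = do
  item <- takeMVar buffer
  case item of
    Nothing -> return []
    Just x -> (x :) <$> consumeAll buffer

main :: IO ()
main = do
  buffer <- startProducer (map return [1, 2, 3 :: Int])
  xs <- consumeAll buffer
  print xs -- [1,2,3]
```

While the consumer processes one item, the producer can already evaluate the next one, which is the overlap that a buffer between the two sides is meant to provide.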
Zipping Streams
data ZipSerialM m a
For ZipSerialM streams:
(<>) = serial
(<*>) = zipWith id
Applicative evaluates the streams being zipped serially:
>>> s1 = Stream.fromFoldable [1, 2]
>>> s2 = Stream.fromFoldable [3, 4]
>>> s3 = Stream.fromFoldable [5, 6]
>>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
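A zipping Applicative pairs elements by position instead of producing every combination. As an analogy only, base offers the same contrast for lists: `ZipList` from `Control.Applicative` zips, while the ordinary list Applicative behaves like nested loops. The names `zipped` and `crossed` below are hypothetical.

```haskell
import Control.Applicative (ZipList (..))

-- | Zipping applicative: combine elements position by position.
zipped :: [(Int, Int, Int)]
zipped = getZipList ((,,) <$> ZipList [1, 2] <*> ZipList [3, 4] <*> ZipList [5, 6])

-- | Ordinary list applicative: every combination, like nested loops.
crossed :: [(Int, Int, Int)]
crossed = (,,) <$> [1, 2] <*> [3, 4] <*> [5, 6]

main :: IO ()
main = do
  print zipped           -- [(1,3,5),(2,4,6)]
  print (length crossed) -- 8
```

The zipped result matches the ZipSerialM example, whereas the non-zipping Applicative yields all eight combinations.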
type ZipSerial = ZipSerialM IO
For ZipAsyncM streams:
(<>) = serial
(<*>) = zipAsyncWith id
Applicative evaluates the streams being zipped concurrently. The following would take half the time that serial zipping would take:
>>> s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>> Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
...
[(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
- IsStream ZipAsyncM (defined in Streamly.Internal.Data.Stream.Zip). Methods:
  - toStream :: forall (m :: Type -> Type) a. ZipAsyncM m a -> Stream m a
  - fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipAsyncM m a
  - consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a
  - (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a
- Monad m => Functor (ZipAsyncM m)
- MonadAsync m => Applicative (ZipAsyncM m) (defined in Streamly.Internal.Data.Stream.Zip)
- Semigroup (ZipAsyncM m a)
- Monoid (ZipAsyncM m a)
Stream Type Adapters
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t
Class of types that can represent a stream of elements of some type a in
some monad m.
Since: 0.2.0 (Streamly)
Since: 0.8.0
fromSerial :: IsStream t => SerialT m a -> t m a
fromWSerial :: IsStream t => WSerialT m a -> t m a
fromAsync :: IsStream t => AsyncT m a -> t m a
fromAhead :: IsStream t => AheadT m a -> t m a
fromWAsync :: IsStream t => WAsyncT m a -> t m a
fromParallel :: IsStream t => ParallelT m a -> t m a
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
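One way to picture adapting between stream types is that each type wraps a shared underlying representation and differs only in how it composes. The sketch below models this with list-backed newtypes and base only. Every name in it (`Serial`, `WSerial`, `StreamLike`, `toCommon`, `fromCommon`, `adaptModel`) is a hypothetical stand-in, loosely mirroring the toStream and fromStream methods of IsStream, and is not the library's code.

```haskell
-- Hypothetical list-backed stand-ins for two stream types.
newtype Serial a = Serial [a] deriving (Show, Eq)
newtype WSerial a = WSerial [a] deriving (Show, Eq)

-- | Model of the IsStream idea: convert to and from a shared representation.
class StreamLike t where
  toCommon :: t a -> [a]
  fromCommon :: [a] -> t a

instance StreamLike Serial where
  toCommon (Serial xs) = xs
  fromCommon = Serial

instance StreamLike WSerial where
  toCommon (WSerial xs) = xs
  fromCommon = WSerial

-- Serial composes by appending.
instance Semigroup (Serial a) where
  Serial xs <> Serial ys = Serial (xs ++ ys)

-- WSerial composes by interleaving.
instance Semigroup (WSerial a) where
  WSerial xs <> WSerial ys = WSerial (interleave xs ys)
    where
      interleave [] bs = bs
      interleave (a : rest) bs = a : interleave bs rest

-- | Model of adapt: go through the shared representation.
adaptModel :: (StreamLike t1, StreamLike t2) => t1 a -> t2 a
adaptModel = fromCommon . toCommon

main :: IO ()
main = do
  let a = Serial [1, 2 :: Int]
      b = Serial [3, 4]
  print (a <> b)                                      -- Serial [1,2,3,4]
  print (adaptModel a <> adaptModel b :: WSerial Int) -- WSerial [1,3,2,4]
```

In this model, adapting changes no elements; it only changes which composition behavior applies afterwards.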
Type Synonyms
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync.
Since: 0.1.0 (Streamly)
Since: 0.8.0