Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where
- toStream :: t m a -> Stream m a
- fromStream :: Stream m a -> t m a
- consM :: MonadAsync m => m a -> t m a -> t m a
- (|:) :: MonadAsync m => m a -> t m a -> t m a
- newtype Stream m a = MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
- type Streaming = IsStream
- fromStreamS :: (IsStream t, Monad m) => Stream m a -> t m a
- toStreamS :: (IsStream t, Monad m) => t m a -> Stream m a
- fromStreamD :: (IsStream t, Monad m) => Stream m a -> t m a
- toStreamD :: (IsStream t, Monad m) => t m a -> Stream m a
- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a
- mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- data SerialT m a
- type Serial = SerialT IO
- fromSerial :: IsStream t => SerialT m a -> t m a
- data WSerialT m a
- type WSerial = WSerialT IO
- fromWSerial :: IsStream t => WSerialT m a -> t m a
- data AsyncT m a
- type Async = AsyncT IO
- fromAsync :: IsStream t => AsyncT m a -> t m a
- data WAsyncT m a
- type WAsync = WAsyncT IO
- fromWAsync :: IsStream t => WAsyncT m a -> t m a
- data AheadT m a
- type Ahead = AheadT IO
- fromAhead :: IsStream t => AheadT m a -> t m a
- data ParallelT m a
- type Parallel = ParallelT IO
- fromParallel :: IsStream t => ParallelT m a -> t m a
- data ZipSerialM m a
- type ZipSerial = ZipSerialM IO
- fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
- data ZipAsyncM m a
- type ZipAsync = ZipAsyncM IO
- fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeat :: IsStream t => a -> t m a
- bindWith :: IsStream t => (t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
- concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
- concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- drain :: (IsStream t, Monad m) => t m a -> m ()
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- toList :: (IsStream t, Monad m) => t m a -> m [a]
- foldrM :: (IsStream t, Monad m) => (a -> m b -> m b) -> m b -> t m a -> m b
- foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b
- foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
- foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
- foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
- foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
- fold :: (IsStream t, Monad m) => Fold m a b -> t m a -> m b
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- interleaving :: IsStream t => WSerialT m a -> t m a
- zipping :: IsStream t => ZipSerialM m a -> t m a
- zippingAsync :: IsStream t => ZipAsyncM m a -> t m a
IsStream Type Class
class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
toStream :: t m a -> Stream m a Source #
fromStream :: Stream m a -> t m a Source #
consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM
. We can read it as "parallel colon
"
to remember that |
comes before :
.
> toList $ getLine |: getLine |: nil hello world ["hello","world"]
let delay = threadDelay 1000000 >> print 1 drain $ fromSerial $ delay |: delay |: delay |: nil drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel
to construct infinite streams)
Since: 0.2.0
Instances
The type Stream m a
represents a monadic stream of values of type a
constructed using actions in monad m
. It uses stop, singleton and yield
continuations equivalent to the following direct style type:
data Stream m a = Stop | Singleton a | Yield a (Stream m a)
To facilitate parallel composition we maintain a local state in an SVar
that is shared across and is used for synchronization of the streams being
composed.
The singleton case can be expressed in terms of stop and yield but we have
it as a separate case to optimize composition operations for streams with
single element. We build singleton streams in the implementation of pure
for Applicative and Monad, and in lift
for MonadTrans.
Type Conversion
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #
Adapt any specific stream type to any other specific stream type.
Since: 0.1.0 (Streamly)
Since: 0.8.0
toConsK :: IsStream t => (m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a Source #
Adapt a polymorphic consM operation to a StreamK cons operation
Building a stream
mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #
Build a stream from an SVar
, a stop continuation, a singleton stream
continuation and a yield continuation.
foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.
foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #
Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.
Stream Types
For SerialT
streams:
(<>) =serial
--Semigroup
(>>=) = flip .concatMapWith
serial
--Monad
A single Monad
bind behaves like a for
loop:
>>>
:{
Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like nested for
loops:
>>>
:{
Stream.toList $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(1,4),(2,3),(2,4)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
fromSerial :: IsStream t => SerialT m a -> t m a Source #
For WSerialT
streams:
(<>) =wSerial
--Semigroup
(>>=) = flip .concatMapWith
wSerial
--Monad
Note that <>
is associative only if we disregard the ordering of elements
in the resulting stream.
A single Monad
bind behaves like a for
loop:
>>>
:{
Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream return x :} [1,2]
Nested monad binds behave like interleaved nested for
loops:
>>>
:{
Stream.toList $ Stream.fromWSerial $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [3,4] -- foreach y in stream return (x, y) :} [(1,3),(2,3),(1,4),(2,4)]
It is a result of interleaving all the nested iterations corresponding to
element 1
in the first stream with all the nested iterations of element
2
:
>>>
import Streamly.Prelude (wSerial)
>>>
Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
fromWSerial :: IsStream t => WSerialT m a -> t m a Source #
For AsyncT
streams:
(<>) =async
(>>=) = flip .concatMapWith
async
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the async
combinator, producing results and
side effects of iterations out of order:
>>>
:{
Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the async
combinator:
>>>
:{
Stream.toList $ Stream.fromAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using async
.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Instances
For WAsyncT
streams:
(<>) =wAsync
(>>=) = flip .concatMapWith
wAsync
A single Monad
bind behaves like a for
loop with iterations of the loop
executed concurrently a la the wAsync
combinator, producing results and
side effects of iterations out of order:
>>>
:{
Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [1,2]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, a la the wAsync
combinator:
>>>
:{
Stream.toList $ Stream.fromWAsync $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,4,5,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one WAsyncT
output
stream and all the iterations corresponding to 2
constitute another
WAsyncT
output stream and these two output streams are merged using
wAsync
.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT
.
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
fromWAsync :: IsStream t => WAsyncT m a -> t m a Source #
For AheadT
streams:
(<>) =ahead
(>>=) = flip .concatMapWith
ahead
A single Monad
bind behaves like a for
loop with iterations executed
concurrently, ahead of time, producing side effects of iterations out of
order, but results in order:
>>>
:{
Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [2,1] -- foreach x in stream Stream.fromEffect $ delay x :} 1 sec 2 sec [2,1]
Nested monad binds behave like nested for
loops with nested iterations
executed concurrently, ahead of time:
>>>
:{
Stream.toList $ Stream.fromAhead $ do x <- Stream.fromList [1,2] -- foreach x in stream y <- Stream.fromList [2,4] -- foreach y in stream Stream.fromEffect $ delay (x + y) :} 3 sec 4 sec 5 sec 6 sec [3,5,4,6]
The behavior can be explained as follows. All the iterations corresponding
to the element 1
in the first stream constitute one output stream and all
the iterations corresponding to 2
constitute another output stream and
these two output streams are merged using ahead
.
Since: 0.3.0 (Streamly)
Since: 0.8.0
Instances
For ParallelT
streams:
(<>) =parallel
(>>=) = flip .concatMapWith
parallel
See AsyncT
, ParallelT
is similar except that all
iterations are strictly concurrent while in AsyncT
it depends on the
consumer demand and available threads. See parallel
for more details.
Since: 0.1.0 (Streamly)
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.8.0
Instances
fromParallel :: IsStream t => ParallelT m a -> t m a Source #
data ZipSerialM m a Source #
For ZipSerialM
streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipWith' id
Applicative evaluates the streams being zipped serially:
>>>
s1 = Stream.fromFoldable [1, 2]
>>>
s2 = Stream.fromFoldable [3, 4]
>>>
s3 = Stream.fromFoldable [5, 6]
>>>
Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
[(1,3,5),(2,4,6)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
type ZipSerial = ZipSerialM IO Source #
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a Source #
For ZipAsyncM
streams:
(<>) = serial
(*) = 'Streamly.Prelude.serial.zipAsyncWith' id
Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:
>>>
s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
>>>
Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
... [(1,1),(1,1),(1,1)]
Since: 0.2.0 (Streamly)
Since: 0.8.0
Instances
IsStream ZipAsyncM Source # | |
Defined in Streamly.Internal.Data.Stream.IsStream.Type toStream :: forall (m :: Type -> TYPE LiftedRep) a. ZipAsyncM m a -> Stream m a Source # fromStream :: forall (m :: Type -> TYPE LiftedRep) a. Stream m a -> ZipAsyncM m a Source # consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # (|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source # | |
MonadAsync m => Applicative (ZipAsyncM m) Source # | |
Defined in Streamly.Internal.Data.Stream.ZipAsync | |
Monad m => Functor (ZipAsyncM m) Source # | |
Monoid (ZipAsyncM m a) Source # | |
Semigroup (ZipAsyncM m a) Source # | |
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a Source #
Construction
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r
but
more efficient. For concurrent streams this is not concurrent whereas
consM
is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
Bind/Concat
concatMapWith :: IsStream t => (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b Source #
concatMapWith mixer generator stream
is a two dimensional looping
combinator. The generator
function is used to generate streams from the
elements in the input stream
and the mixer
function is used to merge
those streams.
Note we can merge streams concurrently by using a concurrent merge function.
Since: 0.7.0
Since: 0.8.0 (signature change)
Fold Utilities
concatFoldableWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #
A variant of fold
that allows you to fold a Foldable
container of streams using the specified stream sum operation.
concatFoldableWith async
$ map return [1..3]
Equivalent to:
concatFoldableWith f = Prelude.foldr f S.nil concatFoldableWith f = S.concatMapFoldableWith f id
Since: 0.8.0 (Renamed foldWith to concatFoldableWith)
Since: 0.1.0 (Streamly)
concatMapFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
A variant of foldMap
that allows you to map a monadic streaming action
on a Foldable
container and then fold it using the specified stream merge
operation.
concatMapFoldableWith async
return [1..3]
Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)
Since: 0.1.0 (Streamly)
concatForFoldableWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like concatMapFoldableWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
concatForFoldableWith f xs g = Prelude.foldr (f . g) S.nil xs concatForFoldableWith f = flip (S.concatMapFoldableWith f)
Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)
Since: 0.1.0 (Streamly)
Running Effects
Conversion operations
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList =foldr
cons
nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable
for serial streams.
Since: 0.4.0
toList :: (IsStream t, Monad m) => t m a -> m [a] Source #
Convert a stream into a list in the underlying monad.
Since: 0.1.0
Fold operations
foldrMx :: (IsStream t, Monad m) => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b Source #
foldlx' :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Since: 0.7.0
foldlMx' :: (IsStream t, Monad m) => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b Source #
Like foldlx'
, but with a monadic step function.
Since: 0.7.0
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b Source #
Strict left associative fold.
Since: 0.2.0
Zip style operations
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality
Since: 0.5.3
cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #
Compare two streams
Since: 0.5.3
Deprecated
interleaving :: IsStream t => WSerialT m a -> t m a Source #
zipping :: IsStream t => ZipSerialM m a -> t m a Source #
zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #