Copyright | (c) 2017 Harendra Kumar |
---|---|

License | BSD3 |

Maintainer | streamly@composewell.com |

Stability | experimental |

Portability | GHC |

Safe Haskell | None |

Language | Haskell2010 |

Continuation passing style (CPS) stream implementation. The symbol `K`

below
denotes a function as well as a Kontinuation.

## Synopsis

- class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where
- toStream :: t m a -> Stream m a
- fromStream :: Stream m a -> t m a
- consM :: MonadAsync m => m a -> t m a -> t m a
- (|:) :: MonadAsync m => m a -> t m a -> t m a

- adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
- newtype Stream m a = MkStream (forall r. State Stream m a -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
- mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- fromStopK :: IsStream t => StopK m -> t m a
- fromYieldK :: IsStream t => YieldK m a -> t m a
- consK :: IsStream t => YieldK m a -> t m a -> t m a
- foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
- foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrSM :: (IsStream t, Monad m) => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
- build :: IsStream t => forall a. (forall b. (a -> b -> b) -> b -> b) -> t m a
- buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a
- buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- buildSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a
- sharedM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a
- augmentS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
- augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- consMStream :: Monad m => m a -> Stream m a -> Stream m a
- consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a) -> m a -> t m a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
- yield :: IsStream t => a -> t m a
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- serial :: IsStream t => t m a -> t m a -> t m a
- map :: IsStream t => (a -> b) -> t m a -> t m b
- mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b
- unShare :: IsStream t => t m a -> t m a
- concatMapBy :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b
- concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
- bindWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> t m a -> (a -> t m b) -> t m b
- type Streaming = IsStream

# A class for streams

class (forall m a. MonadAsync m => Semigroup (t m a), forall m a. MonadAsync m => Monoid (t m a), forall m. Monad m => Functor (t m), forall m. MonadAsync m => Applicative (t m)) => IsStream t where Source #

Class of types that can represent a stream of elements of some type `a`

in
some monad `m`

.

*Since: 0.2.0*

toStream :: t m a -> Stream m a Source #

fromStream :: Stream m a -> t m a Source #

consM :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]

*Concurrent (do not use parallely to construct infinite streams)*

*Since: 0.2.0*

(|:) :: MonadAsync m => m a -> t m a -> t m a infixr 5 Source #

Operator equivalent of `consM`

. We can read it as "`parallel colon`

"
to remember that `|`

comes before `:`

.

> toList $ getLine |: getLine |: nil hello world ["hello","world"]

let delay = threadDelay 1000000 >> print 1 drain $ serially $ delay |: delay |: delay |: nil drain $ parallely $ delay |: delay |: delay |: nil

*Concurrent (do not use parallely to construct infinite streams)*

*Since: 0.2.0*

## Instances

IsStream Stream Source # | |

IsStream ZipAsyncM Source # | |

IsStream ZipSerialM Source # | |

Defined in Streamly.Internal.Data.Stream.Zip toStream :: ZipSerialM m a -> Stream m a Source # fromStream :: Stream m a -> ZipSerialM m a Source # consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source # | |

IsStream WSerialT Source # | |

IsStream SerialT Source # | |

IsStream ParallelT Source # | |

IsStream WAsyncT Source # | |

IsStream AsyncT Source # | |

IsStream AheadT Source # | |

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.

*Since: 0.1.0*

# The stream type

The type `Stream m a`

represents a monadic stream of values of type `a`

constructed using actions in monad `m`

. It uses stop, singleton and yield
continuations equivalent to the following direct style type:

data Stream m a = Stop | Singleton a | Yield a (Stream m a)

To facilitate parallel composition we maintain a local state in an `SVar`

that is shared across and is used for synchronization of the streams being
composed.

The singleton case can be expressed in terms of stop and yield but we have
it as a separate case to optimize composition operations for streams with
single element. We build singleton streams in the implementation of `pure`

for Applicative and Monad, and in `lift`

for MonadTrans.

XXX remove the Stream type parameter from State as it is always constant. We can remove it from SVar as well

# Construction

mkStream :: IsStream t => (forall r. State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Build a stream from an `SVar`

, a stop continuation, a singleton stream
continuation and a yield continuation.

fromYieldK :: IsStream t => YieldK m a -> t m a Source #

Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.

consK :: IsStream t => YieldK m a -> t m a -> t m a Source #

Add a yield function at the head of the stream.

# Elimination

foldStream :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. The stream will not use the SVar passed via State.

foldStreamShared :: IsStream t => State Stream m a -> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r Source #

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

# foldr/build

foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b Source #

Lazy right fold with a monadic step function.

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Lazy right associative fold to a stream.

buildM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

buildSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a Source #

sharedM :: (IsStream t, MonadAsync m) => (forall r. (a -> t m a -> m r) -> (a -> m r) -> m r -> m r) -> t m a Source #

Like `buildM`

but shares the SVar state across computations.

augmentSM :: (IsStream t, MonadAsync m) => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a Source #

cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #

Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as `(return a) `consM` r`

but
more efficient. For concurrent streams this is not concurrent whereas
`consM`

is concurrent. For example:

> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]

*Since: 0.1.0*

consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a) -> m a -> t m a -> t m a Source #

nilM :: (IsStream t, Monad m) => m b -> t m a Source #

An empty stream producing a side effect.

> toList (nilM (print "nil")) "nil" []

*Internal*

conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b Source #

concatMapBy :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b Source #