streamly-core-0.2.2: Streaming, parsers, arrays, serialization and more
Copyright: (c) 2017 Composewell Technologies
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Internal.Data.StreamK

Description

 
Synopsis

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :m
>>> import Control.Concurrent (threadDelay)
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> effect n = print n >> return n
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.Dir as Dir

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.Data.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir

The stream type

StreamK type

type Stream = StreamK Source #

Deprecated: Please use StreamK instead.

Continuation Passing Style (CPS) version of Streamly.Data.Stream.Stream. Unlike Streamly.Data.Stream.Stream, StreamK can be composed recursively without affecting performance.

Semigroup instance appends two streams:

>>> (<>) = StreamK.append

newtype StreamK m a Source #

Constructors

MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
(Foldable m, Monad m) => Foldable (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) #

Read a => Read (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

CrossStreamK type wrapper

data CrossStreamK m a Source #

A newtype wrapper for the StreamK type adding a cross product style monad instance.

A Monad bind behaves like a for loop:

>>> :{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Instances

Instances details
MonadTrans CrossStreamK Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

lift :: Monad m => m a -> CrossStreamK m a #

MonadIO m => MonadIO (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

liftIO :: IO a -> CrossStreamK m a #

(Foldable m, Monad m) => Foldable (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => CrossStreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> CrossStreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> CrossStreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> CrossStreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> CrossStreamK m a -> b #

foldl :: (b -> a -> b) -> b -> CrossStreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> CrossStreamK m a -> b #

foldr1 :: (a -> a -> a) -> CrossStreamK m a -> a #

foldl1 :: (a -> a -> a) -> CrossStreamK m a -> a #

toList :: CrossStreamK m a -> [a] #

null :: CrossStreamK m a -> Bool #

length :: CrossStreamK m a -> Int #

elem :: Eq a => a -> CrossStreamK m a -> Bool #

maximum :: Ord a => CrossStreamK m a -> a #

minimum :: Ord a => CrossStreamK m a -> a #

sum :: Num a => CrossStreamK m a -> a #

product :: Num a => CrossStreamK m a -> a #

Traversable (CrossStreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monad m => Applicative (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

pure :: a -> CrossStreamK m a #

(<*>) :: CrossStreamK m (a -> b) -> CrossStreamK m a -> CrossStreamK m b #

liftA2 :: (a -> b -> c) -> CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m c #

(*>) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m b #

(<*) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m a #

Monad m => Functor (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> CrossStreamK m a -> CrossStreamK m b #

(<$) :: a -> CrossStreamK m b -> CrossStreamK m a #

Monad m => Monad (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(>>=) :: CrossStreamK m a -> (a -> CrossStreamK m b) -> CrossStreamK m b #

(>>) :: CrossStreamK m a -> CrossStreamK m b -> CrossStreamK m b #

return :: a -> CrossStreamK m a #

MonadThrow m => MonadThrow (CrossStreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

throwM :: (HasCallStack, Exception e) => e -> CrossStreamK m a #

a ~ Char => IsString (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (CrossStreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Semigroup (CrossStreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: CrossStreamK m a -> CrossStreamK m a -> CrossStreamK m a #

sconcat :: NonEmpty (CrossStreamK m a) -> CrossStreamK m a #

stimes :: Integral b => b -> CrossStreamK m a -> CrossStreamK m a #

IsList (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (CrossStreamK Identity a) #

Read a => Read (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (CrossStreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

unCross :: CrossStreamK m a -> StreamK m a Source #

Unwrap the StreamK type from CrossStreamK newtype.

This is a type level operation with no runtime overhead.

mkCross :: StreamK m a -> CrossStreamK m a Source #

Wrap the StreamK type in a CrossStreamK newtype to enable cross product style applicative and monad instances.

This is a type level operation with no runtime overhead.

foldr/build Fusion

mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #

foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #

Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation, in that order. The stream will not use the SVar passed via State.

foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #

Fold a stream by providing a State (carrying the SVar), a yield continuation, a singleton continuation and a stop continuation, in that order. The stream would share the current SVar passed via the State.

foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b Source #

Lazy right fold with a monadic step function.

foldrS :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Right fold to a streaming monad.

foldrS StreamK.cons StreamK.nil === id

foldrS can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion.

>>> input = StreamK.fromStream $ Stream.fromList [1..5]
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
[1,2,3,4,5]

Find if any element in the stream is odd:

>>> step x xs = if odd x then StreamK.fromPure True else xs
>>> input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
[True]

Map (+2) on odd elements and filter out the even elements:

>>> step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
>>> input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
[3,5,7]

Pre-release

foldrSShared :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Fold sharing the SVar state within the reconstructed stream.

foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

build :: forall m a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a Source #

buildS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #

buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #

buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #

augmentS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

unShare :: StreamK m a -> StreamK m a Source #

Detach a stream from an SVar.

Construction

Primitives

fromStopK :: StopK m -> StreamK m a Source #

Make an empty stream from a stop function.

fromYieldK :: YieldK m a -> StreamK m a Source #

Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.

consK :: YieldK m a -> StreamK m a -> StreamK m a Source #

Add a yield function at the head of the stream.

cons :: a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add a pure value at the head of an existing stream:

>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]

It can be used efficiently with foldr:

>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

Same as the following but more efficient:

>>> cons x xs = return x `StreamK.consM` xs

(.:) :: a -> StreamK m a -> StreamK m a infixr 5 Source #

Operator equivalent of cons.

> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]

consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #

A right associative prepend operation to add an effectful value at the head of an existing stream:

>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world

It can be used efficiently with foldr:

>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil

Same as the following but more efficient:

>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs

consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a Source #

nil :: StreamK m a Source #

A stream that terminates without producing any output or side effect.

>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]

nilM :: Applicative m => m b -> StreamK m a Source #

A stream that terminates without producing any output, but produces a side effect.

>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]

Pre-release

Unfolding

unfoldr :: (b -> Maybe (a, b)) -> b -> StreamK m a Source #

>>> :{
unfoldr step s =
    case step s of
        Nothing -> StreamK.nil
        Just (a, b) -> a `StreamK.cons` unfoldr step b
:}

Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in StreamK.toList $ StreamK.unfoldr f 0
:}
[0,1,2]

unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #

unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #

Build a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in StreamK.toList $ StreamK.unfoldrM f 0
:}
[0,1,2]

From Values

fromEffect :: Monad m => m a -> StreamK m a Source #

fromPure :: a -> StreamK m a Source #

repeat :: a -> StreamK m a Source #

Generate an infinite stream by repeating a pure value.

Pre-release

repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a Source #

Like repeatM but takes a stream cons operation to combine the actions in a stream specific manner. A serial cons would repeat the values serially while an async cons would repeat concurrently.

Pre-release

replicateMWith :: (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a Source #

From Indices

fromIndicesMWith :: (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a Source #

Iteration

iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a Source #

From Containers

fromFoldable :: Foldable f => f a -> StreamK m a Source #

>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil

Construct a stream from a Foldable containing pure values.

fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a Source #

Cyclic

mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a Source #

We can define cyclic structures using let:

>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)

The function fix defined as:

>>> fix f = let x = f x in x

ensures that the argument of a function and its output refer to the same lazy value x, i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.

>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)

mfix is essentially the same as fix but for monadic values.

Using mfix for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself.

In the following example, the argument action of the function f represents the tuple (x,y) returned by it in a given iteration. We define the first element of the tuple in terms of the second.

>>> import System.IO.Unsafe (unsafeInterleaveIO)
>>> :{
main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f
    where
    f action = StreamK.unCross $ do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- StreamK.mkCross $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action]
        y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [4,5]
        return (x, y)
:}

Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.

Note that the function f must be lazy in its argument; that is why we use unsafeInterleaveIO on action, because the IO monad is strict.

Pre-release

Elimination

Primitives

uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a)) Source #

Strict Left Folds

foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b Source #

Strict left associative fold.

foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b Source #

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Note that the accumulator is always evaluated including the initial value.

foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b Source #

Like foldlx', but with a monadic step function.

foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b Source #

Like foldl' but with a monadic step function.

Lazy Right Folds

foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b Source #

Lazy right associative fold.

Specific Folds

drain :: Monad m => StreamK m a -> m () Source #

null :: Monad m => StreamK m a -> m Bool Source #

tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #

init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #

Extract all but the last element of the stream, if any.

Note: This will end up buffering the entire stream.

Pre-release

Mapping

map :: (a -> b) -> StreamK m a -> StreamK m b Source #

mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b Source #

mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

Combining Two Streams

Appending

conjoin :: Monad m => StreamK m a -> StreamK m a -> StreamK m a Source #

append :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Interleave

interleave :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Note: When joining many streams in a left associative manner, earlier streams will get exponentially higher priority than the ones joined later. Because of the exponentially high weighting of left streams it can be used with concatMapWith even on a large number of streams.

interleaveFst :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Like interleave but stops interleaving as soon as the first stream stops.

interleaveMin :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #

Like interleave but stops interleaving as soon as either of the two streams stops.

Cross Product

crossApplyWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #

crossApply :: StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #

Apply a stream of functions to a stream of values and flatten the results.

Note that the second stream is evaluated multiple times.

Definition:

>>> crossApply = StreamK.crossApplyWith StreamK.append
>>> crossApply = StreamK.crossWith id

crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Definition:

>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2

Note that the second stream is evaluated multiple times.

cross :: Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b) Source #

Given a StreamK m a and StreamK m b generate a stream with all possible combinations of the tuple (a, b).

Definition:

>>> cross = StreamK.crossWith (,)

The second stream is evaluated multiple times. If that is not desired it can be cached in an Array and then generated from the array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

See the cross operation on the fused Stream type (Streamly.Internal.Data.Stream) for a much faster fused alternative.

Time: O(m x n)

Pre-release

Concat

before :: Monad m => m b -> StreamK m a -> StreamK m a Source #

Run an action before evaluating the stream.

concatEffect :: Monad m => m (StreamK m a) -> StreamK m a Source #

concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a Source #

concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function.

concatMap :: (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

bindWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #

concatIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

Yield an input element in the output stream, map a stream generator on it and repeat the process on the resulting stream. Resulting streams are flattened using the concatMapWith combinator. This can be used for a depth first style (DFS) traversal of a tree like structure.

Example, list a directory tree using DFS:

>>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
>>> input = StreamK.fromPure (Left ".")
>>> ls = StreamK.concatIterateWith StreamK.append f input

Note that iterateM is a special case of concatIterateWith:

>>> iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect

Pre-release

concatIterateLeftsWith :: b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b Source #

In an Either stream iterate on Lefts. This is a special case of concatIterateWith:

>>> concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil))

To traverse a directory tree:

>>> input = StreamK.fromPure (Left ".")
>>> ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither) input

Pre-release

concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a Source #

Like concatIterateWith but carries a state in the stream generation function. This can be used to traverse graph like structures; we can remember the visited nodes in the state to avoid cycles.

Note that a combination of concatIterateWith and usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state.

See also: mfix

Pre-release

Merge

mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #

Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.

For example, you can sort a stream using merge sort like this:

>>> s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
>>> generate = StreamK.fromPure
>>> combine = StreamK.mergeBy compare
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
[1,2,5,7,9]

Note that if the stream length is not a power of 2, the binary tree composed by mergeMapWith would not be balanced, which may or may not be important depending on what you are trying to achieve.

Caution: the stream of streams must be finite.

Pre-release

mergeIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #

Like concatIterateWith but uses the pairwise flattening combinator mergeMapWith for flattening the resulting streams. This can be used for a balanced traversal of a tree like structure.

Example, list a directory tree using balanced traversal:

>>> f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
>>> input = StreamK.fromPure (Left ".")
>>> ls = StreamK.mergeIterateWith StreamK.interleave f input

Pre-release

Buffered Operations

foldlS :: (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #

Lazy left fold to a stream.

foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b Source #

Lazy left fold to an arbitrary transformer monad.

foldrT :: (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b Source #

Right associative fold to an arbitrary transformer monad.

liftInner :: (Monad m, MonadTrans t, Monad (t m)) => StreamK m a -> StreamK (t m) a Source #

evalStateT :: Monad m => m s -> StreamK (StateT s m) a -> StreamK m a Source #

newtype StreamK m a Source #

Constructors

MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) 

Instances

Instances details
(Foldable m, Monad m) => Foldable (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fold :: Monoid m0 => StreamK m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldMap' :: Monoid m0 => (a -> m0) -> StreamK m a -> m0 #

foldr :: (a -> b -> b) -> b -> StreamK m a -> b #

foldr' :: (a -> b -> b) -> b -> StreamK m a -> b #

foldl :: (b -> a -> b) -> b -> StreamK m a -> b #

foldl' :: (b -> a -> b) -> b -> StreamK m a -> b #

foldr1 :: (a -> a -> a) -> StreamK m a -> a #

foldl1 :: (a -> a -> a) -> StreamK m a -> a #

toList :: StreamK m a -> [a] #

null :: StreamK m a -> Bool #

length :: StreamK m a -> Int #

elem :: Eq a => a -> StreamK m a -> Bool #

maximum :: Ord a => StreamK m a -> a #

minimum :: Ord a => StreamK m a -> a #

sum :: Num a => StreamK m a -> a #

product :: Num a => StreamK m a -> a #

Traversable (StreamK Identity) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

traverse :: Applicative f => (a -> f b) -> StreamK Identity a -> f (StreamK Identity b) #

sequenceA :: Applicative f => StreamK Identity (f a) -> f (StreamK Identity a) #

mapM :: Monad m => (a -> m b) -> StreamK Identity a -> m (StreamK Identity b) #

sequence :: Monad m => StreamK Identity (m a) -> m (StreamK Identity a) #

Monad m => Functor (StreamK m) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

fmap :: (a -> b) -> StreamK m a -> StreamK m b #

(<$) :: a -> StreamK m b -> StreamK m a #

a ~ Char => IsString (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Monoid (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

mempty :: StreamK m a #

mappend :: StreamK m a -> StreamK m a -> StreamK m a #

mconcat :: [StreamK m a] -> StreamK m a #

Semigroup (StreamK m a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Methods

(<>) :: StreamK m a -> StreamK m a -> StreamK m a #

sconcat :: NonEmpty (StreamK m a) -> StreamK m a #

stimes :: Integral b => b -> StreamK m a -> StreamK m a #

IsList (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Associated Types

type Item (StreamK Identity a) #

Read a => Read (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

Show a => Show (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) Source # 
Instance details

Defined in Streamly.Internal.Data.StreamK.Type

type Item (StreamK Identity a) = a

fromList :: [a] -> StreamK m a Source #

fromStream :: Monad m => Stream m a -> StreamK m a Source #

Convert a fused Stream to StreamK.

For example:

>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
[1,2,3,4]

toStream :: Applicative m => StreamK m a -> Stream m a Source #

Convert a StreamK to a fused Stream.

Specialized Generation

repeatM :: Monad m => m a -> StreamK m a Source #

>>> repeatM = StreamK.sequence . StreamK.repeat
>>> repeatM = fix . StreamK.consM
>>> repeatM = cycle1 . StreamK.fromEffect

Generate a stream by repeatedly executing a monadic action forever.

>>> :{
repeatAction =
       StreamK.repeatM (threadDelay 1000000 >> print 1)
     & StreamK.take 10
     & StreamK.fold Fold.drain
:}

replicate :: Int -> a -> StreamK m a Source #

replicateM :: Monad m => Int -> m a -> StreamK m a Source #

fromIndices :: (Int -> a) -> StreamK m a Source #

fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a Source #

iterate :: (a -> a) -> a -> StreamK m a Source #

>>> iterate f x = x `StreamK.cons` iterate f (f x)

Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.

>>> StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
[1,2,3,4,5]

iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a Source #

>>> iterateM f m = StreamK.concatEffect $ m >>= \a -> return (return a `StreamK.consM` iterateM f (f a))

Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.

>>> :{
StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0)
    & StreamK.take 3
    & StreamK.toList
:}
0
1
[0,1,2]

Elimination

General Folds

foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a) Source #

fold :: Monad m => Fold m a b -> StreamK m a -> m b Source #

Fold a stream using the supplied left Fold and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A Fold can terminate early without consuming the full stream. See the documentation of individual Folds for termination behavior.

Definitions:

>>> fold f = fmap fst . StreamK.foldBreak f
>>> fold f = StreamK.parseD (Parser.fromFold f)

Example:

>>> StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
5050

foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a) Source #

Like fold but also returns the remaining stream. The resulting stream would be nil if the stream finished before the fold.

foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a)) Source #

Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.

Internal

foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a) Source #

Generate streams from individual elements of a stream and fold the concatenation of those streams using the supplied fold. Return the result of the fold and residual stream.

For example, this can be used to efficiently fold an Array Word8 stream using Word8 folds.

Internal

parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Run a Parser over a stream and return rest of the Stream.

parseD :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseError b) Source #

parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Run a ParserK over a chunked StreamK and return the parse result and the remaining Stream.

parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #

parseBreak :: forall m a b. Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #

Similar to parseBreakChunks but works on singular elements instead of arrays of elements.

parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #

Run a ParserK over a StreamK. Please use parseChunks where possible, for better performance.

parseBreakChunksGeneric :: forall m a b. Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #

Similar to parseBreakChunks but works on generic arrays.

Specialized Folds

head :: Monad m => StreamK m a -> m (Maybe a) Source #

elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool Source #

notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool Source #

all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool Source #

any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool Source #

last :: Monad m => StreamK m a -> m (Maybe a) Source #

Extract the last element of the stream, if any.

minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a) Source #

minimumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a) Source #

maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a) Source #

maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a) Source #

findIndices :: (a -> Bool) -> StreamK m a -> StreamK m Int Source #

lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b) Source #

findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a) Source #

find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a) Source #

(!!) :: Monad m => StreamK m a -> Int -> m (Maybe a) Source #

Map and Fold

mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m () Source #

Apply a monadic action to each element of the stream and discard the output of the action.

Conversions

toList :: Monad m => StreamK m a -> m [a] Source #

hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a Source #

Transformation

By folding (scans)

scanl' :: (b -> a -> b) -> b -> StreamK m a -> StreamK m b Source #

scanlx' :: (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b Source #

Filtering

filter :: (a -> Bool) -> StreamK m a -> StreamK m a Source #

take :: Int -> StreamK m a -> StreamK m a Source #

takeWhile :: (a -> Bool) -> StreamK m a -> StreamK m a Source #

drop :: Int -> StreamK m a -> StreamK m a Source #

dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a Source #

Mapping

mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b Source #

sequence :: Monad m => StreamK m (m a) -> StreamK m a Source #

Inserting

intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a Source #

intersperse :: Monad m => a -> StreamK m a -> StreamK m a Source #

insertBy :: (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a Source #

Deleting

deleteBy :: (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a Source #

Reordering

sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #

Sort the input stream using a supplied comparison function.

Sorting can be achieved by simply:

>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure

However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.

O(n) space

Map and Filter

mapMaybe :: (a -> Maybe b) -> StreamK m a -> StreamK m b Source #

Zipping

zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Zipping of n streams can be performed by combining the streams pairwise using mergeMapWith, giving O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c Source #

Merging

mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Merging of n streams can be performed by combining the streams pairwise using mergeMapWith, giving O(n * log n) time complexity. If used with concatMapWith it will have O(n^2) performance.

mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #

Transformation comprehensions

the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a) Source #

Exceptions

handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #

Like Streamly.Data.Stream.handle, but with one significant difference: this function observes exceptions from the consumer of the stream as well.

You can also convert StreamK to Stream and use exception handling from Stream module:

>>> handle f s = StreamK.fromStream $ Stream.handle (\e -> fmap StreamK.toStream (f e)) (StreamK.toStream s)

Resource Management

bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #

Like Streamly.Data.Stream.bracketIO, but with one significant difference: this function observes exceptions from the consumer of the stream as well. Therefore, it cleans up the resource promptly when the consumer encounters an exception.

You can also convert StreamK to Stream and use resource handling from Stream module:

>>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)