Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | released |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streams using Continuation Passing Style (CPS). See the Stream vs StreamK
section in the Streamly.Data.Stream module to know when to use this
module.
Please refer to Streamly.Internal.Data.Stream.StreamK for more functions that have not yet been released.
Synopsis
- data StreamK m a
- nil :: StreamK m a
- nilM :: Applicative m => m b -> StreamK m a
- cons :: a -> StreamK m a -> StreamK m a
- consM :: Monad m => m a -> StreamK m a -> StreamK m a
- fromPure :: a -> StreamK m a
- fromEffect :: Monad m => m a -> StreamK m a
- fromStream :: Monad m => Stream m a -> StreamK m a
- toStream :: Applicative m => StreamK m a -> Stream m a
- fromFoldable :: Foldable f => f a -> StreamK m a
- uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
- drain :: Monad m => StreamK m a -> m ()
- parseBreakChunks :: (Monad m, Unbox a) => ParserK a m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunks :: (Monad m, Unbox a) => ParserK a m b -> StreamK m (Array a) -> m (Either ParseError b)
- mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
- take :: Int -> StreamK m a -> StreamK m a
- append :: StreamK m a -> StreamK m a -> StreamK m a
- interleave :: StreamK m a -> StreamK m a -> StreamK m a
- mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
- crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
- concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- reverse :: StreamK m a -> StreamK m a
- sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>> :m
>>> import Data.Function (fix, (&))
>>> import Data.Semigroup (cycle1)
>>> effect n = print n >> return n
>>> import Streamly.Data.StreamK (StreamK)
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.FileSystem.Dir as Dir
For APIs that have not been released yet.
>>> import qualified Streamly.Internal.Data.Stream.StreamK as StreamK
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir
Overview
Continuation passing style (CPS) stream implementation. The K in StreamK stands for Kontinuation.
StreamK streams can be constructed like lists, except that they use nil instead of '[]' and cons instead of ':'.
cons adds a pure value at the head of the stream:
>>> import Streamly.Data.StreamK (StreamK, cons, consM, nil)
>>> stream = 1 `cons` 2 `cons` nil :: StreamK IO Int
You can use operations from Streamly.Data.Stream for StreamK as well by converting StreamK to Stream (toStream), and vice versa (fromStream).
>>> Stream.fold Fold.toList $ StreamK.toStream stream -- IO [Int]
[1,2]
consM adds an effect at the head of the stream:
>>> stream = effect 1 `consM` effect 2 `consM` nil
>>> Stream.fold Fold.toList $ StreamK.toStream stream
1
2
[1,2]
Exception Handling
The StreamK module has no native exception handling operations. Convert to the Stream type and use the exception handling operations from Streamly.Data.Stream.
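The round trip described above can be sketched as follows. This is a minimal illustration, assuming Stream.finallyIO is exported by Streamly.Data.Stream in your version of the library; withCleanup is a hypothetical helper name introduced here, not a library function.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper (not part of the library): convert to Stream, attach a
-- finalizer with Stream.finallyIO (assumed available), and convert back.
withCleanup :: IO () -> StreamK IO a -> StreamK IO a
withCleanup cleanup =
    StreamK.fromStream . Stream.finallyIO cleanup . StreamK.toStream

-- Collect the guarded stream into a list.
guarded :: IO [Int]
guarded =
    Stream.fold Fold.toList
        $ StreamK.toStream
        $ withCleanup (putStrLn "cleanup")
        $ StreamK.fromFoldable [1, 2, 3]

main :: IO ()
main = guarded >>= print
```

The same pattern applies to the other exception and resource management combinators of Streamly.Data.Stream: wrap them between toStream and fromStream.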
Type
Instances
Construction
Primitives
A stream that terminates without producing any output or side effect.
>>> Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]
nilM :: Applicative m => m b -> StreamK m a
A stream that terminates without producing any output, but produces a side effect.
>>> Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]
Pre-release
cons :: a -> StreamK m a -> StreamK m a infixr 5
A right associative prepend operation to add a pure value at the head of an existing stream:
>>> s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>> Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]
It can be used efficiently with foldr:
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
Same as the following but more efficient:
>>> cons x xs = return x `StreamK.consM` xs
consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5
A right associative prepend operation to add an effectful value at the head of an existing stream:
>>> s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain (StreamK.toStream s)
hello
world
It can be used efficiently with foldr:
>>> fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
Same as the following but more efficient:
>>> consM x xs = StreamK.fromEffect x `StreamK.append` xs
From Values
fromPure :: a -> StreamK m a
Create a singleton stream from a pure value.
>>> fromPure a = a `StreamK.cons` StreamK.nil
>>> fromPure = pure
>>> fromPure = StreamK.fromEffect . pure
fromEffect :: Monad m => m a -> StreamK m a
Create a singleton stream from a monadic action.
>>> fromEffect m = m `StreamK.consM` StreamK.nil
>>> Stream.fold Fold.drain $ StreamK.toStream $ StreamK.fromEffect (putStrLn "hello")
hello
From Stream
From Containers
fromFoldable :: Foldable f => f a -> StreamK m a
Construct a stream from a Foldable containing pure values. Definition:
>>> fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
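As a small usage sketch, fromFoldable accepts any Foldable container, not only lists. The helper toListK is a hypothetical name introduced here for collecting a StreamK through Stream.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect a StreamK into a list by going through Stream.
toListK :: StreamK IO a -> IO [a]
toListK = Stream.fold Fold.toList . StreamK.toStream

main :: IO ()
main = do
    -- A list is Foldable, so its elements become the stream elements.
    viaList <- toListK (StreamK.fromFoldable [1, 2, 3 :: Int])
    print viaList   -- [1,2,3]
    -- Maybe is Foldable too: Just yields one element, Nothing yields none.
    viaMaybe <- toListK (StreamK.fromFoldable (Just 'x'))
    print viaMaybe  -- "x"
```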
Elimination
Primitives
drain :: Monad m => StreamK m a -> m ()
drain = foldl' (\_ _ -> ()) ()
drain = mapM_ (\_ -> return ())
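A minimal sketch of drain in use: it runs a stream purely for its effects and discards the elements. The counter and the name countEffects are illustrative assumptions, not part of the library.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Streamly.Data.StreamK as StreamK

-- Build a stream of three effects, drain it, and report how many effects ran.
countEffects :: IO Int
countEffects = do
    ref <- newIORef (0 :: Int)
    let tick = modifyIORef' ref (+ 1)
        s = tick `StreamK.consM` tick `StreamK.consM` tick `StreamK.consM` StreamK.nil
    StreamK.drain s  -- elements are discarded, only the effects matter
    readIORef ref

main :: IO ()
main = countEffects >>= print  -- 3
```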
Parsing
parseBreakChunks :: (Monad m, Unbox a) => ParserK a m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
parseChunks :: (Monad m, Unbox a) => ParserK a m b -> StreamK m (Array a) -> m (Either ParseError b)
Transformation
Combining Two Streams
Appending
append :: StreamK m a -> StreamK m a -> StreamK m a infixr 6
Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.
>>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
>>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
[1,2,3,4]
This has O(n) append performance where n is the number of streams. It can be used to efficiently fold an infinite lazy container of streams using concatMapWith et al.
Interleaving
interleave :: StreamK m a -> StreamK m a -> StreamK m a infixr 6
Interleaves two streams, yielding one element from each stream alternately. When one stream stops, the rest of the other stream is used in the output stream.
When many streams are joined in a left associative manner, the priority of earlier streams differs exponentially from that of streams joined later. Because of this exponential weighting it can be used with concatMapWith even on a large number of streams.
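A minimal sketch of interleaving two finite streams of unequal length. The name interleaved is introduced here for illustration only.

```haskell
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Elements alternate between the two streams; once the shorter stream ends,
-- the remainder of the longer one is emitted as is.
interleaved :: IO [Int]
interleaved =
    Stream.fold Fold.toList
        $ StreamK.toStream
        $ StreamK.fromFoldable [1, 2, 3]
            `StreamK.interleave` StreamK.fromFoldable [10, 20, 30, 40, 50]

main :: IO ()
main = interleaved >>= print  -- [1,10,2,20,3,30,40,50]
```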
Merging
Zipping
zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
Zip two streams serially using a pure zipping function.
zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
Zip two streams serially using a monadic zipping function.
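The two zipping functions can be sketched side by side as follows. The names toListK, sums, and labelled are hypothetical and exist only for this illustration.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect a StreamK into a list by going through Stream.
toListK :: StreamK IO a -> IO [a]
toListK = Stream.fold Fold.toList . StreamK.toStream

-- Pure zipping function: add corresponding elements.
sums :: IO [Int]
sums =
    toListK
        $ StreamK.zipWith (+)
            (StreamK.fromFoldable [1, 2, 3])
            (StreamK.fromFoldable [10, 20, 30])

-- Monadic zipping function: log each pair while combining it.
labelled :: IO [String]
labelled =
    toListK
        $ StreamK.zipWithM
            (\n c -> do
                let label = show n ++ [c]
                putStrLn ("zipped " ++ label)
                pure label)
            (StreamK.fromFoldable [1, 2, 3 :: Int])
            (StreamK.fromFoldable "abc")

main :: IO ()
main = do
    sums >>= print      -- [11,22,33]
    labelled >>= print  -- ["1a","2b","3c"], after three "zipped ..." lines
```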
Cross Product
crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
Definition:
>>> crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
Note that the second stream is evaluated multiple times.
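A minimal sketch of the cross product of two small streams. Each element of the first stream is paired with every element of the second, which is why the second stream is traversed once per element of the first. The names toListK and pairs are illustrative only.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect a StreamK into a list by going through Stream.
toListK :: StreamK IO a -> IO [a]
toListK = Stream.fold Fold.toList . StreamK.toStream

-- Pair every element of the first stream with every element of the second.
pairs :: IO [(Int, Char)]
pairs =
    toListK
        $ StreamK.crossWith (,)
            (StreamK.fromFoldable [1, 2])
            (StreamK.fromFoldable "ab")

main :: IO ()
main = pairs >>= print  -- [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
```

If the second stream performs effects, expect them to run again for each element of the first stream.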
Stream of streams
concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
Perform a concatMap using a specified concat strategy. The first argument specifies a merge or concat function that is used to merge the streams generated by the map function.
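The effect of the concat strategy can be sketched by mapping the same generator with two different combining functions. The names toListK, expand, appended, and interleavedAll are hypothetical; the output of the interleave variant is printed rather than stated, since its order depends on how the streams are nested.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect a StreamK into a list by going through Stream.
toListK :: StreamK IO a -> IO [a]
toListK = Stream.fold Fold.toList . StreamK.toStream

-- Map function: each input element generates a two-element stream.
expand :: Int -> StreamK IO Int
expand x = StreamK.fromFoldable [x, x * 10]

-- Strategy 1: append the generated streams one after another.
appended :: IO [Int]
appended =
    toListK
        $ StreamK.concatMapWith StreamK.append expand
        $ StreamK.fromFoldable [1, 2, 3]

-- Strategy 2: interleave the generated streams instead.
interleavedAll :: IO [Int]
interleavedAll =
    toListK
        $ StreamK.concatMapWith StreamK.interleave expand
        $ StreamK.fromFoldable [1, 2, 3]

main :: IO ()
main = do
    appended >>= print        -- [1,10,2,20,3,30]
    interleavedAll >>= print  -- same elements, in interleaved order
```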
mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
Combine streams in pairs using a binary combinator. The resulting streams are then combined again in pairs, recursively, until a single combined stream remains. The composition thus forms a binary tree.
For example, you can sort a stream using merge sort like this:
>>> s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
>>> generate = StreamK.fromPure
>>> combine = StreamK.mergeBy compare
>>> Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
[1,2,5,7,9]
[1,2,5,7,9]
Note that if the stream length is not a power of 2, the binary tree composed by mergeMapWith would not be balanced, which may or may not be important depending on what you are trying to achieve.
Caution: the stream of streams must be finite.
Pre-release
Buffered Operations
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
Sort the input stream using a supplied comparison function.
A simple sort can be written as:
>>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.
O(n) space
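A minimal usage sketch of sortBy with two comparison functions. The names toListK, ascending, and descending are introduced here for illustration only.

```haskell
import Streamly.Data.StreamK (StreamK)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK

-- Hypothetical helper: collect a StreamK into a list by going through Stream.
toListK :: StreamK IO a -> IO [a]
toListK = Stream.fold Fold.toList . StreamK.toStream

-- Sort in ascending order with the standard comparison.
ascending :: IO [Int]
ascending =
    toListK $ StreamK.sortBy compare $ StreamK.fromFoldable [5, 1, 7, 9, 2]

-- Flip the comparison to sort in descending order.
descending :: IO [Int]
descending =
    toListK $ StreamK.sortBy (flip compare) $ StreamK.fromFoldable [5, 1, 7, 9, 2]

main :: IO ()
main = do
    ascending >>= print   -- [1,2,5,7,9]
    descending >>= print  -- [9,7,5,2,1]
```

Because the whole input must be buffered before the first output element is known, sortBy is only suitable for finite streams that fit in memory.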