Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- type Stream = StreamK
- newtype StreamK m a = MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
- data CrossStreamK m a
- unCross :: CrossStreamK m a -> StreamK m a
- mkCross :: StreamK m a -> CrossStreamK m a
- mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a
- foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r
- foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r
- foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b
- foldrS :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- foldrSShared :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- build :: forall m a. (forall b. (a -> b -> b) -> b -> b) -> StreamK m a
- buildS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a
- buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a
- buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a
- augmentS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a
- augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a
- unShare :: StreamK m a -> StreamK m a
- fromStopK :: StopK m -> StreamK m a
- fromYieldK :: YieldK m a -> StreamK m a
- consK :: YieldK m a -> StreamK m a -> StreamK m a
- cons :: a -> StreamK m a -> StreamK m a
- (.:) :: a -> StreamK m a -> StreamK m a
- consM :: Monad m => m a -> StreamK m a -> StreamK m a
- consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a
- nil :: StreamK m a
- nilM :: Applicative m => m b -> StreamK m a
- unfoldr :: (b -> Maybe (a, b)) -> b -> StreamK m a
- unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a
- unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a
- fromEffect :: Monad m => m a -> StreamK m a
- fromPure :: a -> StreamK m a
- repeat :: a -> StreamK m a
- repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a
- replicateMWith :: (m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a
- fromIndicesMWith :: (m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a
- iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a
- fromFoldable :: Foldable f => f a -> StreamK m a
- fromFoldableM :: (Foldable f, Monad m) => f (m a) -> StreamK m a
- mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a
- uncons :: Applicative m => StreamK m a -> m (Maybe (a, StreamK m a))
- foldl' :: Monad m => (b -> a -> b) -> b -> StreamK m a -> m b
- foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b
- foldr :: Monad m => (a -> b -> b) -> b -> StreamK m a -> m b
- drain :: Monad m => StreamK m a -> m ()
- null :: Monad m => StreamK m a -> m Bool
- tail :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
- init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a))
- map :: (a -> b) -> StreamK m a -> StreamK m b
- mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b
- mapMSerial :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- conjoin :: Monad m => StreamK m a -> StreamK m a -> StreamK m a
- append :: StreamK m a -> StreamK m a -> StreamK m a
- interleave :: StreamK m a -> StreamK m a -> StreamK m a
- interleaveFst :: StreamK m a -> StreamK m a -> StreamK m a
- interleaveMin :: StreamK m a -> StreamK m a -> StreamK m a
- crossApplyWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b
- crossApply :: StreamK m (a -> b) -> StreamK m a -> StreamK m b
- crossApplySnd :: StreamK m a -> StreamK m b -> StreamK m b
- crossApplyFst :: StreamK m a -> StreamK m b -> StreamK m a
- crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- cross :: Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b)
- before :: Monad m => m b -> StreamK m a -> StreamK m a
- concatEffect :: Monad m => m (StreamK m a) -> StreamK m a
- concatMapEffect :: Monad m => (b -> StreamK m a) -> m b -> StreamK m a
- concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- concatMap :: (a -> StreamK m b) -> StreamK m a -> StreamK m b
- bindWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b
- concatIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a
- concatIterateLeftsWith :: b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b
- concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a
- mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b
- mergeIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a
- foldlS :: (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b
- reverse :: StreamK m a -> StreamK m a
- foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b
- foldrT :: (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b
- liftInner :: (Monad m, MonadTrans t, Monad (t m)) => StreamK m a -> StreamK (t m) a
- evalStateT :: Monad m => m s -> StreamK (StateT s m) a -> StreamK m a
- newtype StreamK m a = MkStream (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
- fromList :: [a] -> StreamK m a
- fromStream :: Monad m => Stream m a -> StreamK m a
- toStream :: Applicative m => StreamK m a -> Stream m a
- repeatM :: Monad m => m a -> StreamK m a
- replicate :: Int -> a -> StreamK m a
- replicateM :: Monad m => Int -> m a -> StreamK m a
- fromIndices :: (Int -> a) -> StreamK m a
- fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a
- iterate :: (a -> a) -> a -> StreamK m a
- iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a
- foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a)
- foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b
- foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
- fold :: Monad m => Fold m a b -> StreamK m a -> m b
- foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a)
- foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
- foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a)
- parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
- parseD :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseError b)
- parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
- parseBreak :: forall m a b. Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
- parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b)
- parseBreakChunksGeneric :: forall m a b. Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a))
- parseChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
- head :: Monad m => StreamK m a -> m (Maybe a)
- elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
- all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
- any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
- last :: Monad m => StreamK m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
- maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
- findIndices :: (a -> Bool) -> StreamK m a -> StreamK m Int
- lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b)
- findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a)
- (!!) :: Monad m => StreamK m a -> Int -> m (Maybe a)
- mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m ()
- toList :: Monad m => StreamK m a -> m [a]
- hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> StreamK m a -> StreamK n a
- scanl' :: (b -> a -> b) -> b -> StreamK m a -> StreamK m b
- scanlx' :: (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
- filter :: (a -> Bool) -> StreamK m a -> StreamK m a
- take :: Int -> StreamK m a -> StreamK m a
- takeWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
- drop :: Int -> StreamK m a -> StreamK m a
- dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
- mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
- sequence :: Monad m => StreamK m (m a) -> StreamK m a
- intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a
- intersperse :: Monad m => a -> StreamK m a -> StreamK m a
- insertBy :: (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
- deleteBy :: (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
- sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
- mapMaybe :: (a -> Maybe b) -> StreamK m a -> StreamK m b
- zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
- zipWithM :: Monad m => (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
- mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- mergeByM :: Monad m => (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
- the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a)
- handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
- bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>>
:m
>>>
import Control.Concurrent (threadDelay)
>>>
import Data.Function (fix, (&))
>>>
import Data.Semigroup (cycle1)
>>>
effect n = print n >> return n
>>>
import Streamly.Data.StreamK (StreamK)
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Parser as Parser
>>>
import qualified Streamly.Data.Stream as Stream
>>>
import qualified Streamly.Data.StreamK as StreamK
>>>
import qualified Streamly.FileSystem.Dir as Dir
For APIs that have not been released yet.
>>>
import qualified Streamly.Internal.Data.StreamK as StreamK
>>>
import qualified Streamly.Internal.FileSystem.Dir as Dir
The stream type
StreamK type
type Stream = StreamK Source #
Deprecated: Please use StreamK instead.
Continuation Passing Style (CPS) version of Streamly.Data.Stream.Stream.
Unlike Streamly.Data.Stream.Stream, StreamK
can be composed recursively
without affecting performance.
Semigroup instance appends two streams:
>>>
(<>) = StreamK.append
Instances
CrossStreamK type wrapper
data CrossStreamK m a Source #
A newtype wrapper for the StreamK
type adding a cross product style
monad instance.
A Monad
bind behaves like a for
loop:
>>>
:{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    -- Perform the following actions for each x in the stream
    return x
:}
[1,2]
Nested monad binds behave like nested for
loops:
>>>
:{
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.unCross $ do
    x <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [1,2]
    y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [3,4]
    -- Perform the following actions for each x, for each y
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]
Instances
unCross :: CrossStreamK m a -> StreamK m a Source #
Unwrap the StreamK
type from CrossStreamK
newtype.
This is a type level operation with no runtime overhead.
mkCross :: StreamK m a -> CrossStreamK m a Source #
Wrap the StreamK
type in a CrossStreamK
newtype to enable cross
product style applicative and monad instances.
This is a type level operation with no runtime overhead.
foldr/build Fusion
mkStream :: (forall r. State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #
foldStream :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #
Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation. The stream will not use the SVar passed via State.
foldStreamShared :: State StreamK m a -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> StreamK m a -> m r Source #
Fold a stream by providing a State, a yield continuation, a singleton continuation and a stop continuation. The stream would share the current SVar passed via the State.
foldrM :: (a -> m b -> m b) -> m b -> StreamK m a -> m b Source #
Lazy right fold with a monadic step function.
foldrS :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Right fold to a streaming monad.
foldrS StreamK.cons StreamK.nil === id
foldrS
can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.
>>>
input = StreamK.fromStream $ Stream.fromList [1..5]
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS StreamK.cons StreamK.nil input
[1,2,3,4,5]
Find if any element in the stream is odd (the fold stops at the first odd element, so the undefined tail of the input is never evaluated):
>>>
step x xs = if odd x then StreamK.fromPure True else xs
>>>
input = StreamK.fromStream (Stream.fromList (2:4:5:undefined)) :: StreamK IO Int
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step (StreamK.fromPure False) input
[True]
Map (+2) on odd elements and filter out the even elements:
>>>
step x xs = if odd x then (x + 2) `StreamK.cons` xs else xs
>>>
input = StreamK.fromStream (Stream.fromList [1..5]) :: StreamK IO Int
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.foldrS step StreamK.nil input
[3,5,7]
Pre-release
foldrSShared :: (a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Fold sharing the SVar state within the reconstructed stream
foldrSM :: Monad m => (m a -> StreamK m b -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
buildM :: Monad m => (forall r. (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r) -> StreamK m a Source #
buildSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a Source #
augmentS :: ((a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
augmentSM :: Monad m => ((m a -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
Construction
Primitives
fromYieldK :: YieldK m a -> StreamK m a Source #
Make a singleton stream from a callback function. The callback function calls the one-shot yield continuation to yield an element.
consK :: YieldK m a -> StreamK m a -> StreamK m a Source #
Add a yield function at the head of the stream.
cons :: a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add a pure value at the head of an existing stream:
>>>
s = 1 `StreamK.cons` 2 `StreamK.cons` 3 `StreamK.cons` StreamK.nil
>>>
Stream.fold Fold.toList (StreamK.toStream s)
[1,2,3]
It can be used efficiently with foldr
:
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
Same as the following but more efficient:
>>>
cons x xs = return x `StreamK.consM` xs
(.:) :: a -> StreamK m a -> StreamK m a infixr 5 Source #
Operator equivalent of cons.
> toList $ 1 .: 2 .: 3 .: nil
[1,2,3]
consM :: Monad m => m a -> StreamK m a -> StreamK m a infixr 5 Source #
A right associative prepend operation to add an effectful value at the head of an existing stream:
>>>
s = putStrLn "hello" `StreamK.consM` putStrLn "world" `StreamK.consM` StreamK.nil
>>>
Stream.fold Fold.drain (StreamK.toStream s)
hello
world
It can be used efficiently with foldr
:
>>>
fromFoldableM = Prelude.foldr StreamK.consM StreamK.nil
Same as the following but more efficient:
>>>
consM x xs = StreamK.fromEffect x `StreamK.append` xs
consMBy :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> m a -> StreamK m a -> StreamK m a Source #
nil :: StreamK m a Source #
A stream that terminates without producing any output or side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream StreamK.nil)
[]
nilM :: Applicative m => m b -> StreamK m a Source #
A stream that terminates without producing any output, but produces a side effect.
>>>
Stream.fold Fold.toList (StreamK.toStream (StreamK.nilM (print "nil")))
"nil"
[]
Pre-release
Unfolding
unfoldr :: (b -> Maybe (a, b)) -> b -> StreamK m a Source #
>>>
:{
unfoldr step s =
    case step s of
        Nothing -> StreamK.nil
        Just (a, b) -> a `StreamK.cons` unfoldr step b
:}
Build a stream by unfolding a pure step function step
starting from a
seed s
. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing
and the stream ends.
For example,
>>>
:{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in StreamK.toList $ StreamK.unfoldr f 0
:}
[0,1,2]
unfoldrMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> StreamK m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing
and the stream ends. For
example,
>>>
:{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in StreamK.toList $ StreamK.unfoldrM f 0
:}
[0,1,2]
From Values
fromEffect :: Monad m => m a -> StreamK m a Source #
repeat :: a -> StreamK m a Source #
Generate an infinite stream by repeating a pure value.
Pre-release
repeatMWith :: (m a -> t m a -> t m a) -> m a -> t m a Source #
Like repeatM
but takes a stream cons
operation to combine the actions
in a stream specific manner. A serial cons would repeat the values serially
while an async cons would repeat concurrently.
Pre-release
From Indices
Iteration
iterateMWith :: Monad m => (m a -> StreamK m a -> StreamK m a) -> (a -> m a) -> m a -> StreamK m a Source #
From Containers
fromFoldable :: Foldable f => f a -> StreamK m a Source #
>>>
fromFoldable = Prelude.foldr StreamK.cons StreamK.nil
Construct a stream from a Foldable
containing pure values:
Cyclic
mfix :: Monad m => (m a -> StreamK m a) -> StreamK m a Source #
We can define cyclic structures using let
:
>>>
let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)
The function fix
defined as:
>>>
fix f = let x = f x in x
ensures that the argument of a function and its output refer to the same
lazy value x
i.e. the same location in memory. Thus x
can be defined
in terms of itself, creating structures with cyclic references.
>>>
f ~(a, b) = ([1, b], head a)
>>>
fix f
([1,1],1)
mfix
is essentially the same as fix
but for monadic
values.
Using mfix
for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.
In the following example, the argument action
of the function f
represents the tuple (x,y)
returned by it in a given iteration. We define
the first element of the tuple in terms of the second.
>>>
import System.IO.Unsafe (unsafeInterleaveIO)
>>>
:{
main = Stream.fold (Fold.drainMapM print) $ StreamK.toStream $ StreamK.mfix f
    where
    f action = StreamK.unCross $ do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- StreamK.mkCross $ StreamK.fromStream $ Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action]
        y <- StreamK.mkCross $ StreamK.fromStream $ Stream.fromList [4,5]
        return (x, y)
:}
Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.
Note that the function f
must be lazy in its argument, that's why we use
unsafeInterleaveIO
on action
because the IO monad is strict.
Pre-release
Elimination
Primitives
Strict Left Folds
foldlx' :: forall m a b x. Monad m => (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b Source #
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Note that the accumulator is always evaluated including the initial value.
Lazy Right Folds
Specific Folds
init :: Applicative m => StreamK m a -> m (Maybe (StreamK m a)) Source #
Extract all but the last element of the stream, if any.
Note: This will end up buffering the entire stream.
Pre-release
Mapping
mapMWith :: (m b -> StreamK m b -> StreamK m b) -> (a -> m b) -> StreamK m a -> StreamK m b Source #
Combining Two Streams
Appending
Interleave
interleave :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Note: When joining many streams in a left associative manner, earlier
streams get exponentially higher priority than the ones joined later. Because
of the exponentially high weighting of left streams it can be used with
concatMapWith
even on a large number of streams.
interleaveFst :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Like interleave
but stops interleaving as soon as the first stream stops.
interleaveMin :: StreamK m a -> StreamK m a -> StreamK m a infixr 6 Source #
Like interleave
but stops interleaving as soon as either of the two streams
stops.
Cross Product
crossApplyWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #
crossApply :: StreamK m (a -> b) -> StreamK m a -> StreamK m b Source #
Apply a stream of functions to a stream of values and flatten the results.
Note that the second stream is evaluated multiple times.
Definition:
>>>
crossApply = StreamK.crossApplyWith StreamK.append
>>>
crossApply = StreamK.crossWith id
crossWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Definition:
>>>
crossWith f m1 m2 = fmap f m1 `StreamK.crossApply` m2
Note that the second stream is evaluated multiple times.
cross :: Monad m => StreamK m a -> StreamK m b -> StreamK m (a, b) Source #
Given a StreamK m a
and StreamK m b
generate a stream with all possible
combinations of the tuple (a, b)
.
Definition:
>>>
cross = StreamK.crossWith (,)
The second stream is evaluated multiple times. If that is not desired it can
be cached in an Array
and then generated from the array before
calling this function. Caching may also improve performance if the stream is
expensive to evaluate.
See cross
for a much faster fused
alternative.
Time: O(m x n)
Pre-release
Concat
before :: Monad m => m b -> StreamK m a -> StreamK m a Source #
Run an action before evaluating the stream.
concatMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Perform a concatMap
using a specified concat strategy. The first
argument specifies a merge or concat function that is used to merge the
streams generated by the map function.
bindWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> StreamK m a -> (a -> StreamK m b) -> StreamK m b Source #
concatIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
Yield an input element in the output stream, map a stream generator on it
and repeat the process on the resulting stream. Resulting streams are
flattened using the concatMapWith
combinator. This can be used for a depth
first style (DFS) traversal of a tree like structure.
Example, list a directory tree using DFS:
>>>
f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
>>>
input = StreamK.fromPure (Left ".")
>>>
ls = StreamK.concatIterateWith StreamK.append f input
Note that iterateM
is a special case of concatIterateWith
:
>>>
iterateM f = StreamK.concatIterateWith StreamK.append (StreamK.fromEffect . f) . StreamK.fromEffect
Pre-release
concatIterateLeftsWith :: b ~ Either a c => (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m b -> StreamK m b Source #
In an Either stream, iterate on the Left values. This is a special case of
concatIterateWith
:
>>>
concatIterateLeftsWith combine f = StreamK.concatIterateWith combine (either f (const StreamK.nil))
To traverse a directory tree:
>>>
input = StreamK.fromPure (Left ".")
>>>
ls = StreamK.concatIterateLeftsWith StreamK.append (StreamK.fromStream . Dir.readEither) input
Pre-release
concatIterateScanWith :: Monad m => (StreamK m a -> StreamK m a -> StreamK m a) -> (b -> a -> m (b, StreamK m a)) -> m b -> StreamK m a -> StreamK m a Source #
Like concatIterateWith
but carries a state in the stream generation function.
This can be used to traverse graph like structures, we can remember the
visited nodes in the state to avoid cycles.
Note that a combination of concatIterateWith
and usingState
can also be used to
traverse graphs. However, this function provides a more localized state
instead of using a global state.
See also: mfix
Pre-release
Merge
mergeMapWith :: (StreamK m b -> StreamK m b -> StreamK m b) -> (a -> StreamK m b) -> StreamK m a -> StreamK m b Source #
Combine streams in pairs using a binary combinator, the resulting streams are then combined again in pairs recursively until we get to a single combined stream. The composition would thus form a binary tree.
For example, you can sort a stream using merge sort like this:
>>>
s = StreamK.fromStream $ Stream.fromList [5,1,7,9,2]
>>>
generate = StreamK.fromPure
>>>
combine = StreamK.mergeBy compare
>>>
Stream.fold Fold.toList $ StreamK.toStream $ StreamK.mergeMapWith combine generate s
[1,2,5,7,9]
Note that if the stream length is not a power of 2, the binary tree composed by mergeMapWith would not be balanced, which may or may not be important depending on what you are trying to achieve.
Caution: the stream of streams must be finite
Pre-release
mergeIterateWith :: (StreamK m a -> StreamK m a -> StreamK m a) -> (a -> StreamK m a) -> StreamK m a -> StreamK m a Source #
Like concatIterateWith
but uses the pairwise flattening combinator
mergeMapWith
for flattening the resulting streams. This can be used for a
balanced traversal of a tree like structure.
Example, list a directory tree using balanced traversal:
>>>
f = StreamK.fromStream . either Dir.readEitherPaths (const Stream.nil)
>>>
input = StreamK.fromPure (Left ".")
>>>
ls = StreamK.mergeIterateWith StreamK.interleave f input
Pre-release
Buffered Operations
foldlS :: (StreamK m b -> a -> StreamK m b) -> StreamK m b -> StreamK m a -> StreamK m b Source #
Lazy left fold to a stream.
foldlT :: (Monad m, Monad (s m), MonadTrans s) => (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b Source #
Lazy left fold to an arbitrary transformer monad.
foldrT :: (Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b Source #
Right associative fold to an arbitrary transformer monad.
Instances
Specialized Generation
repeatM :: Monad m => m a -> StreamK m a Source #
>>>
repeatM = StreamK.sequence . StreamK.repeat
>>>
repeatM = fix . StreamK.consM
>>>
repeatM = cycle1 . StreamK.fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAction =
    StreamK.repeatM (threadDelay 1000000 >> print 1)
        & StreamK.take 10
        & StreamK.fold Fold.drain
:}
fromIndices :: (Int -> a) -> StreamK m a Source #
iterate :: (a -> a) -> a -> StreamK m a Source #
>>>
iterate f x = x `StreamK.cons` iterate f (f x)
Generate an infinite stream with x
as the first element and each
successive element derived by applying the function f
on the previous
element.
>>>
StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
[1,2,3,4,5]
iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a Source #
>>>
iterateM f m = m >>= \a -> return a `StreamK.consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m
and each successive element derived by applying the monadic function
f
on the previous element.
>>>
:{
StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0)
    & StreamK.take 3
    & StreamK.toList
:}
0
1
[0,1,2]
Elimination
General Folds
foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b Source #
Like foldl'
but with a monadic step function.
foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b Source #
Like foldlx', but with a monadic step function.
fold :: Monad m => Fold m a b -> StreamK m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'. A Fold
can terminate early without consuming the full stream. See the
documentation of individual Folds for termination behavior.
Definitions:
>>>
fold f = fmap fst . StreamK.foldBreak f
>>>
fold f = StreamK.parseD (Parser.fromFold f)
Example:
>>>
StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
5050
foldEither :: Monad m => Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a)) Source #
Fold resulting in either breaking the stream or continuation of the fold. Instead of supplying the input stream in one go we can run the fold multiple times, each time supplying the next segment of the input stream. If the fold has not yet finished it returns a fold that can be run again otherwise it returns the fold result and the residual stream.
Internal
foldConcat :: Monad m => Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a) Source #
Generate streams from individual elements of a stream and fold the concatenation of those streams using the supplied fold. Return the result of the fold and residual stream.
For example, this can be used to efficiently fold an Array Word8 stream using Word8 folds.
Internal
parseDBreak :: Monad m => Parser a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #
Run a Parser
over a stream and return the rest of the stream.
parseBreakChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #
parseChunks :: (Monad m, Unbox a) => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #
parseBreak :: forall m a b. Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b, StreamK m a) Source #
Similar to parseBreakChunks
but works on singular elements.
parse :: Monad m => ParserK a m b -> StreamK m a -> m (Either ParseError b) Source #
Run a ParserK
over a StreamK
. Please use parseChunks
where possible,
for better performance.
parseBreakChunksGeneric :: forall m a b. Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b, StreamK m (Array a)) Source #
Similar to parseBreakChunks
but works on generic arrays
parseChunksGeneric :: Monad m => ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b) Source #
Specialized Folds
last :: Monad m => StreamK m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
Map and Fold
mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m () Source #
Apply a monadic action to each element of the stream and discard the output of the action.
Conversions
Transformation
By folding (scans)
Filtering
Mapping
Inserting
Deleting
Reordering
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a Source #
Sort the input stream using a supplied comparison function.
Sorting can be achieved by simply:
>>>
sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
However, this combinator uses a parser to first split the input stream into down and up sorted segments and then merges them to optimize sorting when pre-sorted sequences exist in the input stream.
O(n) space
Map and Filter
Zipping
zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c Source #
Zipping of n
streams can be performed by combining the streams pair
wise using mergeMapWith
with O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
Merging
mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a Source #
Merging of n
streams can be performed by combining the streams pair
wise using mergeMapWith
to give O(n * log n) time complexity. If used
with concatMapWith
it will have O(n^2) performance.
Transformation comprehensions
Exceptions
handle :: (MonadCatch m, Exception e) => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a Source #
Like Streamly.Data.Stream.handle
but with one
significant difference, this function observes exceptions from the consumer
of the stream as well.
You can also convert StreamK
to Stream
and use exception handling from
Stream
module:
>>>
handle f s = StreamK.fromStream $ Stream.handle (\e -> fmap StreamK.toStream (f e)) (StreamK.toStream s)
Resource Management
bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a Source #
Like Streamly.Data.Stream.bracketIO
but with one
significant difference, this function observes exceptions from the consumer
of the stream as well. Therefore, it cleans up the resource promptly when
the consumer encounters an exception.
You can also convert StreamK
to Stream
and use resource handling from
Stream
module:
>>>
bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)