streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream.Transform

Description

 
Synopsis

Piping

Pass through a Pipe.

transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #

Use a Pipe to transform a stream.

Pre-release

Folding

foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

Right fold to a streaming monad.

foldrS Stream.cons Stream.nil === id

foldrS can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion.

>>> Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]
[1,2,3,4,5]

Find if any element in the stream is True:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)
[True]

Map (+2) on odd elements and filter out the even elements:

>>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)
[3,5,7]

foldrM can also be represented in terms of foldrS, however, the former is much more efficient:

foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

Pre-release

foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #

foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #

Right fold to a transformer monad. This is the most general right fold function. foldrS is a special case of foldrT, however foldrS implementation can be more efficient:

foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s

foldrT can be used to translate streamly streams to other transformer monads e.g. to a different streaming type.

Pre-release

Mapping

Stateless one-to-one maps.

map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b Source #

map = fmap

Same as fmap.

> S.toList $ S.map (+1) $ S.fromList [1,2,3]
[2,3,4]

Since: 0.4.0

sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #

sequence = mapM id

Replace the elements of a stream of monadic actions with the outputs of those actions.

>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
abc

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromSerial . Stream.sequence)
:}
1
1
1

>>> :{
drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
 & (fromAsync . Stream.sequence)
:}
1
1
1

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #

mapM f = sequence . map f

Apply a monadic function to each element of the stream and replace it with the output of the resulting action.

>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
abc

>>> :{
   drain $ Stream.replicateM 10 (return 1)
     & (fromSerial . Stream.mapM (x -> threadDelay 1000000 >> print x))
:}
1
...
1

> drain $ Stream.replicateM 10 (return 1)
 & (fromAsync . Stream.mapM (x -> threadDelay 1000000 >> print x))

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.1.0

smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #

A stateful mapM, equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to scan. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy.

See also: scanlM'

Pre-release

Mapping Side Effects (Observation)

See also the intersperse*_ combinators.

trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #

Apply a monadic function to each element flowing through the stream and discard the results.

>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2)
1
2

Compare with tap.

Since: 0.7.0

trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Perform a side effect before yielding each element of the stream and discard the results.

>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2)
"got here"
"got here"

Same as interspersePrefix_ but always serial.

See also: trace

Pre-release

tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #

Tap the data flowing through a stream into a Fold. For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.

                  Fold m a b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Compare with trace.

Since: 0.7.0

tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #

tapOffsetEvery offset n taps every nth element in the stream starting at offset. offset can be between 0 and n - 1. Offset 0 means start at the first element in the stream. If the offset is outside this range then offset mod n is used as offset.

>>> Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10
[0,2,4,6,8,10]

tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with tap.

Pre-release

tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.

              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with tap.

Pre-release

distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #

Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.

> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2)
1
2
1
2

distributeAsync_ = flip (foldr tapAsync)

Pre-release

tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #

Calls the supplied function with the number of elements consumed every n seconds. The given function is run in a separate thread until the end of the stream. In case there is an exception in the stream the thread is killed during the next major GC.

Note: The action is not guaranteed to run if the main thread exits.

> delay n = threadDelay (round $ n * 1000000) >> return n
> Stream.toList $ Stream.tapRate 2 (n -> print $ show n ++ " elements processed") (delay 1 Stream.|: delay 0.5 Stream.|: delay 0.5 Stream.|: Stream.nil)
"2 elements processed"
[1.0,0.5,0.5]
"1 elements processed"

Note: This may not work correctly on 32-bit machines.

Pre-release

pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #

pollCounts predicate transform fold stream counts those elements in the stream that pass the predicate. The resulting count stream is sent to another thread which transforms it using transform and then folds it using fold. The thread is automatically cleaned up if the stream stops or aborts due to exception.

For example, to print the count of elements processed every second:

> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (FLold.drainBy print)
          $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines.

Pre-release

Scanning By Fold

scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Scan a stream using the given monadic fold.

>>> Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
[0,1,3,6]

Since: 0.7.0

postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #

Postscan a stream using the given monadic fold.

The following example extracts the input stream up to a point where the running average of elements is no more than 10:

>>> import Data.Maybe (fromJust)
>>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>> :{
 Stream.toList
  $ Stream.map (fromJust . fst)
  $ Stream.takeWhile (\(_,x) -> x <= 10)
  $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0)
:}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

Since: 0.7.0

Scanning

Left scans. Stateful, mostly one-to-one maps.

scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Strict left scan. Like map, scanl' too is a one to one transformation, however it adds an extra element.

>>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4]
[0,1,3,6,10]

>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
[[],[1],[2,1],[3,2,1],[4,3,2,1]]

The output of scanl' is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'.

By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.

Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl':

>>> Stream.foldl' ((s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList 1,2,3,4

Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product:

>>> :{
  Stream.foldl' ((_, p) (s, x) -> (s, p * x)) (0,1)
  $ Stream.scanl' ((s, _) x -> (s + x, x)) (0,1)
  $ Stream.fromList [1,2,3,4]
:}
(10,24)

IMPORTANT: scanl' evaluates the accumulator to WHNF. To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for accumulator.

>>> scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs
>>> scanl' f z xs = z `Stream.cons` postscanl' f z xs

See also: usingStateT

Since: 0.2.0

scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like scanl' but with a monadic step function and a monadic seed.

Since: 0.4.0

Since: 0.8.0 (signature change)

scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #

scanlMAfter' accumulate initial done stream is like scanlM' except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.

This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.

Pre-release

postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the initial value of the accumulator.

>>> postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)
>>> postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs

Since: 0.7.0

postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like postscanl' but with a monadic step function and a monadic seed.

>>> postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs

Since: 0.7.0

Since: 0.8.0 (signature change)

prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #

Like scanl' but does not stream the final value of the accumulator.

Pre-release

prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #

Like prescanl' but with a monadic step function and a monadic seed.

Pre-release

scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #

Like scanl' but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty.

>>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4]
[1,3,6,10]

Since: 0.6.0

scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #

Like scanl1' but with a monadic step function.

Since: 0.6.0

Filtering

Produce a subset of the stream using criteria based on the values of the elements. We can use a concatMap and scan for filtering but these combinators are more efficient and convenient.

with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #

Modify a t m a -> t m a stream transformation that accepts a predicate (a -> b) to accept ((s, a) -> b) instead, provided a transformation t m a -> t m (s, a). Convenient to filter with index or time.

filterWithIndex = with indexed filter
filterWithAbsTime = with timestamped filter
filterWithRelTime = with timeIndexed filter

Pre-release

deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.

>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5]
[1,3,5]

Since: 0.6.0

filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Include only those elements that pass a predicate.

Since: 0.1.0

filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as filter but with a monadic predicate.

Since: 0.4.0

uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #

Drop repeated elements that are adjacent to each other.

Since: 0.6.0

uniqBy :: (IsStream t, Monad m, Functor (t m)) => (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements that are adjacent to each other using the supplied comparison function.

@uniq = uniqBy (==)

To strip duplicate path separators:

f x y = x == / && x == y
Stream.toList $ Stream.uniqBy f $ Stream.fromList "/a/b"
"ab"

Space: O(1)

See also: nubBy.

Pre-release

nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements anywhere in the stream.

Caution: not scalable for infinite streams

See also: nubWindowBy

Unimplemented

nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a Source #

Drop repeated elements within the specified tumbling window in the stream.

nubBy = nubWindowBy maxBound

Unimplemented

prune :: (a -> Bool) -> t m a -> t m a Source #

Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.

prune p = dropWhileAround p $ uniqBy (x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList "  hello      world!   ")
"hello world!"

Space: O(1)

Unimplemented

repeated :: t m a -> t m a Source #

Emit only repeated elements, once.

Unimplemented

Trimming

Produce a subset of the stream trimmed at ends.

take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Take first n elements from the stream and discard the rest.

Since: 0.1.0

takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

takeInterval duration yields stream elements upto specified time duration. The duration starts when the stream is evaluated for the first time, before the first element is yielded. The time duration is checked before generating each element, if the duration has expired the stream stops.

The total time taken in executing the stream is guaranteed to be at least duration, however, because the duration is checked before generating an element, the upper bound is indeterminate and depends on the time taken in generating and processing the last element.

No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.

Pre-release

takeLast :: Int -> t m a -> t m a Source #

Take n elements at the end of the stream.

O(n) space, where n is the number elements taken.

Unimplemented

takeLastInterval :: Double -> t m a -> t m a Source #

Take time interval i seconds at the end of the stream.

O(n) space, where n is the number elements taken.

Unimplemented

takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

End the stream as soon as the predicate fails on an element.

Since: 0.1.0

takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as takeWhile but with a monadic predicate.

Since: 0.4.0

takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Take all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements taken.

Unimplemented

takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like takeWhile and takeWhileLast combined.

O(n) space, where n is the number elements taken from the end.

Unimplemented

drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #

Discard first n elements from the stream and take the rest.

Since: 0.1.0

dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #

dropInterval duration drops stream elements until specified duration has passed. The duration begins when the stream is evaluated for the first time. The time duration is checked after generating a stream element, the element is yielded if the duration has expired otherwise it is dropped.

The time elapsed before starting to generate the first element is at most duration, however, because the duration expiry is checked after the element is generated, the lower bound is indeterminate and depends on the time taken in generating an element.

All elements are yielded if the duration is zero.

Pre-release

dropLast :: Int -> t m a -> t m a Source #

Drop n elements at the end of the stream.

O(n) space, where n is the number elements dropped.

Unimplemented

dropLastInterval :: Int -> t m a -> t m a Source #

Drop time interval i seconds at the end of the stream.

O(n) space, where n is the number elements dropped.

Unimplemented

dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #

Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.

Since: 0.1.0

dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #

Same as dropWhile but with a monadic predicate.

Since: 0.4.0

dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #

Drop all consecutive elements at the end of the stream for which the predicate is true.

O(n) space, where n is the number elements dropped.

Unimplemented

dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #

Like dropWhile and dropWhileLast combined.

O(n) space, where n is the number elements dropped from the end.

Unimplemented

Inserting Elements

Produce a superset of the stream. This is the opposite of filtering/sampling. We can always use concatMap and scan for inserting but these combinators are more efficient and convenient.

intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #

Insert a pure value between successive elements of a stream.

>>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
"h,e,l,l,o"

Since: 0.7.0

intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #

Insert an effect and its output before consuming an element of a stream except the first one.

>>> Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"

Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".

>>> Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"

Since: 0.5.0

intersperseBySpan :: Int -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n elements.

> Stream.toList $ Stream.intersperseBySpan 2 (return ',') $ Stream.fromList "hello"
"he,ll,o"

Unimplemented

intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #

Insert an effect and its output after consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ intersperseSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o.,"h,e,l,l,o,"

Pre-release

intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #

Like intersperseSuffix but intersperses an effectful action into the input stream after every n elements and after the last element.

>>> Stream.toList $ Stream.intersperseSuffixBySpan 2 (return ',') $ Stream.fromList "hello"
"he,ll,o,"

Pre-release

interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #

Intersperse a monadic action into the input stream after every n seconds.

> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o

Pre-release

Inserting Side Effects/Time

intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream except the first one.

>>> Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"
h.e.l.l.o

Pre-release

delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of the stream except the first one.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Since: 0.8.0

intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #

Insert a side effect after consuming an element of a stream.

>>> Stream.mapM_ putChar $ Stream.intersperseSuffix_ (threadDelay 1000000) $ Stream.fromList "hello"
hello

Pre-release

delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds after consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #

Insert a side effect before consuming an element of a stream.

>>> Stream.toList $ Stream.trace putChar $ Stream.interspersePrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"
.h.e.l.l.o"hello"

Same as trace_ but may be concurrent.

Concurrent

Pre-release

delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #

Introduce a delay of specified seconds before consuming an element of a stream.

>>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

Element Aware Insertion

Opposite of filtering

insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #

insertBy cmp elem stream inserts elem before the first element in stream that is less than elem when compared using cmp.

insertBy cmp x = mergeBy cmp (fromPure x)
>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5]
[1,2,3,5]

Since: 0.6.0

Reordering

reverse :: (IsStream t, Monad m) => t m a -> t m a Source #

Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.

>>> reverse = Stream.foldlT (flip Stream.cons) Stream.nil

Since 0.7.0 (Monad m constraint)

Since: 0.1.1

reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a Source #

Like reverse but several times faster, requires a Storable instance.

Pre-release

reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #

Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.

Unimplemented

Position Indexing

indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #

indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)

Pair each element in a stream with its index, starting from index 0.

>>> Stream.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]

Since: 0.6.0

indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #

indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))

Pair each element in a stream with its index, starting from the given index n and counting down.

>>> Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]

Since: 0.6.0

Time Indexing

timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #

timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #

Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)

Pre-release

timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

Pre-release

timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #

Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.

>>> Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)

Pre-release

Searching

findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #

Find all the indices where the element in the stream satisfies the given predicate.

findIndices = fold Fold.findIndices

Since: 0.5.0

elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #

Find all the indices where the value of the element in the stream is equal to the given value.

elemIndices a = findIndices (== a)

Since: 0.5.0

Rolling map

Map using the previous element.

rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #

Like rollingMap but with an effectful map function.

Pre-release

rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #

Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.

This is the stream equivalent of the list idiom zipWith f xs (tail xs).

Pre-release

Maybe Streams

catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a Source #

In a stream of Maybes, discard Nothings and unwrap Justs.

Pre-release

mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b Source #

Map a Maybe returning function to a stream, filter out the Nothing elements, and return a stream of values extracted from Just.

Equivalent to:

mapMaybe f = Stream.map fromJust . Stream.filter isJust . Stream.map f

Since: 0.3.0

mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #

Like mapMaybe but maps a monadic function.

Equivalent to:

mapMaybeM f = Stream.map fromJust . Stream.filter isJust . Stream.mapM f

Concurrent (do not use with fromParallel on infinite streams)

Since: 0.3.0

Either Streams

lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a Source #

Discard Rights and unwrap Lefts in an Either stream.

Pre-release

rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b Source #

Discard Lefts and unwrap Rights in an Either stream.

Pre-release

both :: Functor (t m) => t m (Either a a) -> t m a Source #

Remove the either wrapper and flatten both lefts and as well as rights in the output stream.

Pre-release

Concurrent Evaluation

Concurrent Pipelines

Run streaming stages concurrently.

mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.

Since: 0.2.0 (Streamly)

Since: 0.8.0

mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #

Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.

mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD

Pre-release

applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #

Same as |$.

Internal

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel transform application operator; applies a stream transformation function t m a -> t m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the transformation function runs in another thread consuming the input from the buffer. |$ is just like regular function application operator $ except that it is concurrent.

If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.

The following code prints a value every second even though each stage adds a 1 second delay.

>>> :{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
     |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Same as |$ but with arguments reversed.

(|&) = flip (|$)

Concurrent

Since: 0.3.0 (Streamly)

Since: 0.8.0

Concurrency Control

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500. maxThreads does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0 (Streamly)

Since: 0.8.0

Buffering and Sampling

Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, drop the oldest element and insert the new at the end or keep dropping elements uniformly to match the rate of the consumer.

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Since: 0.8.0

sampleOld :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the oldest n elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

Unimplemented

sampleNew :: Int -> t m a -> t m a Source #

Evaluate the input stream continuously and keep only the latest n elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner.

Unimplemented

sampleRate :: Double -> t m a -> t m a Source #

Like sampleNew but samples at uniform intervals to match the consumer rate. Note that sampleNew leads to non-uniform sampling depending on the consumer pattern.

Unimplemented

Rate Limiting

Evaluate the stream at uniform intervals to maintain a specified evaluation rate.

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Constructors

Rate 

Fields

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve

Since: 0.5.0 (Streamly)

Since: 0.8.0

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0 (Streamly)

Since: 0.8.0

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Since: 0.8.0

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0 (Streamly)

Since: 0.8.0

Diagnostics

inspectMode :: IsStream t => t m a -> t m a Source #

Print debug information about an SVar when the stream ends

Pre-release

Deprecated

scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #

Deprecated: Please use scanl followed by map instead.

Strict left scan with an extraction function. Like scanl', but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Since 0.2.0

Since: 0.7.0 (Monad m constraint)