Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Synopsis
- transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
- mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b
- trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
- trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a
- tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a
- tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
- distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a
- tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a
- pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
- scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
- scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
- with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a
- deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
- filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
- uniqBy :: (IsStream t, Monad m, Functor (t m)) => (a -> a -> Bool) -> t m a -> t m a
- nubBy :: (a -> a -> Bool) -> t m a -> t m a
- nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a
- prune :: (a -> Bool) -> t m a -> t m a
- repeated :: t m a -> t m a
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- takeLast :: Int -> t m a -> t m a
- takeLastInterval :: Double -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- takeWhileLast :: (a -> Bool) -> t m a -> t m a
- takeWhileAround :: (a -> Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- dropLast :: Int -> t m a -> t m a
- dropLastInterval :: Int -> t m a -> t m a
- dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- dropWhileLast :: (a -> Bool) -> t m a -> t m a
- dropWhileAround :: (a -> Bool) -> t m a -> t m a
- intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperseBySpan :: Int -> m a -> t m a -> t m a
- intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
- intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
- delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
- delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b
- indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
- indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
- timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a)
- timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a)
- timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a)
- timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a)
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
- rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
- rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
- catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
- mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
- mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b
- lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
- rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
- both :: Functor (t m) => t m (Either a a) -> t m a
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- maxThreads :: IsStream t => Int -> t m a -> t m a
- maxBuffer :: IsStream t => Int -> t m a -> t m a
- sampleOld :: Int -> t m a -> t m a
- sampleNew :: Int -> t m a -> t m a
- sampleRate :: Double -> t m a -> t m a
- data Rate = Rate {}
- rate :: IsStream t => Maybe Rate -> t m a -> t m a
- avgRate :: IsStream t => Double -> t m a -> t m a
- minRate :: IsStream t => Double -> t m a -> t m a
- maxRate :: IsStream t => Double -> t m a -> t m a
- constRate :: IsStream t => Double -> t m a -> t m a
- inspectMode :: IsStream t => t m a -> t m a
- scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
Piping
Pass through a Pipe
.
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #
Use a Pipe
to transform a stream.
Pre-release
Folding
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
Right fold to a streaming monad.
foldrS Stream.cons Stream.nil === id
foldrS
can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.
>>>
Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]
[1,2,3,4,5]
Find if any element in the stream is True
:
>>>
Stream.toList $ Stream.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)
[True]
Map (+2) on odd elements and filter out the even elements:
>>>
Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)
[3,5,7]
foldrM
can also be represented in terms of foldrS
, however, the former
is much more efficient:
foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
Pre-release
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #
Right fold to a transformer monad. This is the most general right fold
function. foldrS
is a special case of foldrT
, however foldrS
implementation can be more efficient:
foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
foldrT
can be used to translate streamly streams to other transformer
monads e.g. to a different streaming type.
Pre-release
Mapping
Stateless one-to-one maps.
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #
sequence = mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"] abc >>> :{ drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1) & (fromSerial . Stream.sequence) :} 1 1 1 >>> :{ drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1) & (fromAsync . Stream.sequence) :} 1 1 1
Concurrent (do not use with fromParallel
on infinite streams)
Since: 0.1.0
mapM :: forall t m a b. (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #
mapM f = sequence . map f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
>>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"] abc >>> :{ drain $ Stream.replicateM 10 (return 1) & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x)) :} 1 ... 1 > drain $ Stream.replicateM 10 (return 1) & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x))
Concurrent (do not use with fromParallel
on infinite streams)
Since: 0.1.0
smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #
A stateful mapM
, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to scan
. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.
See also: scanlM'
Pre-release
Mapping Side Effects (Observation)
See also the intersperse*_ combinators.
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
>>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2) 1 2
Compare with tap
.
Since: 0.7.0
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Perform a side effect before yielding each element of the stream and discard the results.
>>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2) "got here" "got here"
Same as interspersePrefix_
but always serial.
See also: trace
Pre-release
tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #
Tap the data flowing through a stream into a Fold
. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.
Fold m a b | -----stream m a ---------------stream m a-----
>>>
Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
1 2
Compare with trace
.
Since: 0.7.0
tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #
tapOffsetEvery offset n
taps every n
th element in the stream
starting at offset
. offset
can be between 0
and n - 1
. Offset 0
means start at the first element in the stream. If the offset is outside
this range then offset `mod` n
is used as offset.
>>>
Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10
[0,2,4,6,8,10]
tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer size
is determined by the prevailing maxBuffer
setting.
Stream m a -> m b | -----stream m a ---------------stream m a-----
>>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap
.
Pre-release
tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer size
is determined by the prevailing maxBuffer
setting.
Stream m a -> m b | -----stream m a ---------------stream m a-----
> S.drain $ S.tapAsyncK (S.mapM_ print) (S.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap
.
Pre-release
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m) => f (t m a -> m b) -> t m a -> t m a Source #
Concurrently distribute a stream to a collection of fold functions, discarding the outputs of the folds.
> Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2) 1 2 1 2
distributeAsync_ = flip (foldr tapAsyncK)
Pre-release
tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #
Calls the supplied function with the number of elements consumed
every n
seconds. The given function is run in a separate thread
until the end of the stream. In case there is an exception in the
stream the thread is killed during the next major GC.
Note: The action is not guaranteed to run if the main thread exits.
> delay n = threadDelay (round $ n * 1000000) >> return n > Stream.toList $ Stream.tapRate 2 (\n -> print $ show n ++ " elements processed") (delay 1 Stream.|: delay 0.5 Stream.|: delay 0.5 Stream.|: Stream.nil) "2 elements processed" [1.0,0.5,0.5] "1 elements processed"
Note: This may not work correctly on 32-bit machines.
Pre-release
pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #
pollCounts predicate transform fold stream
counts those elements in the
stream that pass the predicate
. The resulting count stream is sent to
another thread which transforms it using transform
and then folds it using
fold
. The thread is automatically cleaned up if the stream stops or
aborts due to exception.
For example, to print the count of elements processed every second:
> Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (Fold.drainBy print) $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines.
Pre-release
Scanning By Fold
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Scan a stream using the given monadic fold.
>>>
Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
[0,1,3,6]
Since: 0.7.0
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Postscan a stream using the given monadic fold.
The following example extracts the input stream up to a point where the running average of elements is no more than 10:
>>>
import Data.Maybe (fromJust)
>>>
let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
>>>
:{
Stream.toList $ Stream.map (fromJust . fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0) :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
Since: 0.7.0
Scanning
Left scans. Stateful, mostly one-to-one maps.
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Strict left scan. Like map
, scanl'
too is a one to one transformation,
however it adds an extra element.
>>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4] [0,1,3,6,10]
>>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]]
The output of scanl'
is the initial value of the accumulator followed by
all the intermediate steps and the final result of foldl'
.
By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.
Consider the following monolithic example, computing the sum and the product
of the elements in a stream in one go using a foldl'
:
>>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4] (10,24)
Using scanl'
we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:
>>> :{ Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1) $ Stream.fromList [1,2,3,4] :} (10,24)
IMPORTANT: scanl'
evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for accumulator.
>>>
scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs
>>>
scanl' f z xs = z `Stream.cons` postscanl' f z xs
See also: usingStateT
Since: 0.2.0
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like scanl'
but with a monadic step function and a monadic seed.
Since: 0.4.0
Since: 0.8.0 (signature change)
scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #
scanlMAfter' accumulate initial done stream
is like scanlM'
except
that it provides an additional done
function to be applied on the
accumulator when the stream stops. The result of done
is also emitted in
the stream.
This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
Pre-release
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl'
but does not stream the initial value of the accumulator.
>>>
postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)
>>>
postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs
Since: 0.7.0
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like postscanl'
but with a monadic step function and a monadic seed.
>>>
postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs
Since: 0.7.0
Since: 0.8.0 (signature change)
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the final value of the accumulator.
Pre-release
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like prescanl' but with a monadic step function and a monadic seed.
Pre-release
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #
Like scanl'
but for a non-empty stream. The first element of the stream
is used as the initial value of the accumulator. Does nothing if the stream
is empty.
>>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4] [1,3,6,10]
Since: 0.6.0
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #
Like scanl1'
but with a monadic step function.
Since: 0.6.0
Filtering
Produce a subset of the stream using criteria based on the values of the elements. We can use a concatMap and scan for filtering but these combinators are more efficient and convenient.
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) => (t m a -> t m (s, a)) -> (((s, a) -> b) -> t m (s, a) -> t m (s, a)) -> ((s, a) -> b) -> t m a -> t m a Source #
Modify a t m a -> t m a
stream transformation that accepts a predicate
(a -> b)
to accept ((s, a) -> b)
instead, provided a transformation t m
a -> t m (s, a)
. Convenient to filter with index or time.
filterWithIndex = with indexed filter
filterWithAbsTime = with timestamped filter
filterWithRelTime = with timeIndexed filter
Pre-release
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #
Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.
>>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5] [1,3,5]
Since: 0.6.0
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Include only those elements that pass a predicate.
Since: 0.1.0
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as filter
but with a monadic predicate.
Since: 0.4.0
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #
Drop repeated elements that are adjacent to each other.
Since: 0.6.0
nubBy :: (a -> a -> Bool) -> t m a -> t m a Source #
Drop repeated elements anywhere in the stream.
Caution: not scalable for infinite streams
See also: nubWindowBy
Unimplemented
nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a Source #
Drop repeated elements within the specified tumbling window in the stream.
nubBy = nubWindowBy maxBound
Unimplemented
prune :: (a -> Bool) -> t m a -> t m a Source #
Strip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq.
prune p = dropWhileAround p $ uniqBy (\x y -> p x && p y)
> Stream.prune isSpace (Stream.fromList " hello world! ") "hello world!"
Space: O(1)
Unimplemented
Trimming
Produce a subset of the stream trimmed at ends.
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n
elements from the stream and discard the rest.
Since: 0.1.0
takeInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
takeInterval duration
yields stream elements up to the specified time
duration
. The duration starts when the stream is evaluated for the first
time, before the first element is yielded. The time duration is checked
before generating each element, if the duration has expired the stream
stops.
The total time taken in executing the stream is guaranteed to be at least
duration
, however, because the duration is checked before generating an
element, the upper bound is indeterminate and depends on the time taken in
generating and processing the last element.
No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.
Pre-release
takeLast :: Int -> t m a -> t m a Source #
Take n
elements at the end of the stream.
O(n) space, where n is the number of elements taken.
Unimplemented
takeLastInterval :: Double -> t m a -> t m a Source #
Take time interval i
seconds at the end of the stream.
O(n) space, where n is the number of elements taken.
Unimplemented
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as takeWhile
but with a monadic predicate.
Since: 0.4.0
takeWhileLast :: (a -> Bool) -> t m a -> t m a Source #
Take all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number of elements taken.
Unimplemented
takeWhileAround :: (a -> Bool) -> t m a -> t m a Source #
Like takeWhile
and takeWhileLast
combined.
O(n) space, where n is the number of elements taken from the end.
Unimplemented
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n
elements from the stream and take the rest.
Since: 0.1.0
dropInterval :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
dropInterval duration
drops stream elements until specified duration
has
passed. The duration begins when the stream is evaluated for the first
time. The time duration is checked after generating a stream element, the
element is yielded if the duration has expired otherwise it is dropped.
The time elapsed before starting to generate the first element is at most
duration
, however, because the duration expiry is checked after the
element is generated, the lower bound is indeterminate and depends on the
time taken in generating an element.
All elements are yielded if the duration is zero.
Pre-release
dropLast :: Int -> t m a -> t m a Source #
Drop n
elements at the end of the stream.
O(n) space, where n is the number of elements dropped.
Unimplemented
dropLastInterval :: Int -> t m a -> t m a Source #
Drop time interval i
seconds at the end of the stream.
O(n) space, where n is the number of elements dropped.
Unimplemented
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
Since: 0.1.0
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as dropWhile
but with a monadic predicate.
Since: 0.4.0
dropWhileLast :: (a -> Bool) -> t m a -> t m a Source #
Drop all consecutive elements at the end of the stream for which the predicate is true.
O(n) space, where n is the number of elements dropped.
Unimplemented
dropWhileAround :: (a -> Bool) -> t m a -> t m a Source #
Like dropWhile
and dropWhileLast
combined.
O(n) space, where n is the number of elements dropped from the end.
Unimplemented
Inserting Elements
Produce a superset of the stream. This is the opposite of filtering/sampling. We can always use concatMap and scan for inserting but these combinators are more efficient and convenient.
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #
Insert a pure value between successive elements of a stream.
>>>
Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
"h,e,l,l,o"
Since: 0.7.0
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>
Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"
Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".
>>>
Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"
Since: 0.5.0
intersperseBySpan :: Int -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
elements.
> Stream.toList $ Stream.intersperseBySpan 2 (return ',') $ Stream.fromList "hello" "he,ll,o"
Unimplemented
intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a Source #
Insert an effect and its output after consuming an element of a stream.
>>>
Stream.toList $ Stream.trace putChar $ intersperseSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o.,"h,e,l,l,o,"
Pre-release
intersperseSuffixBySpan :: (IsStream t, Monad m) => Int -> m a -> t m a -> t m a Source #
Like intersperseSuffix
but intersperses an effectful action into the
input stream after every n
elements and after the last element.
>>>
Stream.toList $ Stream.intersperseSuffixBySpan 2 (return ',') $ Stream.fromList "hello"
"he,ll,o,"
Pre-release
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
> import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o
Pre-release
Inserting Side Effects/Time
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Insert a side effect before consuming an element of a stream except the first one.
>>>
Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"
h.e.l.l.o
Pre-release
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds before consuming an element of the stream except the first one.
>>>
Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Since: 0.8.0
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Insert a side effect after consuming an element of a stream.
>>> Stream.mapM_ putChar $ Stream.intersperseSuffix_ (threadDelay 1000000) $ Stream.fromList "hello" hello
Pre-release
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds after consuming an element of a stream.
>>>
Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a Source #
Insert a side effect before consuming an element of a stream.
>>>
Stream.toList $ Stream.trace putChar $ Stream.interspersePrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"
.h.e.l.l.o"hello"
Same as trace_
but may be concurrent.
Concurrent
Pre-release
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduce a delay of specified seconds before consuming an element of a stream.
>>>
Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1) (AbsTime (TimeSpec {sec = ..., nsec = ...}),2) (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
Element Aware Insertion
Opposite of filtering
insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a Source #
insertBy cmp elem stream
inserts elem
before the first element in
stream
that is greater than or equal to elem
when compared using cmp
.
insertBy cmp x = mergeBy cmp (fromPure x)
>>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5] [1,2,3,5]
Since: 0.6.0
Reordering
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
>>>
reverse = Stream.foldlT (flip Stream.cons) Stream.nil
Since: 0.7.0 (Monad m constraint)
Since: 0.1.1
reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b Source #
Buffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly.
Unimplemented
Position Indexing
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #
indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
Pair each element in a stream with its index, starting from index 0.
>>>
Stream.toList $ Stream.indexed $ Stream.fromList "hello"
[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
Since: 0.6.0
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #
indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))
Pair each element in a stream with its index, starting from the
given index n
and counting down.
>>>
Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
[(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
Since: 0.6.0
Time Indexing
timestamped :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (AbsTime, a) Source #
timestampWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (AbsTime, a) Source #
Pair each element in a stream with an absolute timestamp, using a clock of specified granularity. The timestamp is generated just before the element is consumed.
>>>
Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
Pre-release
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m)) => t m a -> t m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a 10 ms granularity clock. The time is measured just before the element is consumed.
>>>
Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)
Pre-release
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m (RelTime64, a) Source #
Pair each element in a stream with relative times starting from 0, using a clock with the specified granularity. The time is measured just before the element is consumed.
>>>
Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
(RelTime64 (NanoSecond64 ...),1)
(RelTime64 (NanoSecond64 ...),2)
(RelTime64 (NanoSecond64 ...),3)
Pre-release
Searching
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
findIndices = fold Fold.findIndices
Since: 0.5.0
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #
Find all the indices where the value of the element in the stream is equal to the given value.
elemIndices a = findIndices (== a)
Since: 0.5.0
Rolling map
Map using the previous element.
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #
Like rollingMap
but with an effectful map function.
Pre-release
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #
Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.
This is the stream equivalent of the list idiom zipWith f xs (tail xs)
.
Pre-release
Maybe Streams
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #
Either Streams
both :: Functor (t m) => t m (Either a a) -> t m a Source #
Remove the Either wrapper and flatten both the lefts and the rights into the output stream.
Pre-release
Concurrent Evaluation
Concurrent Pipelines
Run streaming stages concurrently.
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates if the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Since: 0.2.0 (Streamly)
Since: 0.8.0
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; if the buffer is full, it blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.
mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD
Pre-release
applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #
Same as |$
.
Internal
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel transform application operator; applies a stream transformation
function t m a -> t m b
to a stream t m a
concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. |$
is just like regular function application
operator $
except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b)
you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
>>>
:{
Stream.drain $
   Stream.mapM (\x -> threadDelay 1000000 >> print x)
     |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1
Concurrent
Since: 0.3.0 (Streamly)
Since: 0.8.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Concurrency Control
maxThreads :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum number of threads that can be spawned concurrently for
any concurrent combinator in a stream.
A value of 0 resets the thread limit to default, a negative value means
there is no limit. The default value is 1500. maxThreads
does not affect
ParallelT
streams as they can use an unbounded number of threads.
When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
Since: 0.4.0 (Streamly)
Since: 0.8.0
Buffering and Sampling
Evaluate strictly using a buffer of results. When the buffer becomes full we can block, drop the new elements, drop the oldest element and insert the new one at the end, or keep dropping elements uniformly to match the rate of the consumer.
maxBuffer :: IsStream t => Int -> t m a -> t m a Source #
Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.
CAUTION! using an unbounded maxBuffer
value (i.e. a negative value)
coupled with an unbounded maxThreads
value is a recipe for disaster in the
presence of infinite streams, or very large streams. Especially, it must
not be used when pure
is used in ZipAsyncM
streams as pure
in
applicative zip streams generates an infinite stream causing unbounded
concurrent generation with no limit on the buffer or threads.
Since: 0.4.0 (Streamly)
Since: 0.8.0
sampleOld :: Int -> t m a -> t m a Source #
Evaluate the input stream continuously and keep only the oldest n
elements in the buffer, discard the new ones when the buffer is full. When
the output stream is evaluated it consumes the values from the buffer in a
FIFO manner.
Unimplemented
sampleNew :: Int -> t m a -> t m a Source #
Evaluate the input stream continuously and keep only the latest n
elements in a ring buffer, keep discarding the older ones to make space for
the new ones. When the output stream is evaluated it consumes the values
from the buffer in a FIFO manner.
Unimplemented
sampleRate :: Double -> t m a -> t m a Source #
Rate Limiting
Evaluate the stream at uniform intervals to maintain a specified evaluation rate.
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #
Specify the pull rate of a stream.
A Nothing
value resets the rate to default which is unlimited. When the
rate is specified, concurrent production may be ramped up or down
automatically to achieve the specified yield rate. The specific behavior for
different styles of Rate
specifications is documented under Rate
. The
effective maximum production rate achieved by a stream is governed by:
- The
maxThreads
limit - The
maxBuffer
limit - The maximum rate that the stream producer can achieve
- The maximum rate that the stream consumer can achieve
Since: 0.5.0 (Streamly)
Since: 0.8.0
avgRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r (2*r) maxBound)
Specifies the average production rate of a stream in number of yields
per second (i.e. Hertz
). Concurrent production is ramped up or down
automatically to achieve the specified average yield rate. The rate can
go down to half of the specified rate on the lower side and double of
the specified rate on the higher side.
Since: 0.5.0 (Streamly)
Since: 0.8.0
minRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r (2*r) maxBound)
Specifies the minimum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times; the upper limit is double the specified rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
maxRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate (r/2) r r maxBound)
Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times; the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.
Since: 0.5.0 (Streamly)
Since: 0.8.0
constRate :: IsStream t => Double -> t m a -> t m a Source #
Same as rate (Just $ Rate r r r 0)
Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.
Since: 0.5.0 (Streamly)
Since: 0.8.0
Diagnostics
inspectMode :: IsStream t => t m a -> t m a Source #
Print debug information about an SVar when the stream ends.
Pre-release
Deprecated
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #
Deprecated: Please use scanl followed by map instead.
Strict left scan with an extraction function. Like scanl'
, but applies a
user supplied extraction function (the third argument) at each step. This is
designed to work with the foldl
library. The suffix x
is a mnemonic for
extraction.
Since 0.2.0
Since: 0.7.0 (Monad m constraint)