Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Bottom level IsStream module that can be used by all other upper level IsStream modules.
Synopsis
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
- absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
- relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64
- foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
Generation
fromPure :: IsStream t => a -> t m a Source #
fromPure a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
fromPure = pure
fromPure = fromEffect . pure
In Zip applicative streams fromPure
is not the same as pure
because in that
case pure
is equivalent to repeat
instead. fromPure
and pure
are
equally efficient, in other cases fromPure
may be slightly more efficient
than the other equivalent definitions.
Since: 0.8.0 (Renamed yield to fromPure)
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
fromEffect m = m `consM` nil
Create a singleton stream from a monadic action.
> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]
Since: 0.8.0 (Renamed yieldM to fromEffect)
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
>>>
repeatM = fix . consM
>>>
repeatM = cycle1 . fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>>
:{
repeatAsync =
      Stream.repeatM (threadDelay 1000000 >> print 1)
    & Stream.take 10
    & Stream.fromAsync
    & Stream.drain
:}
Concurrent, infinite (do not use with fromParallel
)
Since: 0.2.0
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64) Source #
timesWith g
returns a stream of time value tuples. The first component
of the tuple is an absolute time reference (epoch) denoting the start of the
stream and the second component is a time relative to the reference.
The argument g
specifies the granularity of the relative time in seconds.
A lower granularity clock gives higher precision but is more expensive in
terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
>>>
import Control.Concurrent (threadDelay)
>>>
import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
>>>
Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
Note: This API is not safe on 32-bit machines.
Pre-release
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
absTimesWith g
returns a stream of absolute timestamps using a clock of
granularity g
specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #
relTimesWith g
returns a stream of relative time values starting from 0,
using a clock of granularity g
specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
Elimination
foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b Source #
We can create higher order folds using foldOn
. We can fold a number of
streams to a given fold efficiently with full stream fusion. For example, to
fold a list of streams on the same sum fold:
>>>
concatFold = Prelude.foldl Stream.foldOn Fold.sum
>>>
fold f = Fold.finish . Stream.foldOn f
Internal
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:
>>>
Stream.fold Fold.sum Stream.nil
0
However, foldMany
on an empty stream results in an empty stream.
Therefore, Stream.fold f
is not the same as Stream.head . Stream.foldMany
f
.
fold f = Stream.parse (Parser.fromFold f)
Since: 0.7.0
Transformation
scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #
scanlMAfter' accumulate initial done stream
is like scanlM'
except
that it provides an additional done
function to be applied on the
accumulator when the stream stops. The result of done
is also emitted in
the stream.
This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
Pre-release
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like postscanl'
but with a monadic step function and a monadic seed.
>>>
postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs
Since: 0.7.0
Since: 0.8.0 (signature change)
smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #
A stateful mapM
, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to scan
. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.
See also: scanlM'
Pre-release
The stateful step function can be simplified to (s -> a -> m b)
to provide
a read-only environment. However, that would just be mapM
.
The initial action could be m (s, Maybe b)
, and we can also add a final
action s -> m (Maybe b)
. This can be used to get pre/post scan like
functionality and also to flush the state in the end like scanlMAfter'.
We can also use it along with a fusible version of bracket to get
scanlMAfter' like functionality. See issue #677.
This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):
smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n
elements from the stream and discard the rest.
Since: 0.1.0
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n
elements from the stream and take the rest.
Since: 0.1.0
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
findIndices = fold Fold.findIndices
Since: 0.5.0
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>
Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"
Be careful about the order of effects. In the above example we used trace after the intersperse; if we use it before the intersperse, the output would be he.l.l.o."h,e,l,l,o".
>>>
Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"
he.l.l.o."h,e,l,l,o"
Since: 0.5.0
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
> import Control.Concurrent (threadDelay)
> Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
h,e,l,l,o
Pre-release
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
>>>
reverse = Stream.foldlT (flip Stream.cons) Stream.nil
Since: 0.7.0 (Monad m constraint)
Since: 0.1.1
Concurrent
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; it terminates if the buffer is full, and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Since: 0.2.0 (Streamly)
Since: 0.8.0
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills; if the buffer is full, it blocks until there is space in the buffer. The consumer consumes the stream lazily from the buffer.
mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD
Pre-release
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel
but stops the output as soon as the first stream stops.
Pre-release
Nesting
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a Source #
Given a stream value in the underlying monad, lift and join the underlying monad with the stream monad.
>>>
concatM = Stream.concat . Stream.fromEffect
>>>
concatM = Stream.concat . lift -- requires (MonadTrans t)
>>>
concatM = join . lift -- requires (MonadTrans t, Monad (t m))
Internal
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
Since: 0.6.0
concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>>
concatMap f = Stream.concatMapM (return . f)
>>>
concatMap f = Stream.concatMapWith Stream.serial f
>>>
concatMap f = Stream.concat . Stream.map f
>>>
concatMap f = Stream.unfoldMany (Unfold.lmap f Unfold.fromStream)
Since: 0.6.0
splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOn
but the separator is a sequence of elements instead of a
single element.
For illustration, let's define a function that operates on pure lists:
>>>
splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>>
splitOnSeq' "hello" ""
[""]
>>>
splitOnSeq' "hello" "hello"
["",""]
>>>
splitOnSeq' "x" "hello"
["hello"]
>>>
splitOnSeq' "h" "hello"
["","ello"]
>>>
splitOnSeq' "o" "hello"
["hell",""]
>>>
splitOnSeq' "e" "hello"
["h","llo"]
>>>
splitOnSeq' "l" "hello"
["he","","o"]
>>>
splitOnSeq' "ll" "hello"
["he","o"]
splitOnSeq
is an inverse of intercalate
. The following law always holds:
intercalate . splitOnSeq == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitOnSeq . intercalate == id
Pre-release
Zipping
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Like zipWith
but using a monadic zipping function.
Since: 0.4.0
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Stream a
is evaluated first, followed by stream b
, the resulting
elements a
and b
are then zipped using the supplied zip function and the
result c
is yielded to the consumer.
If stream a
or stream b
ends, the zipped stream ends. If stream b
ends
first, the element a
from previous evaluation of stream a
is discarded.
> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]
Since: 0.1.0