Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Bottom level IsStream module that can be used by all other upper level IsStream modules.
Synopsis
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
- absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
- relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)
- scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
Generation
fromPure :: IsStream t => a -> t m a Source #
fromPure a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
fromPure = pure fromPure = fromEffect . pure
In Zip applicative streams fromPure
is not the same as pure
because in that
case pure
is equivalent to repeat
instead. fromPure
and pure
are
equally efficient, in other cases fromPure
may be slightly more efficient
than the other equivalent definitions.
Since: 0.8.0 (Renamed yield to fromPure)
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
fromEffect m = m `consM` nil
Create a singleton stream from a monadic action.
> Stream.toList $ Stream.fromEffect getLine hello ["hello"]
Since: 0.8.0 (Renamed yieldM to fromEffect)
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
repeatM = fix . consM repeatM = cycle1 . fromEffect
Generate a stream by repeatedly executing a monadic action forever.
drain $ fromSerial $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) drain $ fromAsync $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
Concurrent, infinite (do not use with fromParallel
)
Since: 0.2.0
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64) Source #
timesWith g
returns a stream of time value tuples. The first component
of the tuple is an absolute time reference (epoch) denoting the start of the
stream and the second component is a time relative to the reference.
The argument g
specifies the granularity of the relative time in seconds.
A lower granularity clock gives higher precision but is more expensive in
terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
>>>
import Control.Concurrent (threadDelay)
>>>
import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
>>>
Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)) (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
Note: This API is not safe on 32-bit machines.
Pre-release
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
absTimesWith g
returns a stream of absolute timestamps using a clock of
granularity g
specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...}) AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #
relTimesWith g
returns a stream of relative time values starting from 0,
using a clock of granularity g
specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.
>>>
Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...) RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
Elimination
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left Fold
and reducing the resulting
expression strictly at each step. The behavior is similar to foldl'
. A
Fold
can terminate early without consuming the full stream. See the
documentation of individual Fold
s for termination behavior.
>>>
Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
5050
Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:
>>>
Stream.fold Fold.sum Stream.nil
0
However, foldMany
on an empty stream results in an empty stream.
Therefore, Stream.fold f
is not the same as Stream.head . Stream.foldMany
f
.
fold f = Stream.parse (Parser.fromFold f)
Since: 0.7.0
Transformation
scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b Source #
scanlMAfter' accumulate initial done stream
is like scanlM'
except
that it provides an additional done
function to be applied on the
accumulator when the stream stops. The result of done
is also emitted in
the stream.
This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end.
Pre-release
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like postscanl'
but with a monadic step function and a monadic seed.
Since: 0.7.0
Since: 0.8.0 (signature change)
smapM :: (IsStream t, Monad m) => (s -> a -> m (s, b)) -> m s -> t m a -> t m b Source #
A stateful mapM
, equivalent to a left scan, more like mapAccumL.
Hopefully, this is a better alternative to scan
. Separation of state from
the output makes it easier to think in terms of a shared state, and also
makes it easier to keep the state fully strict and the output lazy.
See also: scanlM'
Pre-release
The stateful step function can be simplified to (s -> a -> m b)
to provide
a read-only environment. However, that would just be mapM
.
The initial action could be m (s, Maybe b)
, and we can also add a final
action s -> m (Maybe b)
. This can be used to get pre/post scan like
functionality and also to flush the state in the end like scanlMAfter'.
We can also use it along with a fusible version of bracket to get
scanlMAfter' like functionality. See issue #677.
This can be further generalized to a type similar to Fold/Parser, giving it filtering and parsing capability as well (this is in fact equivalent to parseMany):
smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n
elements from the stream and discard the rest.
Since: 0.1.0
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n
elements from the stream and take the rest.
Since: 0.1.0
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
findIndices = fold Fold.findIndices
Since: 0.5.0
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert an effect and its output before consuming an element of a stream except the first one.
>>>
Stream.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.fromList "hello"
h.,e.,l.,l.,o"h,e,l,l,o"
Since: 0.5.0
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
> import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o
Pre-release
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Since 0.7.0 (Monad m constraint)
Since: 0.1.1
Nesting
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
Since: 0.6.0
concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
concatMap f =concatMapM
(return . f) concatMap =concatMapWith
serial
concatMap f = 'concat . map f' concatMap f =unfoldMany
(UF.lmap f UF.fromStream)
Since: 0.6.0
splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOn
but the separator is a sequence of elements instead of a
single element.
For illustration, let's define a function that operates on pure lists:
>>>
splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>>
splitOnSeq' "hello" ""
[""]
>>>
splitOnSeq' "hello" "hello"
["",""]
>>>
splitOnSeq' "x" "hello"
["hello"]
>>>
splitOnSeq' "h" "hello"
["","ello"]
>>>
splitOnSeq' "o" "hello"
["hell",""]
>>>
splitOnSeq' "e" "hello"
["h","llo"]
>>>
splitOnSeq' "l" "hello"
["he","","o"]
>>>
splitOnSeq' "ll" "hello"
["he","o"]
splitOnSeq
is an inverse of intercalate
. The following law always holds:
intercalate . splitOn == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitOn . intercalate == id
Pre-release