| Copyright | (c) 2017 Composewell Technologies |
|---|---|
| License | BSD-3-Clause |
| Maintainer | streamly@composewell.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Streamly.Internal.Data.Stream.IsStream.Generate
Description
Most of the combinators in this module can be implemented as unfolds. Some of them, however, can only be expressed in terms of StreamK, e.g. cons/consM, fromFoldable, mfix. We could possibly remove from this module those combinators that can be expressed as unfolds, unless we want to use rewrite rules to rewrite them as StreamK when StreamK is used, avoiding conversion to StreamD. Will that help? Are there any other reasons to keep these and not use unfolds?
Synopsis
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
- unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
- unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
- fromPure :: IsStream t => a -> t m a
- fromEffect :: (Monad m, IsStream t) => m a -> t m a
- repeat :: (IsStream t, Monad m) => a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- replicate :: (IsStream t, Monad m) => Int -> a -> t m a
- replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a
- class Enum a => Enumerable a where
- enumerateFrom :: (IsStream t, Monad m) => a -> t m a
- enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a
- enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a
- enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a
- times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
- absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
- absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
- relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
- relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64
- durations :: Double -> t m RelTime64
- ticks :: Rate -> t m ()
- timeout :: AbsTime -> t m ()
- fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
- fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
- iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
- iterateM :: forall t m a. (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
- mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
- fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
- fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
- fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a
- once :: (Monad m, IsStream t) => m a -> t m a
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
- each :: (IsStream t, Foldable f) => f a -> t m a
- fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String
- currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime
Primitives
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r but
more efficient. For concurrent streams this is not concurrent whereas
consM is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]
Since: 0.1.0
consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
Concurrent (do not use fromParallel to construct infinite streams)
Since: 0.2.0
(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon"
to remember that | comes before :.
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ fromSerial $ delay |: delay |: delay |: nil
drain $ fromParallel $ delay |: delay |: delay |: nil
Concurrent (do not use fromParallel to construct infinite streams)
Since: 0.2.0
From Unfold
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #
Convert an Unfold into a stream by supplying it an input seed.
>>> Stream.drain $ Stream.unfold (Unfold.replicateM 3) (putStrLn "hello")
hello
hello
hello
Since: 0.7.0
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b Source #
Convert an Unfold with a closed input end into a stream.
Pre-release
Unfolding
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #
>>> :{
unfoldr step s =
    case step s of
        Nothing -> Stream.nil
        Just (a, b) -> a `Stream.cons` unfoldr step b
:}
Build a stream by unfolding a pure step function step starting from a
seed s. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing and the stream ends.
For example,
>>> :{
let f b =
        if b > 2
        then Nothing
        else Just (b, b + 1)
in Stream.toList $ Stream.unfoldr f 0
:}
[0,1,2]
Since: 0.1.0
unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing and the stream ends. For
example,
>>> :{
let f b =
        if b > 2
        then return Nothing
        else return (Just (b, b + 1))
in Stream.toList $ Stream.unfoldrM f 0
:}
[0,1,2]
When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.
>>> :{
let f b =
        if b > 2
        then return Nothing
        else threadDelay 1000000 >> return (Just (b, b + 1))
in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0
:}
[0,1,2]
Concurrent
Since: 0.1.0
From Values
fromPure :: IsStream t => a -> t m a Source #
fromPure a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
fromPure = pure
fromPure = fromEffect . pure
In Zip applicative streams, fromPure is not the same as pure because in that
case pure is equivalent to repeat instead. fromPure and pure are
equally efficient; in other cases fromPure may be slightly more efficient
than the other equivalent definitions.
Since: 0.8.0 (Renamed yield to fromPure)
fromEffect :: (Monad m, IsStream t) => m a -> t m a Source #
fromEffect m = m `consM` nil
Create a singleton stream from a monadic action.
> Stream.toList $ Stream.fromEffect getLine
hello
["hello"]
Since: 0.8.0 (Renamed yieldM to fromEffect)
repeat :: (IsStream t, Monad m) => a -> t m a Source #
Generate an infinite stream by repeating a pure value.
Since: 0.4.0
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
>>> repeatM = fix . consM
>>> repeatM = cycle1 . fromEffect
Generate a stream by repeatedly executing a monadic action forever.
>>> :{
repeatAsync =
       Stream.repeatM (threadDelay 1000000 >> print 1)
     & Stream.take 10
     & Stream.fromAsync
     & Stream.drain
:}
Concurrent, infinite (do not use with fromParallel)
Since: 0.2.0
replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #
>>>replicate n = Stream.take n . Stream.repeat
Generate a stream of length n by repeating a value n times.
Since: 0.6.0
replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #
>>>replicateM n = Stream.take n . Stream.repeatM
Generate a stream by performing a monadic action n times, as in the definition above. The examples below use this helper action:
>>>pr n = threadDelay 1000000 >> print n
This runs serially and takes 3 seconds:
>>> Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1
1
1
1
This runs concurrently and takes just 1 second:
>>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 3 $ pr 1
1
1
1
Concurrent
Since: 0.1.1
Enumeration
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the Enum type class, except that these
generate a stream instead of a list. Use the functions in
Streamly.Internal.Data.Stream.Enumeration module to define new instances.
Since: 0.6.0
Methods
enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #
enumerateFrom from generates a stream starting with the element
from, enumerating up to maxBound when the type is Bounded or
generating an infinite stream when the type is not Bounded.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
[0,1,2,3]
For Fractional types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]
Since: 0.6.0
enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #
Generate a finite stream starting with the element from, enumerating
the type up to the value to. If to is smaller than from then an
empty stream is returned.
>>> Stream.toList $ Stream.enumerateFromTo 0 4
[0,1,2,3,4]
For Fractional types, the last element is equal to the specified to
value after rounding to the nearest integral value.
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
>>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]
Since: 0.6.0
enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #
enumerateFromThen from then generates a stream whose first element
is from, the second element is then and the successive elements are
in increments of then - from. Enumeration can occur downwards or
upwards depending on whether then comes before or after from. For
Bounded types the stream ends when maxBound is reached, for
unbounded types it keeps enumerating infinitely.
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
[0,2,4,6]
>>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
[0,-2,-4,-6]
Since: 0.6.0
enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #
enumerateFromThenTo from then to generates a finite stream whose
first element is from, the second element is then and the successive
elements are in increments of then - from up to to. Enumeration can
occur downwards or upwards depending on whether then comes before or
after from.
>>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
[0,2,4,6]
>>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
Since: 0.6.0
Instances
enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #
Time Enumeration
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64) Source #
times returns a stream of time value tuples with clock of 10 ms
granularity. The first component of the tuple is an absolute time reference
(epoch) denoting the start of the stream and the second component is a time
relative to the reference.
>>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
Note: This API is not safe on 32-bit machines.
Pre-release
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime Source #
absTimes returns a stream of absolute timestamps using a clock of 10 ms
granularity.
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
absTimesWith g returns a stream of absolute timestamps using a clock of
granularity g specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
as 1 ms.
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
AbsTime (TimeSpec {sec = ..., nsec = ...})
Note: This API is not safe on 32-bit machines.
Pre-release
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64 Source #
relTimes returns a stream of relative time values starting from 0,
using a clock of granularity 10 ms.
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m RelTime64 Source #
relTimesWith g returns a stream of relative time values starting from 0,
using a clock of granularity g specified in seconds. A low granularity
clock is more expensive in terms of CPU usage. Any granularity lower than 1
ms is treated as 1 ms.
>>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
RelTime64 (NanoSecond64 ...)
Note: This API is not safe on 32-bit machines.
Pre-release
durations :: Double -> t m RelTime64 Source #
durations g returns a stream of relative time values measuring the time
elapsed since the immediate predecessor element of the stream was generated.
The first element of the stream is always 0. durations uses a clock of
granularity g specified in seconds. A low granularity clock is more
expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
Durations lower than 1 ms will be 0.
Note: This API is not safe on 32-bit machines.
Unimplemented
ticks :: Rate -> t m () Source #
Generate ticks at the specified rate. The rate is adaptive, the tick
generation speed can be increased or decreased at different times to achieve
the specified rate. The specific behavior for different styles of Rate
specifications is documented under Rate. The effective maximum rate
achieved by a stream is governed by the processor speed.
Unimplemented
timeout :: AbsTime -> t m () Source #
Generate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay: a threadDelay starts from the time when the action is evaluated, whereas an AbsTime-based timeout will expire immediately if the action is evaluated too late.
Unimplemented
From Generators
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #
>>> fromIndices f = fmap f $ Stream.enumerateFrom 0
>>> fromIndices f = let g i = f i `Stream.cons` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a function f
applied on the corresponding index. Index starts at 0.
>>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
[0,1,2,3,4]
Since: 0.6.0
fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #
>>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>> fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a monadic
function f applied on the corresponding index. Index starts at 0.
Concurrent
Since: 0.6.0
Iteration
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #
>>> iterate f x = x `Stream.cons` iterate f (f x)
Generate an infinite stream with x as the first element and each
successive element derived by applying the function f on the previous
element.
>>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
[1,2,3,4,5]
Since: 0.1.2
iterateM :: forall t m a. (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #
>>>iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m and each successive element derived by applying the monadic function
f on the previous element.
>>> pr n = threadDelay 1000000 >> print n
>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.take 3
    & Stream.fromSerial
    & Stream.toList
:}
0
1
[0,1,2]
When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.
>>> :{
Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
    & Stream.delay 1
    & Stream.take 3
    & Stream.fromAsync
    & Stream.toList
:}
0
1
...
Concurrent
Since: 0.1.2
Since: 0.7.0 (signature change)
Cyclic Elements
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #
We can define cyclic structures using let:
>>> let (a, b) = ([1, b], head a) in (a, b)
([1,1],1)
The function fix defined as:
>>>fix f = let x = f x in x
ensures that the argument of a function and its output refer to the same
lazy value x i.e. the same location in memory. Thus x can be defined
in terms of itself, creating structures with cyclic references.
>>> f ~(a, b) = ([1, b], head a)
>>> fix f
([1,1],1)
mfix is essentially the same as fix but for monadic
values.
Using mfix for streams we can construct a stream in which each element of
the stream is defined in a cyclic fashion. The argument of the function
being fixed represents the current element of the stream which is being
returned by the stream monad. Thus, we can use the argument to construct
itself.
In the following example, the argument action of the function f
represents the tuple (x,y) returned by it in a given iteration. We define
the first element of the tuple in terms of the second.
>>> import Streamly.Internal.Data.Stream.IsStream as Stream
>>> import System.IO.Unsafe (unsafeInterleaveIO)
>>> :{
main = Stream.mapM_ print $ Stream.mfix f
    where
    f action = do
        let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
        x <- Stream.fromListM [incr 1 action, incr 2 action]
        y <- Stream.fromList [4,5]
        return (x, y)
:}
Note: you cannot achieve this by just changing the order of the monad statements because that would change the order in which the stream elements are generated.
Note that the function f must be lazy in its argument, that's why we use
unsafeInterleaveIO on action because IO monad is strict.
Pre-release
From Containers
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList = foldr cons nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable for serial streams.
Since: 0.4.0
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #
>>> fromListM = Stream.fromFoldableM
>>> fromListM = Stream.sequence . Stream.fromList
>>> fromListM = Stream.mapM id . Stream.fromList
>>> fromListM = Prelude.foldr Stream.consM Stream.nil
Construct a stream from a list of monadic actions. This is more efficient
than fromFoldableM for serial streams.
Since: 0.4.0
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #
>>>fromFoldable = Prelude.foldr Stream.cons Stream.nil
Construct a stream from a Foldable containing pure values.
Since: 0.2.0
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #
>>>fromFoldableM = Prelude.foldr Stream.consM Stream.nil
Construct a stream from a Foldable containing monadic actions.
>>> pr n = threadDelay 1000000 >> print n
>>> Stream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]
1
2
3
>>> Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3]
...
...
...
Concurrent (do not use with fromParallel on infinite containers)
Since: 0.3.0
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #
Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.
Pre-release
fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a Source #
Construct a stream by reading a Prim IORef repeatedly.
Pre-release
Deprecated
once :: (Monad m, IsStream t) => m a -> t m a Source #
Deprecated: Please use fromEffect instead.
Same as fromEffect
Since: 0.2.0
fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String Source #
Deprecated: Please use Streamly.FileSystem.Handle module (see the changelog)
Read lines from an IO Handle into a stream of Strings.
Since: 0.1.0
currentTime :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m AbsTime Source #
Deprecated: Please use absTimes instead