streamly-0.8.1.1: Dataflow programming and declarative concurrency
Copyright(c) 2020 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.Stream.IsStream.Top

Description

Top level IsStream module that can use all other lower level IsStream modules.

Synopsis

Transformation

Sampling

Value agnostic filtering.

sampleFromThen :: (IsStream t, Monad m, Functor (t m)) => Int -> Int -> t m a -> t m a Source #

sampleFromthen offset stride samples the element at offset index and then every element at strides of stride.

>>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]

Pre-release

sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like sampleInterval but samples at the beginning of the time window.

sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head

Pre-release

sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Continuously evaluate the input stream and sample the last event in time window of n seconds.

This is also known as throttle in some libraries.

sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last

Pre-release

sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Like sampleBurstEnd but samples the event at the beginning of the burst instead of at the end of it.

Pre-release

sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) => Double -> t m a -> t m a Source #

Sample one event at the end of each burst of events. A burst is a group of events close together in time, it ends when an event is spaced by more than the specified time interval from the previous event.

This is known as debounce in some libraries.

The clock granularity is 10 ms.

Pre-release

Reordering

sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a Source #

Sort the input stream using a supplied comparison function.

O(n) space

Note: this is not the fastest possible implementation as of now.

Pre-release

Nesting

Set like operations

These are not exactly set operations because streams are not necessarily sets, they may have duplicated elements.

intersectBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

intersectBy is essentially a filtering operation that retains only those elements in the first stream that are present in the second stream.

>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,2,2]
>>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
[2,1,1]

intersectBy is similar to but not the same as innerJoin:

>>> Stream.toList $ fmap fst $ Stream.innerJoin (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
[1,1,2,2]

Space: O(n) where n is the number of elements in the second stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

mergeIntersectBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like intersectBy but works only on sorted streams.

Space: O(1)

Time: O(m+n)

Unimplemented

differenceBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

Delete first occurrences of those elements from the first stream that are present in the second stream. If an element occurs multiple times in the second stream as many occurrences of it are deleted from the first stream.

>>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
[2]

The following laws hold:

(s1 serial s2) `differenceBy eq` s1 === s2
(s1 wSerial s2) `differenceBy eq` s1 === s2

Same as the list // operation.

Space: O(m) where m is the number of elements in the first stream.

Time: O(m x n) where m is the number of elements in the first stream and n is the number of elements in the second stream.

Pre-release

mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like differenceBy but works only on sorted streams.

Space: O(1)

Unimplemented

unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) => (a -> a -> Bool) -> t m a -> t m a -> t m a Source #

This is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.

>>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
[1,2,2,4,3]

Equivalent to the following except that s1 is evaluated only once:

unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)

Similar to outerJoin but not the same.

Space: O(n)

Time: O(m x n)

Pre-release

mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #

Like unionBy but works only on sorted streams.

Space: O(1)

Unimplemented

Join operations

crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b) Source #

This is the same as outerProduct but less efficient.

The second stream is evaluated multiple times. If the second stream is consume-once stream then it can be cached in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

Time: O(m x n)

Pre-release

innerJoin :: forall (t :: (Type -> Type) -> Type -> Type) m a b. (IsStream t, Monad (t m)) => (a -> b -> Bool) -> t m a -> t m b -> t m (a, b) Source #

For all elements in t m a, for all elements in t m b if a and b are equal by the given equality pedicate then return the tuple (a, b).

The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

For space efficiency use the smaller stream as the second stream.

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

Pre-release

mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b) Source #

Like innerJoin but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

hashInnerJoin :: (a -> b -> Bool) -> t m a -> t m b -> t m (a, b) Source #

Like innerJoin but uses a hashmap for efficiency.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

Unimplemented

leftJoin :: Monad m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b) Source #

For all elements in t m a, for all elements in t m b if a and b are equal then return the tuple (a, Just b). If a is not present in t m b then return (a, Nothing).

The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it in an Array before calling this function. Caching may also improve performance if the stream is expensive to evaluate.

rightJoin = flip leftJoin

Space: O(n) assuming the second stream is cached in memory.

Time: O(m x n)

Unimplemented

mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b) Source #

Like leftJoin but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

hashLeftJoin :: (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b) Source #

Like outerJoin but uses a hashmap for efficiency.

Space: O(n)

Time: O(m + n)

Unimplemented

outerJoin :: MonadIO m => (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (Maybe a, Maybe b) Source #

For all elements in t m a, for all elements in t m b if a and b are equal by the given equality pedicate then return the tuple (Just a, Just b). If a is not found in t m b then return (a, Nothing), return (Nothing, b) for vice-versa.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m x n)

Unimplemented

mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b) Source #

Like outerJoin but works only on sorted streams.

Space: O(1)

Time: O(m + n)

Unimplemented

hashOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b) Source #

Like outerJoin but uses a hashmap for efficiency.

For space efficiency use the smaller stream as the second stream.

Space: O(n)

Time: O(m + n)

Unimplemented