streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Internal.Data.Stream.IsStream.Reduce

Description

Reduce streams by streams, folds or parsers.

Reduce By Streams

dropPrefix :: t m a -> t m a -> t m a

Drop prefix from the input stream if present.

Space: O(1)

Unimplemented - Help wanted.

dropInfix :: t m a -> t m a -> t m a

Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.

Space: O(n) where n is the length of the infix.

Unimplemented - Help wanted.

dropSuffix :: t m a -> t m a -> t m a

Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.

Space: O(n) where n is the length of the suffix.

Unimplemented - Help wanted.
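
Since these functions are unimplemented, the following list-based sketch only illustrates the behaviour that the descriptions of dropPrefix and dropSuffix suggest. The names dropPrefixList and dropSuffixList, the Eq constraint, and the argument order (pattern first, input second) are assumptions made for illustration; none of them is part of the library.

```haskell
import Data.List (isSuffixOf, stripPrefix)
import Data.Maybe (fromMaybe)

-- | Hypothetical list model of 'dropPrefix': remove the prefix when it is
-- present, otherwise return the input unchanged.
dropPrefixList :: Eq a => [a] -> [a] -> [a]
dropPrefixList prefix xs = fromMaybe xs (stripPrefix prefix xs)

-- | Hypothetical list model of 'dropSuffix': remove the suffix when it is
-- present, otherwise return the input unchanged. A list can look at its own
-- end directly; a stream would have to buffer as many elements as the suffix
-- holds, which is where the documented O(n) space comes from.
dropSuffixList :: Eq a => [a] -> [a] -> [a]
dropSuffixList suffix xs
    | suffix `isSuffixOf` xs = take (length xs - length suffix) xs
    | otherwise = xs
```

With these definitions, dropPrefixList "foo" "foobar" evaluates to "bar" and dropSuffixList "bar" "foobar" evaluates to "foo"; an input that does not match is returned as is.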

Reduce By Folds

Reduce a stream by folding or parsing chunks of the stream. The functions in this section generally have one of these shapes:

f (Fold m a b) -> t m a -> t m b
f (Parser m a b) -> t m a -> t m b

Generic Folding

Apply folds on a stream.

foldMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b

Apply a Fold repeatedly on a stream and emit the fold outputs in the output stream.

To sum every two contiguous elements in a stream:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
[3,7,11,15,19]

On an empty stream the output is empty:

>>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
[]

Note: Stream.foldMany (Fold.take 0) would result in an infinite loop on a non-empty stream, because such a fold terminates without consuming any input.

Since: 0.8.0

foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b

Like foldMany but appends the empty fold output if the fold and stream termination align:

>>> f = Fold.take 2 Fold.sum
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]
>>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]

Pre-release
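
The difference between foldMany and foldManyPost can be sketched with a small list model. This is not the library's implementation: the Outcome type and the names takeSum, foldManyList and foldManyPostList are hypothetical, and a fold is modeled as a plain function that either terminates on its own and hands back the leftover input, or runs out of input first.

```haskell
-- | Outcome of running one (modeled) fold over the front of a list.
data Outcome a b
    = Done b [a]   -- the fold terminated by itself; leftover input follows
    | Exhausted b  -- the input ended before the fold terminated

-- | Model of @Fold.take n Fold.sum@.
takeSum :: Int -> [Int] -> Outcome Int Int
takeSum n xs
    | length chunk == n = Done (sum chunk) rest
    | otherwise = Exhausted (sum chunk)
  where
    (chunk, rest) = splitAt n xs

-- | Model of 'foldMany': no fold is started on an empty input.
foldManyList :: ([a] -> Outcome a b) -> [a] -> [b]
foldManyList _ [] = []
foldManyList f xs = case f xs of
    Done b rest -> b : foldManyList f rest
    Exhausted b -> [b]

-- | Model of 'foldManyPost': a fold is always started, so an empty
-- remaining input still yields the fold's initial value.
foldManyPostList :: ([a] -> Outcome a b) -> [a] -> [b]
foldManyPostList f xs = case f xs of
    Done b rest -> b : foldManyPostList f rest
    Exhausted b -> [b]
```

In this model, foldManyPostList (takeSum 2) [1..10] yields [3,7,11,15,19,0] and foldManyList (takeSum 2) [1..10] yields [3,7,11,15,19], matching the documented outputs. takeSum 0 always returns Done without consuming input, so foldManyList (takeSum 0) never terminates on a non-empty list, which mirrors the infinite loop noted for foldMany.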

refoldMany :: (IsStream t, Monad m) => Refold m c a b -> m c -> t m a -> t m b

Like foldMany but using the Refold type instead of Fold.

Pre-release

foldSequence :: t m (Fold m a b) -> t m a -> t m b

Apply a stream of folds to an input stream and emit the results in the output stream.

Unimplemented

foldIterateM :: (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b

Iterate a fold generator on a stream. The initial value b is used to generate the first fold. That fold is applied on the stream, its result is used to generate the next fold, and so on.

>>> import Data.Monoid (Sum(..))
>>> f x = return (Fold.take 2 (Fold.sconcat x))
>>> s = Stream.map Sum $ Stream.fromList [1..10]
>>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s
[3,10,21,36,55,55]

This is the streaming equivalent of a monad-like sequenced application of folds, where the next fold depends on the result of the previous fold.

Pre-release
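
To see where the numbers in the example come from, here is a minimal list model of that specific pipeline (a fold that takes two elements and adds them to the previous result). The name iterateTakeSum is hypothetical, and the sketch does not reflect how the library implements foldIterateM.

```haskell
-- | List model of the documented example: every round starts from the
-- previous result and adds up to @n@ further elements. Assumes @n > 0@.
iterateTakeSum :: Int -> Int -> [Int] -> [Int]
iterateTakeSum n acc xs
    | length chunk < n = [acc']  -- input ended during this round: emit, stop
    | otherwise = acc' : iterateTakeSum n acc' rest
  where
    (chunk, rest) = splitAt n xs
    acc' = acc + sum chunk
```

iterateTakeSum 2 0 [1..10] evaluates to [3,10,21,36,55,55]. The final 55 is repeated because the last round starts from 55 and finds no input left to add.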

refoldIterateM :: (IsStream t, Monad m) => Refold m b a b -> m b -> t m a -> t m b

Like foldIterateM but using the Refold type instead. This could be much more efficient due to stream fusion.

Internal

Chunking

Grouping that does not inspect the elements.

chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b

Group the input stream into groups of n elements each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]

This can be considered as an n-fold version of take where we apply take repeatedly on the leftover stream until the stream is exhausted.

chunksOf n f = foldMany (Fold.take n f)

Since: 0.7.0

arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a)

arraysOf n stream groups the elements in the input stream into arrays of n elements each.

Same as the following but may be more efficient:

arraysOf n = Stream.foldMany (A.writeN n)

Pre-release

intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b

Group the input stream into windows of n seconds each and then fold each group using the provided fold function.

>>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
[...,...,...,...,...]

Since: 0.7.0

chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m)) => Int -> Double -> Fold m a b -> t m a -> t m b

Like chunksOf but if the chunk is not completed within the specified time interval then emit whatever has been collected so far. The chunk timeout is reset whenever a chunk is emitted.

>>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>> f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s

Pre-release

Splitting

Streams can be sliced into segments in space or in time. We use the term chunk to refer to a spatial length of the stream (spatial window) and the term session to refer to a length in time (time window).

splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b

Split on an infixed separator element, dropping the separator. The supplied Fold is applied on the split segments. Separator elements are identified by the supplied predicate, and a separator is considered to be infixed between two segments:

>>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>> splitOn' (== '.') "a.b"
["a","b"]

An empty stream is folded to the default value of the fold:

>>> splitOn' (== '.') ""
[""]

If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:

>>> splitOn' (== '.') "."
["",""]
>>> splitOn' (== '.') ".a"
["","a"]
>>> splitOn' (== '.') "a."
["a",""]
>>> splitOn' (== '.') "a..b"
["a","","b"]

splitOn is an inverse of intercalating a single element:

Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id

Since: 0.7.0
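
The infix semantics shown in the examples can be reproduced with a short list model built on break from the Prelude. The name splitOnList is hypothetical, and Fold.toList is modeled by simply returning each segment as a list; this is a sketch of the behaviour, not of the streaming implementation.

```haskell
-- | List model of 'splitOn' with 'Fold.toList' as the fold. Every separator
-- sits between two segments, so k separators always produce k + 1 segments,
-- some of which may be empty.
splitOnList :: (a -> Bool) -> [a] -> [[a]]
splitOnList p xs = case break p xs of
    (segment, []) -> [segment]                          -- no separator left
    (segment, _ : rest) -> segment : splitOnList p rest -- drop the separator
```

Because the base case still emits the final (possibly empty) segment, an empty input gives [""] and "a." gives ["a",""], as in the examples above.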

splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b

Split on a suffixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

>>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
>>> splitOnSuffix' (== '.') "a.b."
["a","b"]
>>> splitOnSuffix' (== '.') "a."
["a"]

An empty stream results in an empty output stream:

>>> splitOnSuffix' (== '.') ""
[]

An empty segment consisting of only a suffix is folded to the default output of the fold:

>>> splitOnSuffix' (== '.') "."
[""]
>>> splitOnSuffix' (== '.') "a..b.."
["a","","b",""]

A suffix is optional at the end of the stream:

>>> splitOnSuffix' (== '.') "a"
["a"]
>>> splitOnSuffix' (== '.') ".a"
["","a"]
>>> splitOnSuffix' (== '.') "a.b"
["a","b"]

lines = splitOnSuffix (== '\n')

splitOnSuffix is an inverse of intercalateSuffix with a single element:

Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id

Since: 0.7.0

splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b

Split on a prefixed separator element, dropping the separator. The supplied Fold is applied on the split segments.

> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (== '.') ".a.b"
["a","b"]

An empty stream results in an empty output stream:

> splitOnPrefix' (== '.') ""
[]

An empty segment consisting of only a prefix is folded to the default output of the fold:

> splitOnPrefix' (== '.') "."
[""]

> splitOnPrefix' (== '.') ".a.b."
["a","b",""]

> splitOnPrefix' (== '.') ".a..b"
["a","","b"]

A prefix is optional at the beginning of the stream:

> splitOnPrefix' (== '.') "a"
["a"]

> splitOnPrefix' (== '.') "a.b"
["a","b"]

splitOnPrefix is an inverse of intercalatePrefix with a single element:

Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id

Assuming the input stream does not contain the separator:

Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id

Unimplemented
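
As splitOnPrefix is unimplemented, the examples above describe intended behaviour only. A list model that reproduces them could look as follows; the name splitOnPrefixList is hypothetical and the sketch is derived purely from those examples.

```haskell
-- | Hypothetical list model of 'splitOnPrefix' with 'Fold.toList' as the
-- fold. An optional leading separator is dropped first; after that, every
-- separator starts a new (possibly empty) segment.
splitOnPrefixList :: (a -> Bool) -> [a] -> [[a]]
splitOnPrefixList _ [] = []
splitOnPrefixList p xs@(x : rest)
    | p x = segments rest
    | otherwise = segments xs
  where
    segments ys = case break p ys of
        (segment, []) -> [segment]
        (segment, _ : more) -> segment : segments more
```

The only differences from an infix split are the two special cases at the front: an empty input produces no segments at all, and a leading separator does not produce an empty first segment.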

splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b

Like splitOnSuffix but keeps the suffix attached to the resulting splits.

>>> splitWithSuffix' p xs = Stream.toList $ Stream.splitWithSuffix p Fold.toList (Stream.fromList xs)
>>> splitWithSuffix' (== '.') ""
[]
>>> splitWithSuffix' (== '.') "."
["."]
>>> splitWithSuffix' (== '.') "a"
["a"]
>>> splitWithSuffix' (== '.') ".a"
[".","a"]
>>> splitWithSuffix' (== '.') "a."
["a."]
>>> splitWithSuffix' (== '.') "a.b"
["a.","b"]
>>> splitWithSuffix' (== '.') "a.b."
["a.","b."]
>>> splitWithSuffix' (== '.') "a..b.."
["a.",".","b.","."]

Since: 0.7.0
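
The suffix-style splits, splitOnSuffix and splitWithSuffix, differ only in whether the separator is kept. A list model makes that explicit; the names splitSuffixList, splitOnSuffixList and splitWithSuffixList are hypothetical, and Fold.toList is modeled by returning each segment as a list.

```haskell
-- | Shared list model for suffix-style splitting. The flag decides whether
-- the separator stays attached to the segment it terminates.
splitSuffixList :: Bool -> (a -> Bool) -> [a] -> [[a]]
splitSuffixList _ _ [] = []  -- nothing left: no trailing empty segment
splitSuffixList keep p xs = case break p xs of
    (segment, []) -> [segment]  -- the last suffix is optional
    (segment, sep : rest) ->
        (if keep then segment ++ [sep] else segment)
            : splitSuffixList keep p rest

-- | Model of 'splitOnSuffix': separators are dropped.
splitOnSuffixList :: (a -> Bool) -> [a] -> [[a]]
splitOnSuffixList = splitSuffixList False

-- | Model of 'splitWithSuffix': separators are kept.
splitWithSuffixList :: (a -> Bool) -> [a] -> [[a]]
splitWithSuffixList = splitSuffixList True
```

Compared with an infix split, the empty-input case returns no segments, which is why "a." yields ["a"] here rather than ["a",""].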

splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOnSeq but the separator is also emitted in the output, as a separate infix token.

>>> splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOn'_ "" "hello"
["h","","e","","l","","l","","o"]
>>> splitOn'_ "hello" ""
[""]
>>> splitOn'_ "hello" "hello"
["","hello",""]
>>> splitOn'_ "x" "hello"
["hello"]
>>> splitOn'_ "h" "hello"
["","h","ello"]
>>> splitOn'_ "o" "hello"
["hell","o",""]
>>> splitOn'_ "e" "hello"
["h","e","llo"]
>>> splitOn'_ "l" "hello"
["he","l","","l","o"]
>>> splitOn'_ "ll" "hello"
["he","ll","o"]

Pre-release

splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOn but the separator is a sequence of elements instead of a single element.

For illustration, let's define a function that operates on pure lists:

>>> splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>> splitOnSeq' "hello" ""
[""]
>>> splitOnSeq' "hello" "hello"
["",""]
>>> splitOnSeq' "x" "hello"
["hello"]
>>> splitOnSeq' "h" "hello"
["","ello"]
>>> splitOnSeq' "o" "hello"
["hell",""]
>>> splitOnSeq' "e" "hello"
["h","llo"]
>>> splitOnSeq' "l" "hello"
["he","","o"]
>>> splitOnSeq' "ll" "hello"
["he","o"]

splitOnSeq is an inverse of intercalate. The following law always holds:

intercalate . splitOnSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSeq . intercalate == id

Pre-release
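
The splitOnSeq examples and the first law can be checked against a small list model. The names splitOnSeqList and roundTrip are hypothetical, the model uses lists where the library uses an Array pattern and a stream, and its handling of an empty pattern is chosen only to match the single documented example.

```haskell
import Data.List (intercalate, stripPrefix)

-- | List model of 'splitOnSeq' with 'Fold.toList' as the fold.
splitOnSeqList :: Eq a => [a] -> [a] -> [[a]]
-- Empty pattern: one segment per element, as in the documented example.
-- The result for an empty pattern on an empty input is not documented;
-- returning [] here is an arbitrary choice of this sketch.
splitOnSeqList [] xs = map (: []) xs
splitOnSeqList pat xs = go [] xs
  where
    -- acc holds the current segment in reverse order
    go acc [] = [reverse acc]
    go acc ys@(y : rest) = case stripPrefix pat ys of
        Just rest' -> reverse acc : go [] rest'  -- separator found, drop it
        Nothing -> go (y : acc) rest

-- | The law @intercalate . splitOnSeq == id@, stated for the list model.
roundTrip :: Eq a => [a] -> [a] -> Bool
roundTrip pat xs = intercalate pat (splitOnSeqList pat xs) == xs
```

For instance, splitOnSeqList "l" "hello" gives ["he","","o"], and joining those segments with "l" restores "hello", so roundTrip "l" "hello" is True.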

splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOnSuffix but the separator is a sequence of elements, instead of a predicate for a single element.

>>> splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitOnSuffixSeq_ "." ""
[]
>>> splitOnSuffixSeq_ "." "."
[""]
>>> splitOnSuffixSeq_ "." "a"
["a"]
>>> splitOnSuffixSeq_ "." ".a"
["","a"]
>>> splitOnSuffixSeq_ "." "a."
["a"]
>>> splitOnSuffixSeq_ "." "a.b"
["a","b"]
>>> splitOnSuffixSeq_ "." "a.b."
["a","b"]
>>> splitOnSuffixSeq_ "." "a..b.."
["a","","b",""]

lines = splitOnSuffixSeq "\n"

splitOnSuffixSeq is an inverse of intercalateSuffix. The following law always holds:

intercalateSuffix . splitOnSuffixSeq == id

The following law holds when the separator is non-empty and contains none of the elements present in the input lists:

splitOnSuffixSeq . intercalateSuffix == id

Pre-release

splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b

Like splitOnSuffixSeq but keeps the suffix intact in the splits.

>>> splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>> splitWithSuffixSeq' "." ""
[]
>>> splitWithSuffixSeq' "." "."
["."]
>>> splitWithSuffixSeq' "." "a"
["a"]
>>> splitWithSuffixSeq' "." ".a"
[".","a"]
>>> splitWithSuffixSeq' "." "a."
["a."]
>>> splitWithSuffixSeq' "." "a.b"
["a.","b"]
>>> splitWithSuffixSeq' "." "a.b."
["a.","b."]
>>> splitWithSuffixSeq' "." "a..b.."
["a.",".","b.","."]

Pre-release

Keyed Window Classification

Split the stream into chunks or windows by position or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The window termination can be dynamically controlled by the fold.

The term "chunk" is used for a window defined by position of elements and the term "session" is used for a time window.

Tumbling Windows

A new window starts after the previous window is finished.

classifySessionsBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Double                -- ^ timer tick in seconds
    -> Bool                  -- ^ reset the timer when an event is received
    -> (Int -> m Bool)       -- ^ predicate to eject sessions based on session count
    -> Double                -- ^ session timeout in seconds
    -> Fold m a b            -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)            -- ^ session key, fold result

classifySessionsBy tick keepalive predicate timeout fold stream classifies an input event stream consisting of (timestamp, (key, value)) into sessions based on the key, folding all the values corresponding to the same key into a session using the supplied fold.

When the fold terminates or a timeout occurs, a tuple consisting of the session key and the folded value is emitted in the output stream. The timeout is measured from the first event in the session. If the keepalive option is set to True the timeout is reset to 0 whenever an event is received.

The timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input event was generated. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a clock resolution of tick seconds. This timer is used to detect session timeouts in the absence of new events.

To bound the memory used, the number of sessions can be limited. If the ejection predicate returns True, the oldest session is ejected before inserting a new session.

>>> :{
Stream.mapM_ print
    $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
    $ Stream.timestamped
    $ Stream.delay 0.1
    $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c']
:}
(1,"abc")
(2,"abc")
(3,"abc")

Pre-release
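
The core of the tumbling-window behaviour described above (sessions keyed by k, a timeout measured from the first event of the session, and emission when the fold terminates) can be sketched with a pure list model. This is a deliberately simplified illustration, not the library's algorithm: the name classifySessionsList is hypothetical, timestamps are plain Double seconds instead of AbsTime, the fold is fixed to "collect up to n values", and the timer tick, the keepalive option and the ejection predicate are all left out. Flushing open sessions at the end of the input is an assumption of the model.

```haskell
import Data.List (partition)

-- | Simplified list model of session classification with keepalive = False.
classifySessionsList
    :: Eq k
    => Double              -- session timeout in seconds
    -> Int                 -- the fold is modeled as "collect n values"
    -> [(Double, (k, a))]  -- (timestamp, (session key, session data))
    -> [(k, [a])]          -- (session key, collected values)
classifySessionsList timeout n = go []
  where
    -- Values are accumulated newest first, so reverse them on output.
    emit (k, _, vs) = (k, reverse vs)

    -- Assumption of this model: open sessions are flushed at end of input.
    go open [] = map emit open
    go open ((t, (k, v)) : events)
        | length values >= n = map emit expired ++ emit session : go others events
        | otherwise = map emit expired ++ go (others ++ [session]) events
      where
        -- Event time drives the clock: sessions whose start is at least
        -- `timeout` seconds before this event are emitted first.
        (expired, live) = partition (\(_, start, _) -> t - start >= timeout) open
        (mine, others) = partition (\(key, _, _) -> key == k) live
        (started, values) = case mine of
            (_, s, vs) : _ -> (s, v : vs)
            [] -> (t, [v])  -- first event of a new session
        session = (k, started, values)
```

Feeding the model the same nine (key, value) pairs as the example, spaced 0.1 seconds apart with a timeout of 3 and n = 3, produces (1,"abc"), (2,"abc") and (3,"abc"), since every session completes through the fold before any timeout is reached.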

classifySessionsOf
    :: (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)       -- ^ predicate to eject sessions on session count
    -> Double                -- ^ time window size
    -> Fold m a b            -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to False.

classifySessionsOf = classifySessionsBy 1 False

Pre-release

Keep Alive Windows

The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.

classifyKeepAliveSessions
    :: (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)       -- ^ predicate to eject sessions on session count
    -> Double                -- ^ session inactive timeout
    -> Fold m a b            -- ^ Fold to be applied to session payload data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)

Same as classifySessionsBy with a timer tick of 1 second and keepalive option set to True.

classifyKeepAliveSessions = classifySessionsBy 1 True

Pre-release

Reduce By Parsers

Generic Parsing

Apply parsers on a stream.

parseMany :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b

Apply a Parser repeatedly on a stream and emit the parsed values in the output stream.

This is the streaming equivalent of the many parse combinator.

>>> Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]
[3,7,11,15,19]

> Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\nworld"
["hello\n","world"]

foldMany f = parseMany (fromFold f)

Known Issues: When the parser fails there is no way to get the remaining stream.

Pre-release

parseManyD :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b

parseManyTill :: Parser m a b -> Parser m a x -> t m a -> t m b

parseManyTill collect test stream tries the parser test on the input. If test fails, it backtracks and tries collect; after collect succeeds, test is tried again, and so on. Parsing stops when test succeeds. The output of test is discarded and the output of collect is emitted in the output stream. The parser fails if collect fails.

Unimplemented

parseSequence :: t m (Parser m a b) -> t m a -> t m b

Apply a stream of parsers to an input stream and emit the results in the output stream.

Pre-release

parseIterate :: (IsStream t, MonadThrow m) => (b -> Parser m a b) -> b -> t m a -> t m b

Iterate a parser generating function on a stream. The initial value b is used to generate the first parser. That parser is applied on the stream, its result is used to generate the next parser, and so on.

>>> import Data.Monoid (Sum(..))
>>> Stream.toList $ Stream.map getSum $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) 0 $ Stream.map Sum $ Stream.fromList [1..10]
[3,10,21,36,55,55]

This is the streaming equivalent of a monad-like sequenced application of parsers, where the next parser depends on the result of the previous parser.

Pre-release

Grouping

wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b

Like splitOn after stripping leading, trailing, and repeated separators. Therefore, ".a..b." with . as the separator would be parsed as ["a","b"]. In other words, it is like parsing words from whitespace-separated text.

>>> wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
>>> wordsBy' (== ',') ""
[]
>>> wordsBy' (== ',') ","
[]
>>> wordsBy' (== ',') ",a,,b,"
["a","b"]

words = wordsBy isSpace

Since: 0.7.0
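
The "strip leading, trailing, and repeated separators" rule behind wordsBy is easy to see in a list model built from dropWhile and break. The name wordsByList is hypothetical, and Fold.toList is modeled by returning each word as a list.

```haskell
-- | List model of 'wordsBy' with 'Fold.toList' as the fold. Skipping every
-- run of separators before reading a word means that empty segments are
-- never produced.
wordsByList :: (a -> Bool) -> [a] -> [[a]]
wordsByList p xs = case dropWhile p xs of
    [] -> []  -- only separators (or nothing) left
    ys -> let (word, rest) = break p ys
          in word : wordsByList p rest
```

With p = isSpace from Data.Char this behaves like words from the Prelude, which is the list counterpart of the words definition given above.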

groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b

groups = groupsBy (==)
groups = groupsByRolling (==)

Groups contiguous spans of equal elements together in individual groups.

>>> Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
[[1,1],[2,2]]

Since: 0.7.0

groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b

groupsBy cmp f $ Stream.fromList [a,b,c,...] assigns the element a to the first group. If b `cmp` a is True then b is also assigned to the same group. If c `cmp` a is True then c is also assigned to the same group, and so on. When the comparison fails a new group is started. Each group is folded using the fold f and the result of the fold is emitted in the output stream.

>>> Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
[[1,3,7],[0,2,5]]

Since: 0.7.0

groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b

Unlike groupsBy this function performs a rolling comparison of two successive elements in the input stream. groupsByRolling cmp f $ Stream.fromList [a,b,c,...] assigns the element a to the first group. If a `cmp` b is True then b is also assigned to the same group. If b `cmp` c is True then c is also assigned to the same group, and so on. When the comparison fails a new group is started. Each group is folded using the fold f.

>>> Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
[[1,2,3],[7,8,9]]

Since: 0.7.0
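
groupsBy and groupsByRolling differ in which element a newcomer is compared against and in the argument order of the comparison. The list model below spells that out; the names groupsByList and groupsByRollingList are hypothetical, and Fold.toList is modeled by returning each group as a list.

```haskell
import Data.List (groupBy)

-- | List model of 'groupsBy': each new element x is compared with the FIRST
-- element of the current group, as @x `cmp` first@. 'Data.List.groupBy'
-- passes the group's first element as its first argument, hence the flip.
groupsByList :: (a -> a -> Bool) -> [a] -> [[a]]
groupsByList cmp = groupBy (\first x -> x `cmp` first)

-- | List model of 'groupsByRolling': each new element y is compared with the
-- element just BEFORE it, as @prev `cmp` y@.
groupsByRollingList :: (a -> a -> Bool) -> [a] -> [[a]]
groupsByRollingList _ [] = []
groupsByRollingList cmp (x : xs) = go [x] x xs
  where
    -- acc holds the current group in reverse order
    go acc _ [] = [reverse acc]
    go acc prev (y : ys)
        | prev `cmp` y = go (y : acc) y ys
        | otherwise = reverse acc : go [y] y ys
```

On the input [1,2,3,2,5], groupsByList (>) keeps everything in one group because every later element is greater than the first element 1, whereas groupsByRollingList (<) yields [[1,2,3],[2,5]] because the run of increasing neighbours breaks at 3 followed by 2.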

Nested splitting

splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)

splitInnerBy splitter joiner stream splits the inner containers f a of an input stream t m (f a) using the splitter function. Container elements f a are collected until a split occurs, then all the elements before the split are joined using the joiner function.

For example, if we have a stream of Array Word8, we may want to split the stream into arrays representing lines separated by the '\n' byte, such that the resulting stream after a split would have one array for each line.

CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.

Pre-release

splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)

Like splitInnerBy but splits assuming the separator joins the segment in a suffix style.

Pre-release