Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Reduce streams by streams, folds or parsers.
Synopsis
- dropPrefix :: t m a -> t m a -> t m a
- dropInfix :: t m a -> t m a -> t m a
- dropSuffix :: t m a -> t m a -> t m a
- foldMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- refoldMany :: (IsStream t, Monad m) => Refold m c a b -> m c -> t m a -> t m b
- foldSequence :: t m (Fold m a b) -> t m a -> t m b
- foldIterateM :: (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b
- refoldIterateM :: (IsStream t, Monad m) => Refold m b a b -> m b -> t m a -> t m b
- chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b
- arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a)
- intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b
- chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m)) => Int -> Double -> Fold m a b -> t m a -> t m b
- splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- classifySessionsBy :: (IsStream t, MonadAsync m, Ord k) => Double -> Bool -> (Int -> m Bool) -> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
- classifySessionsOf :: (IsStream t, MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
- classifyKeepAliveSessions :: (IsStream t, MonadAsync m, Ord k) => (Int -> m Bool) -> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
- parseMany :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b
- parseManyD :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b
- parseManyTill :: Parser m a b -> Parser m a x -> t m a -> t m b
- parseSequence :: t m (Parser m a b) -> t m a -> t m b
- parseIterate :: (IsStream t, MonadThrow m) => (b -> Parser m a b) -> b -> t m a -> t m b
- wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
- groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
- splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
Reduce By Streams
dropPrefix :: t m a -> t m a -> t m a Source #
Drop prefix from the input stream if present.
Space: O(1)
Unimplemented - Help wanted.
dropInfix :: t m a -> t m a -> t m a Source #
Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.
Space: O(n)
where n is the length of the infix.
Unimplemented - Help wanted.
dropSuffix :: t m a -> t m a -> t m a Source #
Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.
Space: O(n)
where n is the length of the suffix.
Unimplemented - Help wanted.
Reduce By Folds
Reduce a stream by folding or parsing chunks of the stream. Functions generally ending in these shapes:
f (Fold m a b) -> t m a -> t m b f (Parser m a b) -> t m a -> t m b
Generic Folding
Apply folds on a stream.
foldMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Apply a Fold
repeatedly on a stream and emit the fold outputs in the
output stream.
To sum every two contiguous elements in a stream:
>>>
f = Fold.take 2 Fold.sum
>>>
Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
[3,7,11,15,19]
On an empty stream the output is empty:
>>>
Stream.toList $ Stream.foldMany f $ Stream.fromList []
[]
Note Stream.foldMany (Fold.take 0)
would result in an infinite loop in a
non-empty stream.
Since: 0.8.0
foldManyPost :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Like foldMany
but appends empty fold output if the fold and stream
termination aligns:
>>>
f = Fold.take 2 Fold.sum
>>>
Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
[0]>>>
Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
[3,7,11,15,9]>>>
Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
[3,7,11,15,19,0]
Pre-release
foldSequence :: t m (Fold m a b) -> t m a -> t m b Source #
Apply a stream of folds to an input stream and emit the results in the output stream.
Unimplemented
foldIterateM :: (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b Source #
Iterate a fold generator on a stream. The initial value b
is used to
generate the first fold, the fold is applied on the stream and the result of
the fold is used to generate the next fold and so on.
>>> import Data.Monoid (Sum(..)) >>> f x = return (Fold.take 2 (Fold.sconcat x)) >>> s = Stream.map Sum $ Stream.fromList [1..10] >>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s [3,10,21,36,55,55]
This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold.
Pre-release
refoldIterateM :: (IsStream t, Monad m) => Refold m b a b -> m b -> t m a -> t m b Source #
Like foldIterateM
but using the Refold
type instead. This could be
much more efficient due to stream fusion.
Internal
Chunking
Element unaware grouping.
chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #
Group the input stream into groups of n
elements each and then fold each
group using the provided fold function.
>>>
Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
[3,7,11,15,19]
This can be considered as an n-fold version of take
where we apply
take
repeatedly on the leftover stream until the stream exhausts.
chunksOf n f = foldMany (FL.take n f)
Since: 0.7.0
arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a) Source #
arraysOf n stream
groups the elements in the input stream into arrays of
n
elements each.
Same as the following but may be more efficient:
arraysOf n = Stream.foldMany (A.writeN n)
Pre-release
intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #
Group the input stream into windows of n
second each and then fold each
group using the provided fold function.
>>>
Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
[...,...,...,...,...]
Since: 0.7.0
chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m)) => Int -> Double -> Fold m a b -> t m a -> t m b Source #
Like chunksOf
but if the chunk is not completed within the specified
time interval then emit whatever we have collected till now. The chunk
timeout is reset whenever a chunk is emitted.
>>>
s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
>>>
f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s
Pre-release
Splitting
Streams can be sliced into segments in space or in time. We use the
term chunk
to refer to a spatial length of the stream (spatial window)
and the term session
to refer to a length in time (time window).
splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Split on an infixed separator element, dropping the separator. The
supplied Fold
is applied on the split segments. Splits the stream on
separator elements determined by the supplied predicate, separator is
considered as infixed between two segments:
>>>
splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
>>>
splitOn' (== '.') "a.b"
["a","b"]
An empty stream is folded to the default value of the fold:
>>>
splitOn' (== '.') ""
[""]
If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:
>>>
splitOn' (== '.') "."
["",""]
>>>
splitOn' (== '.') ".a"
["","a"]
>>>
splitOn' (== '.') "a."
["a",""]
>>>
splitOn' (== '.') "a..b"
["a","","b"]
splitOn is an inverse of intercalating single element:
Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
Assuming the input stream does not contain the separator:
Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
Since: 0.7.0
splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Split on a suffixed separator element, dropping the separator. The
supplied Fold
is applied on the split segments.
>>>
splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
>>>
splitOnSuffix' (== '.') "a.b."
["a","b"]
>>>
splitOnSuffix' (== '.') "a."
["a"]
An empty stream results in an empty output stream:
>>>
splitOnSuffix' (== '.') ""
[]
An empty segment consisting of only a suffix is folded to the default output of the fold:
>>>
splitOnSuffix' (== '.') "."
[""]
>>>
splitOnSuffix' (== '.') "a..b.."
["a","","b",""]
A suffix is optional at the end of the stream:
>>>
splitOnSuffix' (== '.') "a"
["a"]
>>>
splitOnSuffix' (== '.') ".a"
["","a"]
>>>
splitOnSuffix' (== '.') "a.b"
["a","b"]
lines = splitOnSuffix (== '\n')
splitOnSuffix
is an inverse of intercalateSuffix
with a single element:
Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id
Assuming the input stream does not contain the separator:
Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id
Since: 0.7.0
splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Split on a prefixed separator element, dropping the separator. The
supplied Fold
is applied on the split segments.
> splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
> splitOnPrefix' (== .
) ".a.b"
["a","b"]
An empty stream results in an empty output stream:
> splitOnPrefix' (==
.
) ""
[]
An empty segment consisting of only a prefix is folded to the default output of the fold:
> splitOnPrefix' (==.
) "." [""] > splitOnPrefix' (==.
) ".a.b." ["a","b",""] > splitOnPrefix' (==.
) ".a..b" ["a","","b"]
A prefix is optional at the beginning of the stream:
> splitOnPrefix' (==.
) "a" ["a"] > splitOnPrefix' (==.
) "a.b" ["a","b"]
splitOnPrefix
is an inverse of intercalatePrefix
with a single element:
Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
Assuming the input stream does not contain the separator:
Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
Unimplemented
splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffix
but keeps the suffix attached to the resulting
splits.
>>>
splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)
>>>
splitWithSuffix' (== '.') ""
[]
>>>
splitWithSuffix' (== '.') "."
["."]
>>>
splitWithSuffix' (== '.') "a"
["a"]
>>>
splitWithSuffix' (== '.') ".a"
[".","a"]
>>>
splitWithSuffix' (== '.') "a."
["a."]
>>>
splitWithSuffix' (== '.') "a.b"
["a.","b"]
>>>
splitWithSuffix' (== '.') "a.b."
["a.","b."]
>>>
splitWithSuffix' (== '.') "a..b.."
["a.",".","b.","."]
Since: 0.7.0
splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSeq
but splits the separator as well, as an infix token.
>>>
splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitOn'_ "" "hello"
["h","","e","","l","","l","","o"]
>>>
splitOn'_ "hello" ""
[""]
>>>
splitOn'_ "hello" "hello"
["","hello",""]
>>>
splitOn'_ "x" "hello"
["hello"]
>>>
splitOn'_ "h" "hello"
["","h","ello"]
>>>
splitOn'_ "o" "hello"
["hell","o",""]
>>>
splitOn'_ "e" "hello"
["h","e","llo"]
>>>
splitOn'_ "l" "hello"
["he","l","","l","o"]
>>>
splitOn'_ "ll" "hello"
["he","ll","o"]
Pre-release
splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOn
but the separator is a sequence of elements instead of a
single element.
For illustration, let's define a function that operates on pure lists:
>>>
splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitOnSeq' "" "hello"
["h","e","l","l","o"]
>>>
splitOnSeq' "hello" ""
[""]
>>>
splitOnSeq' "hello" "hello"
["",""]
>>>
splitOnSeq' "x" "hello"
["hello"]
>>>
splitOnSeq' "h" "hello"
["","ello"]
>>>
splitOnSeq' "o" "hello"
["hell",""]
>>>
splitOnSeq' "e" "hello"
["h","llo"]
>>>
splitOnSeq' "l" "hello"
["he","","o"]
>>>
splitOnSeq' "ll" "hello"
["he","o"]
splitOnSeq
is an inverse of intercalate
. The following law always holds:
intercalate . splitOnSeq == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitOnSeq . intercalate == id
Pre-release
splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitSuffixBy
but the separator is a sequence of elements, instead
of a predicate for a single element.
>>>
splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitOnSuffixSeq_ "." ""
[]
>>>
splitOnSuffixSeq_ "." "."
[""]
>>>
splitOnSuffixSeq_ "." "a"
["a"]
>>>
splitOnSuffixSeq_ "." ".a"
["","a"]
>>>
splitOnSuffixSeq_ "." "a."
["a"]
>>>
splitOnSuffixSeq_ "." "a.b"
["a","b"]
>>>
splitOnSuffixSeq_ "." "a.b."
["a","b"]
>>>
splitOnSuffixSeq_ "." "a..b.."
["a","","b",""]
lines = splitOnSuffixSeq "\n"
splitOnSuffixSeq
is an inverse of intercalateSuffix
. The following law
always holds:
intercalateSuffix . splitOnSuffixSeq == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitSuffixOn . intercalateSuffix == id
Pre-release
splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffixSeq
but keeps the suffix intact in the splits.
>>>
splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
>>>
splitWithSuffixSeq' "." ""
[]
>>>
splitWithSuffixSeq' "." "."
["."]
>>>
splitWithSuffixSeq' "." "a"
["a"]
>>>
splitWithSuffixSeq' "." ".a"
[".","a"]
>>>
splitWithSuffixSeq' "." "a."
["a."]
>>>
splitWithSuffixSeq' "." "a.b"
["a.","b"]
>>>
splitWithSuffixSeq' "." "a.b."
["a.","b."]
>>>
splitWithSuffixSeq' "." "a..b.."
["a.",".","b.","."]
Pre-release
Keyed Window Classification
Split the stream into chunks or windows by position or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The window termination can be dynamically controlled by the fold.
The term "chunk" is used for a window defined by position of elements and the term "session" is used for a time window.
Tumbling Windows
A new window starts after the previous window is finished.
:: (IsStream t, MonadAsync m, Ord k) | |
=> Double | timer tick in seconds |
-> Bool | reset the timer when an event is received |
-> (Int -> m Bool) | predicate to eject sessions based on session count |
-> Double | session timeout in seconds |
-> Fold m a b | Fold to be applied to session data |
-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> t m (k, b) | session key, fold result |
classifySessionsBy tick keepalive predicate timeout fold stream
classifies an input event stream
consisting of (timestamp, (key,
value))
into sessions based on the key
, folding all the values
corresponding to the same key into a session using the supplied fold
.
When the fold terminates or a timeout
occurs, a tuple consisting of the
session key and the folded value is emitted in the output stream. The
timeout is measured from the first event in the session. If the keepalive
option is set to True
the timeout is reset to 0 whenever an event is
received.
The timestamp
in the input stream is an absolute time from some epoch,
characterizing the time when the input event was generated. The notion of
current time is maintained by a monotonic event time clock using the
timestamps seen in the input stream. The latest timestamp seen till now is
used as the base for the current time. When no new events are seen, a timer
is started with a clock resolution of tick
seconds. This timer is used to
detect session timeouts in the absence of new events.
To ensure an upper bound on the memory used the number of sessions can be
limited to an upper bound. If the ejection predicate
returns True
, the
oldest session is ejected before inserting a new session.
>>>
:{
Stream.mapM_ print $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList) $ Stream.timestamped $ Stream.delay 0.1 $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c'] :} (1,"abc") (2,"abc") (3,"abc")
Pre-release
:: (IsStream t, MonadAsync m, Ord k) | |
=> (Int -> m Bool) | predicate to eject sessions on session count |
-> Double | time window size |
-> Fold m a b | Fold to be applied to session data |
-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> t m (k, b) |
Same as classifySessionsBy
with a timer tick of 1 second and keepalive
option set to False
.
classifySessionsOf = classifySessionsBy 1 False
Pre-release
Keep Alive Windows
The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.
classifyKeepAliveSessions Source #
:: (IsStream t, MonadAsync m, Ord k) | |
=> (Int -> m Bool) | predicate to eject sessions on session count |
-> Double | session inactive timeout |
-> Fold m a b | Fold to be applied to session payload data |
-> t m (AbsTime, (k, a)) | timestamp, (session key, session data) |
-> t m (k, b) |
Same as classifySessionsBy
with a timer tick of 1 second and keepalive
option set to True
.
classifyKeepAliveSessions = classifySessionsBy 1 True
Pre-release
Reduce By Parsers
Generic Parsing
Apply parsers on a stream.
parseMany :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b Source #
Apply a Parser
repeatedly on a stream and emit the parsed values in the
output stream.
This is the streaming equivalent of the many
parse combinator.
>>>
Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]
[3,7,11,15,19]
> Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\nworld" ["hello\n","world"]
foldMany f = parseMany (fromFold f)
Known Issues: When the parser fails there is no way to get the remaining stream.
Pre-release
parseManyD :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b Source #
parseManyTill :: Parser m a b -> Parser m a x -> t m a -> t m b Source #
parseManyTill collect test stream
tries the parser test
on the input,
if test
fails it backtracks and tries collect
, after collect
succeeds
test
is tried again and so on. The parser stops when test
succeeds. The
output of test
is discarded and the output of collect
is emitted in the
output stream. The parser fails if collect
fails.
Unimplemented
parseSequence :: t m (Parser m a b) -> t m a -> t m b Source #
Apply a stream of parsers to an input stream and emit the results in the output stream.
Pre-release
parseIterate :: (IsStream t, MonadThrow m) => (b -> Parser m a b) -> b -> t m a -> t m b Source #
Iterate a parser generating function on a stream. The initial value b
is
used to generate the first parser, the parser is applied on the stream and
the result is used to generate the next parser and so on.
>>>
import Data.Monoid (Sum(..))
>>>
Stream.toList $ Stream.map getSum $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) 0 $ Stream.map Sum $ Stream.fromList [1..10]
[3,10,21,36,55,55]
This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser.
Pre-release
Grouping
wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOn
after stripping leading, trailing, and repeated separators.
Therefore, ".a..b."
with .
as the separator would be parsed as
["a","b"]
. In other words, its like parsing words from whitespace
separated text.
>>>
wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
>>>
wordsBy' (== ',') ""
[]
>>>
wordsBy' (== ',') ","
[]
>>>
wordsBy' (== ',') ",a,,b,"
["a","b"]
words = wordsBy isSpace
Since: 0.7.0
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #
groups = groupsBy (==) groups = groupsByRolling (==)
Groups contiguous spans of equal elements together in individual groups.
>>>
Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
[[1,1],[2,2]]
Since: 0.7.0
groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
groupsBy cmp f $ S.fromList [a,b,c,...]
assigns the element a
to the
first group, if b `cmp` a
is True
then b
is also assigned to the same
group. If c `cmp` a
is True
then c
is also assigned to the same
group and so on. When the comparison fails a new group is started. Each
group is folded using the fold f
and the result of the fold is emitted in
the output stream.
>>>
Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
[[1,3,7],[0,2,5]]
Since: 0.7.0
groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Unlike groupsBy
this function performs a rolling comparison of two
successive elements in the input stream. groupsByRolling cmp f $ S.fromList
[a,b,c,...]
assigns the element a
to the first group, if a `cmp` b
is
True
then b
is also assigned to the same group. If b `cmp` c
is
True
then c
is also assigned to the same group and so on. When the
comparison fails a new group is started. Each group is folded using the fold
f
.
>>>
Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
[[1,2,3],[7,8,9]]
Since: 0.7.0
Nested splitting
splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
splitInnerBy splitter joiner stream
splits the inner containers f a
of
an input stream t m (f a)
using the splitter
function. Container
elements f a
are collected until a split occurs, then all the elements
before the split are joined using the joiner
function.
For example, if we have a stream of Array Word8
, we may want to split the
stream into arrays representing lines separated by 'n' byte such that the
resulting stream after a split would be one array for each line.
CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.
Pre-release
splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
Like splitInnerBy
but splits assuming the separator joins the segment in
a suffix style.
Pre-release