-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Reduce
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Reduce streams by streams, folds or parsers.

module Streamly.Internal.Data.Stream.IsStream.Reduce
    (
    -- * Reduce By Streams
      dropPrefix
    , dropInfix
    , dropSuffix

    -- * Reduce By Folds
    -- |
    -- Reduce a stream by folding or parsing chunks of the stream.  Functions
    -- generally ending in these shapes:
    --
    -- @
    -- f (Fold m a b) -> t m a -> t m b
    -- f (Parser m a b) -> t m a -> t m b
    -- @

    -- ** Generic Folding
    -- | Apply folds on a stream.
    , foldMany
    , foldManyPost
    , foldSequence
    , foldIterateM

    -- ** Chunking
    -- | Element unaware grouping.
    , chunksOf
    , arraysOf
    , intervalsOf

    -- ** Splitting
    -- | Streams can be sliced into segments in space or in time. We use the
    -- term @chunk@ to refer to a spatial length of the stream (spatial window)
    -- and the term @session@ to refer to a length in time (time window).

    -- -- *** Using Element Separators
    , splitOn
    , splitOnSuffix
    , splitOnPrefix

    -- , splitBy
    , splitWithSuffix
    -- , splitByPrefix

    -- -- *** Splitting By Sequences
    , splitBySeq
    , splitOnSeq
    , splitOnSuffixSeq
    -- , splitOnPrefixSeq

    -- Keeping the delimiters
    , splitWithSuffixSeq
    -- , splitByPrefixSeq
    -- , wordsBySeq

    -- Splitting using multiple sequence separators
    -- , splitOnAnySeq
    -- , splitOnAnySuffixSeq
    -- , splitOnAnyPrefixSeq

    -- -- *** Splitting By Streams
    -- -- | Splitting a stream using another stream as separator.

    -- ** Keyed Window Classification

    -- | Split the stream into chunks or windows by position or time. Each
    -- window can be associated with a key, all events associated with a
    -- particular key in the window can be folded to a single result.  The
    -- window termination can be dynamically controlled by the fold.
    --
    -- The term "chunk" is used for a window defined by position of elements
    -- and the term "session" is used for a time window.

    -- *** Tumbling Windows
    -- | A new window starts after the previous window is finished.

    -- , classifyChunksOf
    , classifySessionsBy
    , classifySessionsOf

    -- *** Keep Alive Windows
    -- | The window size is extended if an event arrives within the specified
    -- window size. This can represent sessions with idle or inactive timeout.

    -- , classifyKeepAliveChunks
    , classifyKeepAliveSessions

    {-
    -- *** Sliding Windows
    -- | A new window starts after the specified slide from the previous
    -- window. Therefore windows can overlap.
    , classifySlidingChunks
    , classifySlidingSessions
    -- *** Sliding Window Buffers
    -- , slidingChunkBuffer
    -- , slidingSessionBuffer
    -}

    -- * Reduce By Parsers
    -- ** Generic Parsing
    -- | Apply parsers on a stream.
    , parseMany
    , parseManyD
    , parseManyTill
    , parseSequence
    , parseIterate

    -- ** Grouping
    -- In imperative terms, grouped folding can be considered as a nested loop
    -- where we loop over the stream to group elements and then loop over
    -- individual groups to fold them to a single value that is yielded in the
    -- output stream.

    , wordsBy -- stripAndCompactBy
    , groups
    , groupsBy
    , groupsByRolling

    -- -- *** Searching Sequences
    -- , seqIndices -- search a sequence in the stream

    -- -- *** Searching Multiple Sequences
    -- , seqIndicesAny -- search any of the given sequence in the stream

    -- -- -- ** Searching Streams
    -- -- | Finding a stream within another stream.

    -- * Nested splitting
    , splitInnerBy
    , splitInnerBySuffix

     -- * Fold2
    , chunksOf2
    )
where

#include "inline.hs"

import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Maybe (isNothing)
import Foreign.Storable (Storable)
import Streamly.Internal.Data.Fold.Type (Fold (..), Fold2 (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.Array.Foreign.Type (Array)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common
    ( concatMap
    , fold
    , interjectSuffix
    , intersperseM
    , repeatM
    , scanlMAfter'
    , splitOnSeq
    , fromPure)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.Time.Units
       ( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
       , toAbsTime)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))

import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser.ParserK.Type as PRK
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K

import Prelude hiding (concatMap)

-- $setup
-- >>> :m
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Streamly.Internal.Data.Stream.IsStream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Data.Array.Foreign as Array

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- | Drop prefix from the input stream if present.
--
-- Space: @O(1)@
--
-- /Unimplemented/ - Help wanted.
{-# INLINE dropPrefix #-}
dropPrefix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropPrefix :: t m a -> t m a -> t m a
dropPrefix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

-- | Drop all matching infix from the input stream if present. Infix stream
-- may be consumed multiple times.
--
-- Space: @O(n)@ where n is the length of the infix.
--
-- /Unimplemented/ - Help wanted.
{-# INLINE dropInfix #-}
dropInfix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropInfix :: t m a -> t m a -> t m a
dropInfix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

-- | Drop suffix from the input stream if present. Suffix stream may be
-- consumed multiple times.
--
-- Space: @O(n)@ where n is the length of the suffix.
--
-- /Unimplemented/ - Help wanted.
{-# INLINE dropSuffix #-}
dropSuffix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropSuffix :: t m a -> t m a -> t m a
dropSuffix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

------------------------------------------------------------------------------
-- Folding
------------------------------------------------------------------------------

-- Splitting operations that take a predicate and a Fold can be
-- expressed using parseMany. Operations like chunksOf, intervalsOf, split*,
-- can be expressed using parseMany when used with an appropriate Parser.
--
-- XXX We need takeGE/takeBetween to implement "some" using "many".

-- | Like 'foldMany' but appends empty fold output if the fold and stream
-- termination aligns:
--
-- >>> f = Fold.take 2 Fold.sum
-- >>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList []
-- [0]
-- >>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9]
-- [3,7,11,15,9]
-- >>> Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10]
-- [3,7,11,15,19,0]
--
-- /Pre-release/
--
{-# INLINE foldManyPost #-}
foldManyPost
    :: (IsStream t, Monad m)
    => Fold m a b
    -> t m a
    -> t m b
foldManyPost :: Fold m a b -> t m a -> t m b
foldManyPost Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldManyPost Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | Apply a 'Fold' repeatedly on a stream and emit the fold outputs in the
-- output stream.
--
-- To sum every two contiguous elements in a stream:
--
-- >>> f = Fold.take 2 Fold.sum
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
-- [3,7,11,15,19]
--
-- On an empty stream the output is empty:
--
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
-- []
--
-- Note @Stream.foldMany (Fold.take 0)@ would result in an infinite loop in a
-- non-empty stream.
--
-- @since 0.8.0
--
{-# INLINE foldMany #-}
foldMany
    :: (IsStream t, Monad m)
    => Fold m a b
    -> t m a
    -> t m b
foldMany :: Fold m a b -> t m a -> t m b
foldMany Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldMany Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | Apply a stream of folds to an input stream and emit the results in the
-- output stream.
--
-- /Pre-release/
--
{-# INLINE foldSequence #-}
foldSequence
       :: -- (IsStream t, Monad m) =>
       t m (Fold m a b)
    -> t m a
    -> t m b
foldSequence :: t m (Fold m a b) -> t m a -> t m b
foldSequence t m (Fold m a b)
_f t m a
_m = t m b
forall a. HasCallStack => a
undefined

-- | Iterate a fold generator on a stream. The initial value @b@ is used to
-- generate the first fold, the fold is applied on the stream and the result of
-- the fold is used to generate the next fold and so on.
--
-- @
-- >>> import Data.Monoid (Sum(..))
-- >>> f x = return (Fold.take 2 (Fold.sconcat x))
-- >>> s = Stream.map Sum $ Stream.fromList [1..10]
-- >>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f 0 s
-- [3,10,21,36,55,55]
--
-- @
--
-- This is the streaming equivalent of monad like sequenced application of
-- folds where next fold is dependent on the previous fold.
--
-- /Pre-release/
--
{-# INLINE foldIterateM #-}
foldIterateM ::
       (IsStream t, Monad m) => (b -> m (Fold m a b)) -> b -> t m a -> t m b
foldIterateM :: (b -> m (Fold m a b)) -> b -> t m a -> t m b
foldIterateM b -> m (Fold m a b)
f b
i t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> m (Fold m a b)) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> m (Fold m a b)) -> b -> Stream m a -> Stream m b
D.foldIterateM b -> m (Fold m a b)
f b
i (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- | Apply a 'Parser' repeatedly on a stream and emit the parsed values in the
-- output stream.
--
-- This is the streaming equivalent of the 'Streamly.Internal.Data.Parser.many'
-- parse combinator.
--
-- >>> Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]
-- [3,7,11,15,19]
--
-- @
-- > Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\\nworld"
-- ["hello\\n","world"]
--
-- @
--
-- @
-- foldMany f = parseMany (fromFold f)
-- @
--
-- Known Issues: When the parser fails there is no way to get the remaining
-- stream.
--
-- /Pre-release/
--
{-# INLINE parseMany #-}
parseMany
    :: (IsStream t, MonadThrow m)
    => Parser m a b
    -> t m a
    -> t m b
parseMany :: Parser m a b -> t m a -> t m b
parseMany Parser m a b
p t m a
m =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Parser m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.parseMany (Parser m a b -> Parser m a b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Parser m a b
PRK.fromParserK Parser m a b
p) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

{-# INLINE parseManyD #-}
parseManyD
    :: (IsStream t, MonadThrow m)
    => PRD.Parser m a b
    -> t m a
    -> t m b
parseManyD :: Parser m a b -> t m a -> t m b
parseManyD Parser m a b
p t m a
m =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Parser m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.parseMany Parser m a b
p (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | Apply a stream of parsers to an input stream and emit the results in the
-- output stream.
--
-- /Pre-release/
--
{-# INLINE parseSequence #-}
parseSequence
       :: -- (IsStream t, Monad m) =>
       t m (Parser m a b)
    -> t m a
    -> t m b
parseSequence :: t m (Parser m a b) -> t m a -> t m b
parseSequence t m (Parser m a b)
_f t m a
_m = t m b
forall a. HasCallStack => a
undefined

-- XXX Change the parser arguments' order
--
-- | @parseManyTill collect test stream@ tries the parser @test@ on the input,
-- if @test@ fails it backtracks and tries @collect@, after @collect@ succeeds
-- @test@ is tried again and so on. The parser stops when @test@ succeeds.  The
-- output of @test@ is discarded and the output of @collect@ is emitted in the
-- output stream. The parser fails if @collect@ fails.
--
-- /Unimplemented/
--
{-# INLINE parseManyTill #-}
parseManyTill ::
    -- (IsStream t, MonadThrow m) =>
       Parser m a b
    -> Parser m a x
    -> t m a
    -> t m b
parseManyTill :: Parser m a b -> Parser m a x -> t m a -> t m b
parseManyTill = Parser m a b -> Parser m a x -> t m a -> t m b
forall a. HasCallStack => a
undefined

-- | Iterate a parser generating function on a stream. The initial value @b@ is
-- used to generate the first parser, the parser is applied on the stream and
-- the result is used to generate the next parser and so on.
--
-- >>> import Data.Monoid (Sum(..))
-- >>> Stream.toList $ Stream.map getSum $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) 0 $ Stream.map Sum $ Stream.fromList [1..10]
-- [3,10,21,36,55,55]
--
-- This is the streaming equivalent of monad like sequenced application of
-- parsers where next parser is dependent on the previous parser.
--
-- /Pre-release/
--
{-# INLINE parseIterate #-}
parseIterate
    :: (IsStream t, MonadThrow m)
    => (b -> Parser m a b)
    -> b
    -> t m a
    -> t m b
parseIterate :: (b -> Parser m a b) -> b -> t m a -> t m b
parseIterate b -> Parser m a b
f b
i t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$
    (b -> Parser m a b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
MonadThrow m =>
(b -> Parser m a b) -> b -> Stream m a -> Stream m b
D.parseIterate (Parser m a b -> Parser m a b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Parser m a b
PRK.fromParserK (Parser m a b -> Parser m a b)
-> (b -> Parser m a b) -> b -> Parser m a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Parser m a b
f) b
i (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

------------------------------------------------------------------------------
-- Generalized grouping
------------------------------------------------------------------------------

-- This combinator is the most general grouping combinator and can be used to
-- implement all other grouping combinators.
--
-- XXX check if this can implement the splitOn combinator i.e. we can slide in
-- new elements, slide out old elements and incrementally compute the hash.
-- Also, can we implement the windowed classification combinators using this?
--
-- In fact this is a parse. Instead of using a special return value in the fold
-- we are using a mapping function.
--
-- Note that 'scanl'' (usually followed by a map to extract the desired value
-- from the accumulator) can be used to realize many implementations e.g. a
-- sliding window implementation. A scan followed by a mapMaybe is also a good
-- pattern to express many problems where we want to emit a filtered output and
-- not emit an output on every input.
--
-- Passing on of the initial accumulator value to the next fold is equivalent
-- to returning the leftover concept.

{-
-- | @groupScan splitter fold stream@ folds the input stream using @fold@.
-- @splitter@ is applied on the accumulator of the fold every time an item is
-- consumed by the fold. The fold continues until @splitter@ returns a 'Just'
-- value.  A 'Just' result from the @splitter@ specifies a result to be emitted
-- in the output stream and the initial value of the accumulator for the next
-- group's fold. This allows us to control whether to start fresh for the next
-- fold or to continue from the previous fold's output.
--
{-# INLINE groupScan #-}
groupScan
    :: (IsStream t, Monad m)
    => (x -> m (Maybe (b, x))) -> Fold m a x -> t m a -> t m b
groupScan split fold m = undefined
-}

------------------------------------------------------------------------------
-- Grouping
------------------------------------------------------------------------------

-- | @groupsBy cmp f $ S.fromList [a,b,c,...]@ assigns the element @a@ to the
-- first group, if @b \`cmp` a@ is 'True' then @b@ is also assigned to the same
-- group.  If @c \`cmp` a@ is 'True' then @c@ is also assigned to the same
-- group and so on. When the comparison fails a new group is started. Each
-- group is folded using the fold @f@ and the result of the fold is emitted in
-- the output stream.
--
-- >>> Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
-- [[1,3,7],[0,2,5]]
--
-- @since 0.7.0
{-# INLINE groupsBy #-}
groupsBy
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsBy :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
cmp Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | Unlike @groupsBy@ this function performs a rolling comparison of two
-- successive elements in the input stream. @groupsByRolling cmp f $ S.fromList
-- [a,b,c,...]@ assigns the element @a@ to the first group, if @a \`cmp` b@ is
-- 'True' then @b@ is also assigned to the same group.  If @b \`cmp` c@ is
-- 'True' then @c@ is also assigned to the same group and so on. When the
-- comparison fails a new group is started. Each group is folded using the fold
-- @f@.
--
-- >>> Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
-- [[1,2,3],[7,8,9]]
--
-- @since 0.7.0
{-# INLINE groupsByRolling #-}
groupsByRolling
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsByRolling :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsByRolling a -> a -> Bool
cmp Fold m a b
f t m a
m =  Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsRollingBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- |
-- > groups = groupsBy (==)
-- > groups = groupsByRolling (==)
--
-- Groups contiguous spans of equal elements together in individual groups.
--
-- >>> Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
-- [[1,1],[2,2]]
--
-- @since 0.7.0
{-# INLINE groups #-}
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
groups :: Fold m a b -> t m a -> t m b
groups = (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
forall a. Eq a => a -> a -> Bool
(==)

------------------------------------------------------------------------------
-- Splitting - by a predicate
------------------------------------------------------------------------------

-- In general we can use deintercalate for splitting.  Then we can also use
-- uniqBy to condense the separators.  One way to generalize splitting is to
-- output:
--
-- data Segment a b = Empty | Segment b | Separator a
--
-- XXX splitOn and splitOnSuffix have a different behavior on an empty stream,
-- is that desirable?

-- | Split on an infixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.  Splits the stream on
-- separator elements determined by the supplied predicate, separator is
-- considered as infixed between two segments:
--
-- >>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
-- >>> splitOn' (== '.') "a.b"
-- ["a","b"]
--
-- An empty stream is folded to the default value of the fold:
--
-- >>> splitOn' (== '.') ""
-- [""]
--
-- If one or both sides of the separator are missing then the empty segment on
-- that side is folded to the default output of the fold:
--
-- >>> splitOn' (== '.') "."
-- ["",""]
--
-- >>> splitOn' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOn' (== '.') "a."
-- ["a",""]
--
-- >>> splitOn' (== '.') "a..b"
-- ["a","","b"]
--
-- splitOn is an inverse of intercalating single element:
--
-- > Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
--
-- @since 0.7.0

{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn a -> Bool
predicate Fold m a b
f =
    -- We can express the infix splitting in terms of optional suffix split
    -- fold.  After applying a suffix split fold repeatedly if the last segment
    -- ends with a suffix then we need to return the default output of the fold
    -- after that to make it an infix split.
    --
    -- Alternately, we can also express it using an optional prefix split fold.
    -- If the first segment starts with a prefix then we need to emit the
    -- default output of the fold before that to make it an infix split, and
    -- then apply prefix split fold repeatedly.
    --
    -- Since a suffix split fold can be easily expressed using a
    -- non-backtracking fold, we use that.
    Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldManyPost ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)

-- | Split on a suffixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.
--
-- >>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
-- >>> splitOnSuffix' (== '.') "a.b."
-- ["a","b"]
--
-- >>> splitOnSuffix' (== '.') "a."
-- ["a"]
--
-- An empty stream results in an empty output stream:
--
-- >>> splitOnSuffix' (== '.') ""
-- []
--
-- An empty segment consisting of only a suffix is folded to the default output
-- of the fold:
--
-- >>> splitOnSuffix' (== '.') "."
-- [""]
--
-- >>> splitOnSuffix' (== '.') "a..b.."
-- ["a","","b",""]
--
-- A suffix is optional at the end of the stream:
--
-- >>> splitOnSuffix' (== '.') "a"
-- ["a"]
--
-- >>> splitOnSuffix' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOnSuffix' (== '.') "a.b"
-- ["a","b"]
--
-- > lines = splitOnSuffix (== '\n')
--
-- 'splitOnSuffix' is an inverse of 'intercalateSuffix' with a single element:
--
-- > Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id
--
-- @since 0.7.0

{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix a -> Bool
predicate Fold m a b
f = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)

-- | Split on a prefixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.
--
-- @
-- > splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
-- > splitOnPrefix' (== '.') ".a.b"
-- ["a","b"]
-- @
--
-- An empty stream results in an empty output stream:
-- @
-- > splitOnPrefix' (== '.') ""
-- []
-- @
--
-- An empty segment consisting of only a prefix is folded to the default output
-- of the fold:
--
-- @
-- > splitOnPrefix' (== '.') "."
-- [""]
--
-- > splitOnPrefix' (== '.') ".a.b."
-- ["a","b",""]
--
-- > splitOnPrefix' (== '.') ".a..b"
-- ["a","","b"]
--
-- @
--
-- A prefix is optional at the beginning of the stream:
--
-- @
-- > splitOnPrefix' (== '.') "a"
-- ["a"]
--
-- > splitOnPrefix' (== '.') "a.b"
-- ["a","b"]
-- @
--
-- 'splitOnPrefix' is an inverse of 'intercalatePrefix' with a single element:
--
-- > Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
--
-- /Unimplemented/

{-# INLINE splitOnPrefix #-}
splitOnPrefix :: -- (IsStream t, MonadCatch m) =>
    (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix a -> Bool
_predicate Fold m a b
_f = t m a -> t m b
forall a. HasCallStack => a
undefined
    -- parseMany (Parser.sliceBeginBy predicate f)

-- | Like 'splitOn' after stripping leading, trailing, and repeated separators.
-- Therefore, @".a..b."@ with '.' as the separator would be parsed as
-- @["a","b"]@.  In other words, its like parsing words from whitespace
-- separated text.
--
-- >>> wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
--
-- >>> wordsBy' (== ',') ""
-- []
--
-- >>> wordsBy' (== ',') ","
-- []
--
-- >>> wordsBy' (== ',') ",a,,b,"
-- ["a","b"]
--
-- > words = wordsBy isSpace
--
-- @since 0.7.0

-- It is equivalent to splitting in any of the infix/prefix/suffix styles
-- followed by removal of empty segments.
{-# INLINE wordsBy #-}
wordsBy
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy :: (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy a -> Bool
predicate Fold m a b
f t m a
m =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.wordsBy a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | Like 'splitOnSuffix' but keeps the suffix attached to the resulting
-- splits.
--
-- >>> splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)
--
-- >>> splitWithSuffix' (== '.') ""
-- []
--
-- >>> splitWithSuffix' (== '.') "."
-- ["."]
--
-- >>> splitWithSuffix' (== '.') "a"
-- ["a"]
--
-- >>> splitWithSuffix' (== '.') ".a"
-- [".","a"]
--
-- >>> splitWithSuffix' (== '.') "a."
-- ["a."]
--
-- >>> splitWithSuffix' (== '.') "a.b"
-- ["a.","b"]
--
-- >>> splitWithSuffix' (== '.') "a.b."
-- ["a.","b."]
--
-- >>> splitWithSuffix' (== '.') "a..b.."
-- ["a.",".","b.","."]
--
-- @since 0.7.0

{-# INLINE splitWithSuffix #-}
splitWithSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix a -> Bool
predicate Fold m a b
f = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy a -> Bool
predicate Fold m a b
f)

------------------------------------------------------------------------------
-- Splitting - on a delimiter sequence
------------------------------------------------------------------------------

-- Int list examples for splitOn:
--
-- >>> splitList [] [1,2,3,3,4]
-- > [[1],[2],[3],[3],[4]]
--
-- >>> splitList [5] [1,2,3,3,4]
-- > [[1,2,3,3,4]]
--
-- >>> splitList [1] [1,2,3,3,4]
-- > [[],[2,3,3,4]]
--
-- >>> splitList [4] [1,2,3,3,4]
-- > [[1,2,3,3],[]]
--
-- >>> splitList [2] [1,2,3,3,4]
-- > [[1],[3,3,4]]
--
-- >>> splitList [3] [1,2,3,3,4]
-- > [[1,2],[],[4]]
--
-- >>> splitList [3,3] [1,2,3,3,4]
-- > [[1,2],[4]]
--
-- >>> splitList [1,2,3,3,4] [1,2,3,3,4]
-- > [[],[]]

{-
-- This can be implemented easily using Rabin Karp
-- | Split on any one of the given patterns.
{-# INLINE splitOnAny #-}
splitOnAny
    :: (IsStream t, Monad m, Storable a, Integral a)
    => [Array a] -> Fold m a b -> t m a -> t m b
splitOnAny subseq f m = undefined -- D.fromStreamD $ D.splitOnAny f subseq (D.toStreamD m)
-}

-- XXX use a non-monadic intersperse to remove the MonadAsync constraint.
-- XXX Use two folds, one ring buffer fold for separator sequence and the other
-- split consumer fold. The input is fed to the ring fold first and the
-- rejected input is fed to the split fold. If the separator matches, the ring
-- fold would consume all.
--
-- | Like 'splitOnSeq' but splits the separator as well, as an infix token.
--
-- >>> splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitOn'_ "" "hello"
-- ["h","","e","","l","","l","","o"]
--
-- >>> splitOn'_ "hello" ""
-- [""]
--
-- >>> splitOn'_ "hello" "hello"
-- ["","hello",""]
--
-- >>> splitOn'_ "x" "hello"
-- ["hello"]
--
-- >>> splitOn'_ "h" "hello"
-- ["","h","ello"]
--
-- >>> splitOn'_ "o" "hello"
-- ["hell","o",""]
--
-- >>> splitOn'_ "e" "hello"
-- ["h","e","llo"]
--
-- >>> splitOn'_ "l" "hello"
-- ["he","l","","l","o"]
--
-- >>> splitOn'_ "ll" "hello"
-- ["he","ll","o"]
--
-- /Pre-release/
{-# INLINE splitBySeq #-}
splitBySeq
    :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitBySeq :: Array a -> Fold m a b -> t m a -> t m b
splitBySeq Array a
patt Fold m a b
f t m a
m =
    m b -> t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM (Fold m a b -> SerialT m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
f (Array a -> SerialT m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, Storable a) =>
Array a -> t m a
A.toStream Array a
patt)) (t m b -> t m b) -> t m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m

-- | Like 'splitSuffixBy' but the separator is a sequence of elements, instead
-- of a predicate for a single element.
--
-- >>> splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitOnSuffixSeq_ "." ""
-- []
--
-- >>> splitOnSuffixSeq_ "." "."
-- [""]
--
-- >>> splitOnSuffixSeq_ "." "a"
-- ["a"]
--
-- >>> splitOnSuffixSeq_ "." ".a"
-- ["","a"]
--
-- >>> splitOnSuffixSeq_ "." "a."
-- ["a"]
--
-- >>> splitOnSuffixSeq_ "." "a.b"
-- ["a","b"]
--
-- >>> splitOnSuffixSeq_ "." "a.b."
-- ["a","b"]
--
-- >>> splitOnSuffixSeq_ "." "a..b.."
-- ["a","","b",""]
--
-- > lines = splitOnSuffixSeq "\n"
--
-- 'splitOnSuffixSeq' is an inverse of 'intercalateSuffix'. The following law
-- always holds:
--
-- > intercalateSuffix . splitOnSuffixSeq == id
--
-- The following law holds when the separator is non-empty and contains none of
-- the elements present in the input lists:
--
-- > splitSuffixOn . intercalateSuffix == id
--
-- /Pre-release/
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq Array a
patt Fold m a b
f t m a
m =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
False Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

{-
-- | Like 'splitOn' but drops any empty splits.
--
{-# INLINE wordsOn #-}
wordsOn
    :: (IsStream t, Monad m, Storable a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
wordsOn subseq f m = undefined -- D.fromStreamD $ D.wordsOn f subseq (D.toStreamD m)
-}

-- | Like 'splitOnSuffixSeq' but keeps the suffix intact in the splits.
--
-- >>> splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitWithSuffixSeq' "." ""
-- []
--
-- >>> splitWithSuffixSeq' "." "."
-- ["."]
--
-- >>> splitWithSuffixSeq' "." "a"
-- ["a"]
--
-- >>> splitWithSuffixSeq' "." ".a"
-- [".","a"]
--
-- >>> splitWithSuffixSeq' "." "a."
-- ["a."]
--
-- >>> splitWithSuffixSeq' "." "a.b"
-- ["a.","b"]
--
-- >>> splitWithSuffixSeq' "." "a.b."
-- ["a.","b."]
--
-- >>> splitWithSuffixSeq' "." "a..b.."
-- ["a.",".","b.","."]
--
-- /Pre-release/
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq Array a
patt Fold m a b
f t m a
m =
    Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
True Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

{-
-- This can be implemented easily using Rabin Karp
-- | Split post any one of the given patterns.
{-# INLINE splitOnSuffixSeqAny #-}
splitOnSuffixSeqAny
    :: (IsStream t, Monad m, Storable a, Integral a)
    => [Array a] -> Fold m a b -> t m a -> t m b
splitOnSuffixSeqAny subseq f m = undefined
    -- D.fromStreamD $ D.splitPostAny f subseq (D.toStreamD m)
-}

------------------------------------------------------------------------------
-- Chunking
------------------------------------------------------------------------------

-- | Group the input stream into groups of @n@ elements each and then fold each
-- group using the provided fold function.
--
-- >>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
-- [3,7,11,15,19]
--
-- This can be considered as an n-fold version of 'take' where we apply
-- 'take' repeatedly on the leftover stream until the stream exhausts.
--
-- @chunksOf n f = foldMany (FL.take n f)@
--
-- @since 0.7.0
{-# INLINE chunksOf #-}
chunksOf
    :: (IsStream t, Monad m)
    => Int -> Fold m a b -> t m a -> t m b
chunksOf :: Int -> Fold m a b -> t m a -> t m b
chunksOf Int
n Fold m a b
f = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Stream m a -> Stream m b
D.chunksOf Int
n Fold m a b
f (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD

-- |
--
-- /Pre-release/
{-# INLINE chunksOf2 #-}
chunksOf2
    :: (IsStream t, Monad m)
    => Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 :: Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 Int
n m c
action Fold2 m c a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
forall (m :: * -> *) c a b.
Monad m =>
Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
D.groupsOf2 Int
n m c
action Fold2 m c a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)

-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but may be more efficient:
--
-- > arraysOf n = Stream.foldMany (A.writeN n)
--
-- /Pre-release/
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m a -> t m (Array a)
arraysOf :: Int -> t m a -> t m (Array a)
arraysOf Int
n = Stream m (Array a) -> t m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> t m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> t m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.arraysOf Int
n (Stream m a -> Stream m (Array a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD

-- XXX we can implement this by repeatedly applying the 'lrunFor' fold.
-- XXX add this example after fixing the serial stream rate control
--
-- | Group the input stream into windows of @n@ second each and then fold each
-- group using the provided fold function.
--
-- >>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
-- [...,...,...,...,...]
--
-- @since 0.7.0
{-# INLINE intervalsOf #-}
intervalsOf
    :: (IsStream t, MonadAsync m)
    => Double -> Fold m a b -> t m a -> t m b
intervalsOf :: Double -> Fold m a b -> t m a -> t m b
intervalsOf Double
n Fold m a b
f t m a
xs =
    (Maybe a -> Bool) -> Fold m (Maybe a) b -> t m (Maybe a) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing (Fold m a b -> Fold m (Maybe a) b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Fold m (Maybe a) b
FL.catMaybes Fold m a b
f)
        (Double -> m (Maybe a) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) ((a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map a -> Maybe a
forall a. a -> Maybe a
Just t m a
xs))

------------------------------------------------------------------------------
-- Windowed classification
------------------------------------------------------------------------------

-- We divide the stream into windows or chunks in space or time and each window
-- can be associated with a key, all events associated with a particular key in
-- the window can be folded to a single result. The stream can be split into
-- windows by size or by using a split predicate on the elements in the stream.
-- For example, when we receive a closing flag, we can close the window.
--
-- A "chunk" is a space window and a "session" is a time window. Are there any
-- other better short words to describe them. An alternative is to use
-- "swindow" and "twindow". Another word for "session" could be "spell".
--
-- TODO: To mark the position in space or time we can have Indexed or
-- TimeStamped types. That can make it easy to deal with the position indices
-- or timestamps.

------------------------------------------------------------------------------
-- Keyed Sliding Windows
------------------------------------------------------------------------------

{-
{-# INLINABLE classifySlidingChunks #-}
classifySlidingChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Int              -- ^ window slide
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifySlidingChunks wsize wslide (Fold step initial extract) str
    = undefined

-- XXX Another variant could be to slide the window on an event, e.g. in TCP we
-- slide the send window when an ack is received and we slide the receive
-- window when a sequence is complete. Sliding is stateful in case of TCP,
-- sliding releases the send buffer or makes data available to the user from
-- the receive buffer.
{-# INLINABLE classifySlidingSessions #-}
classifySlidingSessions
    :: (IsStream t, MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Double         -- ^ time window size
    -> Double         -- ^ window slide
    -> Fold m a b     -- ^ Fold to be applied to window events
    -> t m (k, a, Bool, AbsTime) -- ^ window key, data, close flag, timestamp
    -> t m (k, b)
classifySlidingSessions tick interval slide (Fold step initial extract) str
    = undefined
-}

------------------------------------------------------------------------------
-- Sliding Window Buffers
------------------------------------------------------------------------------

-- These buffered versions could be faster than concurrent incremental folds of
-- all overlapping windows as in many cases we may not need all the values to
-- compute the fold, we can just compute the result using the old value and new
-- value.  However, we may need the buffer once in a while, for example for
-- string search we usually compute the hash incrementally but when the hash
-- matches the hash of the pattern we need to compare the whole string.
--
-- XXX we should be able to implement sequence based splitting combinators
-- using this combinator.

{-
-- | Buffer n elements of the input in a ring buffer. When t new elements are
-- collected, slide the window to remove the same number of oldest elements,
-- insert the new elements, and apply an incremental fold on the sliding
-- window, supplying the outgoing elements, the new ring buffer as arguments.
slidingChunkBuffer
    :: (IsStream t, Monad m, Ord a, Storable a)
    => Int -- window size
    -> Int -- window slide
    -> Fold m (Ring a, Array a) b
    -> t m a
    -> t m b
slidingChunkBuffer = undefined

-- Buffer n seconds worth of stream elements of the input in a radix tree.
-- Every t seconds, remove the items that are older than n seconds, and apply
-- an incremental fold on the sliding window, supplying the outgoing elements,
-- and the new radix tree buffer as arguments.
slidingSessionBuffer
    :: (IsStream t, Monad m, Ord a, Storable a)
    => Int    -- window size
    -> Int    -- tick size
    -> Fold m (RTree a, Array a) b
    -> t m a
    -> t m b
slidingSessionBuffer = undefined
-}

------------------------------------------------------------------------------
-- Keyed Session Windows
------------------------------------------------------------------------------

{-
-- | Keyed variable size space windows. Close the window if we do not receive a
-- window event in the next "spaceout" elements.
{-# INLINABLE classifyChunksBy #-}
classifyChunksBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Bool  -- ^ reset the spaceout when a chunk window element is received
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyChunksBy spanout reset (Fold step initial extract) str = undefined

-- | Like 'classifyChunksOf' but the chunk size is reset if an element is
-- received within the chunk size window. The chunk gets closed only if no
-- element is received within the chunk window.
--
{-# INLINABLE classifyKeepAliveChunks #-}
classifyKeepAliveChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyKeepAliveChunks spanout = classifyChunksBy spanout True
-}

data SessionState t m k a b = SessionState
    { SessionState t m k a b -> AbsTime
sessionCurTime :: !AbsTime  -- ^ time since last event
    , SessionState t m k a b -> AbsTime
sessionEventTime :: !AbsTime -- ^ time as per last event
    , SessionState t m k a b -> Int
sessionCount :: !Int -- ^ total number sessions in progress
    , SessionState t m k a b -> Heap (Entry AbsTime k)
sessionTimerHeap :: H.Heap (H.Entry AbsTime k) -- ^ heap for timeouts
    , SessionState t m k a b -> Map k a
sessionKeyValueMap :: Map.Map k a -- ^ Stored sessions for keys
    , SessionState t m k a b -> t m (k, b)
sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
    }

-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of  @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
-- timeout is measured from the first event in the session.  If the @keepalive@
-- option is set to 'True' the timeout is reset to 0 whenever an event is
-- received.
--
-- The @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input event was generated.  The notion of
-- current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time.  When no new events are seen, a timer
-- is started with a clock resolution of @tick@ seconds. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- To ensure an upper bound on the memory used the number of sessions can be
-- limited to an upper bound. If the ejection @predicate@ returns 'True', the
-- oldest session is ejected before inserting a new session.
--
-- >>> :{
-- Stream.mapM_ print
--     $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
--     $ Stream.timestamped
--     $ Stream.delay 0.1
--     $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c']
-- :}
-- (1,"abc")
-- (2,"abc")
-- (3,"abc")
--
-- /Pre-release/
--
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Bool           -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double         -- ^ session timeout in seconds
    -> Fold m a b  -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b) -- ^ session key, fold result
classifySessionsBy :: Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
tick Bool
reset Int -> m Bool
ejectPred Double
tmout
    (Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract) t m (AbsTime, (k, a))
str =
    (SessionState t m k (Tuple' AbsTime s) b -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap SessionState t m k (Tuple' AbsTime s) b -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionOutputStream (t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall a b. (a -> b) -> a -> b
$
        (SessionState t m k (Tuple' AbsTime s) b
 -> Maybe (AbsTime, (k, a))
 -> m (SessionState t m k (Tuple' AbsTime s) b))
-> m (SessionState t m k (Tuple' AbsTime s) b)
-> (SessionState t m k (Tuple' AbsTime s) b
    -> m (SessionState t m k (Tuple' AbsTime s) b))
-> t m (Maybe (AbsTime, (k, a)))
-> t m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) k (t :: (* -> *) -> * -> *)
       (m :: * -> *) b (m :: * -> *).
(IsStream t, Ord k) =>
SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return SessionState t m k (Tuple' AbsTime s) b
forall (m :: * -> *) k a b. SessionState t m k a b
szero) SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall k (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
       (m :: * -> *) a b (m :: * -> *).
(Ord k, IsStream t) =>
SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
flush t m (Maybe (AbsTime, (k, a)))
stream

    where

    timeoutMs :: RelTime
timeoutMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tmout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    tickMs :: RelTime
tickMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    szero :: SessionState t m k a b
szero = SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
        { sessionCurTime :: AbsTime
sessionCurTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionEventTime :: AbsTime
sessionEventTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionCount :: Int
sessionCount = Int
0
        , sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
forall a. Heap a
H.empty
        , sessionKeyValueMap :: Map k a
sessionKeyValueMap = Map k a
forall k a. Map k a
Map.empty
        , sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
        }

    -- We can eject sessions based on the current session count to limit
    -- memory consumption. There are two possible strategies:
    --
    -- 1) Eject old sessions or sessions beyond a certain/lower timeout
    -- threshold even before timeout, effectively reduce the timeout.
    -- 2) Drop creation of new sessions but keep accepting new events for the
    -- old ones.
    --
    -- We use the first strategy as of now.

    -- Got a new stream input element
    sstep :: SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} (Just (AbsTime
timestamp, (k
key, a
value))) = do
        -- XXX we should use a heap in pinned memory to scale it to a large
        -- size
        --
        -- To detect session inactivity we keep a timestamp of the latest event
        -- in the Map along with the fold result.  When we purge the session
        -- from the heap we match the timestamp in the heap with the timestamp
        -- in the Map, if the latest timestamp is newer and has not expired we
        -- reinsert the key in the heap.
        --
        -- XXX if the key is an Int, we can also use an IntMap for slightly
        -- better performance.
        --
        let curTime :: AbsTime
curTime = AbsTime -> AbsTime -> AbsTime
forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
            mOld :: Maybe (Tuple' AbsTime s)
mOld = k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
        let done :: b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb = do
                -- deleting a key from the heap is expensive, so we never
                -- delete a key from heap, we just purge it from the Map and it
                -- gets purged from the heap on timeout. We just need an extra
                -- lookup in the Map when the key is purged from the heap, that
                -- should not be expensive.
                --
                let (Map k (Tuple' AbsTime s)
mp, Int
cnt) = case Maybe (Tuple' AbsTime s)
mOld of
                        Maybe (Tuple' AbsTime s)
Nothing -> (Map k (Tuple' AbsTime s)
sessionKeyValueMap, Int
sessionCount)
                        Just Tuple' AbsTime s
_ -> (k -> Map k (Tuple' AbsTime s) -> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
                                  , Int
sessionCount Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
                SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
 -> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt
                    , sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp
                    , sessionOutputStream :: t m (k, b)
sessionOutputStream = (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure (k
key, b
fb)
                    }
            partial :: s -> m (SessionState t m k (Tuple' AbsTime s) b)
partial s
fs1 = do
                let acc :: Tuple' AbsTime s
acc = AbsTime -> s -> Tuple' AbsTime s
forall a b. a -> b -> Tuple' a b
Tuple' AbsTime
timestamp s
fs1
                (Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp1, t m (k, b)
out1, Int
cnt1) <- do
                        let vars :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars = (Heap (Entry AbsTime k)
sessionTimerHeap, Map k (Tuple' AbsTime s)
sessionKeyValueMap,
                                           t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil, Int
sessionCount)
                        case Maybe (Tuple' AbsTime s)
mOld of
                            -- inserting new entry
                            Maybe (Tuple' AbsTime s)
Nothing -> do
                                -- Eject a session from heap and map is needed
                                Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
                                (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt) <-
                                    if Bool
eject
                                    then (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall k d (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, Num d, IsStream t) =>
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
ejectOne (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
                                    else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars

                                -- Insert the new session in heap
                                let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
                                    hp' :: Heap (Entry AbsTime k)
hp' = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry k
key) Heap (Entry AbsTime k)
hp
                                 in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                            -- updating old entry
                            Just Tuple' AbsTime s
_ -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars

                let mp2 :: Map k (Tuple' AbsTime s)
mp2 = k
-> Tuple' AbsTime s
-> Map k (Tuple' AbsTime s)
-> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
key Tuple' AbsTime s
acc Map k (Tuple' AbsTime s)
mp1
                SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
 -> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt1
                    , sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp1
                    , sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp2
                    , sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out1
                    }
        Step s b
res0 <- do
            case Maybe (Tuple' AbsTime s)
mOld of
                Maybe (Tuple' AbsTime s)
Nothing -> m (Step s b)
initial
                Just (Tuple' AbsTime
_ s
acc) -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ s -> Step s b
forall s b. s -> Step s b
FL.Partial s
acc
        case Step s b
res0 of
            FL.Done b
fb -> b -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b (m :: * -> *).
(Monad m, IsStream t) =>
b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb
            FL.Partial s
fs -> do
                Step s b
res <- s -> a -> m (Step s b)
step s
fs a
value
                case Step s b
res of
                    FL.Done b
fb -> b -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b (m :: * -> *).
(Monad m, IsStream t) =>
b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb
                    FL.Partial s
fs1 -> s -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
IsStream t =>
s -> m (SessionState t m k (Tuple' AbsTime s) b)
partial s
fs1

    -- Got a timer tick event
    sstep sessionState :: SessionState t m k (Tuple' AbsTime s) b
sessionState@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} Maybe (AbsTime, (k, a))
Nothing =
        let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
        in SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
forall k (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
       (m :: * -> *) b (m :: * -> *).
(Ord k, IsStream t) =>
SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired SessionState t m k (Tuple' AbsTime s) b
sessionState AbsTime
curTime

    flush :: SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
flush session :: SessionState t m k (Tuple' a s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' a s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' a s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} = do
        (Heap (Entry AbsTime k)
hp', Map k (Tuple' a s)
mp', t m (k, b)
out, Int
count) <-
            (Heap (Entry AbsTime k), Map k (Tuple' a s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' a s), t m (k, b), Int)
forall k d (t :: (* -> *) -> * -> *) p a (m :: * -> *).
(Ord k, Num d, IsStream t) =>
(Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll
                ( Heap (Entry AbsTime k)
sessionTimerHeap
                , Map k (Tuple' a s)
sessionKeyValueMap
                , t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
                , Int
sessionCount
                )
        SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' a s) b
 -> m (SessionState t m k (Tuple' a s) b))
-> SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' a s) b
session
            { sessionCount :: Int
sessionCount = Int
count
            , sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp'
            , sessionKeyValueMap :: Map k (Tuple' a s)
sessionKeyValueMap = Map k (Tuple' a s)
mp'
            , sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out
            }

    -- delete from map and output the fold accumulator
    ejectEntry :: a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry a
hp Map k a
mp t m (k, b)
out d
cnt s
acc k
key = do
        b
sess <- s -> m b
extract s
acc
        let out1 :: t m (k, b)
out1 = (k
key, b
sess) (k, b) -> t m (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`K.cons` t m (k, b)
out
        let mp1 :: Map k a
mp1 = k -> Map k a -> Map k a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k a
mp
        (a, Map k a, t m (k, b), d) -> m (a, Map k a, t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
hp, Map k a
mp1, t m (k, b)
out1, d
cnt d -> d -> d
forall a. Num a => a -> a -> a
- d
1)

    ejectAll :: (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll (Heap (Entry p k)
hp, Map k (Tuple' a s)
mp, t m (k, b)
out, !d
cnt) = do
        let hres :: Maybe (Entry p k, Heap (Entry p k))
hres = Heap (Entry p k) -> Maybe (Entry p k, Heap (Entry p k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry p k)
hp
        case Maybe (Entry p k, Heap (Entry p k))
hres of
            Just (Entry p
_ k
key, Heap (Entry p k)
hp1) -> do
                (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
r <- case k -> Map k (Tuple' a s) -> Maybe (Tuple' a s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' a s)
mp of
                    Maybe (Tuple' a s)
Nothing -> (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p k)
hp1, Map k (Tuple' a s)
mp, t m (k, b)
out, d
cnt)
                    Just (Tuple' a
_ s
acc) -> Heap (Entry p k)
-> Map k (Tuple' a s)
-> t m (k, b)
-> d
-> s
-> k
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry p k)
hp1 Map k (Tuple' a s)
mp t m (k, b)
out d
cnt s
acc k
key
                (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
r
            Maybe (Entry p k, Heap (Entry p k))
Nothing -> do
                Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' a s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' a s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p k)
hp, Map k (Tuple' a s)
mp, t m (k, b)
out, d
cnt)

    ejectOne :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
ejectOne (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, !d
cnt) = do
        let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
        case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
            Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) ->
                case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
                    Maybe (Tuple' AbsTime s)
Nothing -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
ejectOne (Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
                    Just (Tuple' AbsTime
latestTS s
acc) -> do
                        let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
                        if Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
expiry
                        then Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> d
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out d
cnt s
acc k
key
                        else
                            -- reset the session timeout and continue
                            let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
                            in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
ejectOne (Heap (Entry AbsTime k)
hp2, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
            Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
                Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)

    ejectExpired :: SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} AbsTime
curTime = do
        (Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp', t m (k, b)
out, Int
count) <-
            Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall k (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, IsStream t) =>
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
ejectLoop Heap (Entry AbsTime k)
sessionTimerHeap Map k (Tuple' AbsTime s)
sessionKeyValueMap t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil Int
sessionCount
        SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
 -> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
            { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
            , sessionCount :: Int
sessionCount = Int
count
            , sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp'
            , sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp'
            , sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out
            }

        where

        ejectLoop :: Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
ejectLoop Heap (Entry AbsTime k)
hp Map k (Tuple' AbsTime s)
mp t m (k, b)
out !Int
cnt = do
            let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
            case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
                Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) -> do
                    (Bool
eject, Bool
force) <-
                        if AbsTime
curTime AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
>= AbsTime
expiry
                        then (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Bool
False)
                        else do
                            Bool
r <- Int -> m Bool
ejectPred Int
cnt
                            (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
r, Bool
r)
                    if Bool
eject
                    then
                        case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
                            Maybe (Tuple' AbsTime s)
Nothing -> Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
ejectLoop Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
                            Just (Tuple' AbsTime
latestTS s
acc) -> do
                                let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
                                if AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
curTime Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| Bool
force
                                then do
                                    (Heap (Entry AbsTime k)
hp2,Map k (Tuple' AbsTime s)
mp1,t m (k, b)
out1,Int
cnt1) <-
                                        Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt s
acc k
key
                                    Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp1 t m (k, b)
out1 Int
cnt1
                                else
                                    -- reset the session timeout and continue
                                    let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
                                    in Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
                    else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)
                Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
                    Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                    (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
      Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)

    -- merge timer events in the stream
    stream :: t m (Maybe (AbsTime, (k, a)))
stream = ((AbsTime, (k, a)) -> Maybe (AbsTime, (k, a)))
-> t m (AbsTime, (k, a)) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map (AbsTime, (k, a)) -> Maybe (AbsTime, (k, a))
forall a. a -> Maybe a
Just t m (AbsTime, (k, a))
str t m (Maybe (AbsTime, (k, a)))
-> t m (Maybe (AbsTime, (k, a))) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`Par.parallelFst` m (Maybe (AbsTime, (k, a))) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m (Maybe (AbsTime, (k, a)))
forall a. m (Maybe a)
timer
    timer :: m (Maybe a)
timer = do
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)
        Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'True'.
--
-- @
-- classifyKeepAliveSessions = classifySessionsBy 1 True
-- @
--
-- /Pre-release/
--
{-# INLINABLE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ session inactive timeout
    -> Fold m a b -- ^ Fold to be applied to session payload data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)
classifyKeepAliveSessions :: (Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifyKeepAliveSessions = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
True

------------------------------------------------------------------------------
-- Keyed tumbling windows
------------------------------------------------------------------------------

-- Tumbling windows is a special case of sliding windows where the window slide
-- is the same as the window size. Or it can be a special case of session
-- windows where the reset flag is set to False.

-- XXX instead of using the early termination flag in the stream, we can use an
-- early terminating fold instead.

{-
-- | Split the stream into fixed size chunks of specified size. Within each
-- such chunk fold the elements in buckets identified by the keys. A particular
-- bucket fold can be terminated early if a closing flag is encountered in an
-- element for that key.
--
-- @since 0.7.0
{-# INLINABLE classifyChunksOf #-}
classifyChunksOf
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifyChunksOf wsize = classifyChunksBy wsize False
-}

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'False'.
--
-- @
-- classifySessionsOf = classifySessionsBy 1 False
-- @
--
-- /Pre-release/
--
{-# INLINABLE classifySessionsOf #-}
classifySessionsOf ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ time window size
    -> Fold m a b -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)
classifySessionsOf :: (Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifySessionsOf = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
False

------------------------------------------------------------------------------
-- Nested Split
------------------------------------------------------------------------------

-- | @splitInnerBy splitter joiner stream@ splits the inner containers @f a@ of
-- an input stream @t m (f a)@ using the @splitter@ function. Container
-- elements @f a@ are collected until a split occurs, then all the elements
-- before the split are joined using the @joiner@ function.
--
-- For example, if we have a stream of @Array Word8@, we may want to split the
-- stream into arrays representing lines separated by '\n' byte such that the
-- resulting stream after a split would be one array for each line.
--
-- CAUTION! This is not a true streaming function as the container size after
-- the split and merge may not be bounded.
--
-- /Pre-release/
{-# INLINE splitInnerBy #-}
splitInnerBy
    :: (IsStream t, Monad m)
    => (f a -> m (f a, Maybe (f a)))  -- splitter
    -> (f a -> f a -> m (f a))        -- joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBy :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
    Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs

-- | Like 'splitInnerBy' but splits assuming the separator joins the segment in
-- a suffix style.
--
-- /Pre-release/
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
    :: (IsStream t, Monad m, Eq (f a), Monoid (f a))
    => (f a -> m (f a, Maybe (f a)))  -- splitter
    -> (f a -> f a -> m (f a))        -- joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBySuffix :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
    Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs