{-# OPTIONS_GHC -Wno-deprecations #-}
module Streamly.Internal.Data.Stream.IsStream.Reduce {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
(
dropPrefix
, dropInfix
, dropSuffix
, foldMany
, foldManyPost
, refoldMany
, foldSequence
, foldIterateM
, refoldIterateM
, chunksOf
, arraysOf
, intervalsOf
, chunksOfTimeout
, splitOn
, splitOnSuffix
, splitOnPrefix
, splitOnAny
, splitWithSuffix
, splitBySeq
, splitOnSeq
, splitOnSuffixSeq
, splitWithSuffixSeq
, splitOnSuffixSeqAny
, classifySessionsByGeneric
, classifySessionsBy
, classifySessionsOf
, classifyKeepAliveSessions
, parseMany
, parseManyD
, parseManyTill
, parseSequence
, parseIterate
, wordsBy
, wordsOn
, groups
, groupsBy
, groupsByRolling
, splitInnerBy
, splitInnerBySuffix
)
where
#include "inline.hs"
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Map (Map)
import Data.Maybe (isNothing)
import Data.Proxy (Proxy(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Refold.Type (Refold (..))
import Streamly.Internal.Data.Parser (Parser (..), ParseError)
import Streamly.Internal.Data.Array (Array)
import Streamly.Internal.Data.Stream.IsStream.Common
( fold
, interjectSuffix
, intersperseM
, map
, scanlMAfter'
, foldManyPost
, splitOnSeq
, fromPure)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, cons)
import Streamly.Internal.Data.Stream.Serial(toStreamK)
import Streamly.Internal.Data.Time.Units
( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
, toAbsTime)
import Streamly.Data.MutByteArray (Unbox)
import qualified Data.Heap as H
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Array as A
(chunksOf, read)
import qualified Streamly.Internal.Data.Fold as FL
(Fold, Step(..), takeEndBy_, takeEndBy, catMaybes, take)
import qualified Streamly.Internal.Data.IsMap as IsMap
import qualified Streamly.Internal.Data.Parser as PRD (Parser(..))
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream as D
( foldMany
, groupsOf
, refoldMany
, foldIterateM
, refoldIterateM
, parseManyD
, parseIterateD
, groupsBy
, groupsRollingBy
, wordsBy
, splitOnSuffixSeq
, splitInnerBy
, splitInnerBySuffix
)
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Expand
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Transform
import Prelude hiding (concatMap, map)
-- | Placeholder for an operation taking two streams and producing a stream
-- of the same type (presumably: strip the first stream, as a prefix, from the
-- second -- TODO confirm the intended argument order).
--
-- /Unimplemented/: evaluating this function raises
-- @error "Not implemented yet!"@.
{-# INLINE dropPrefix #-}
dropPrefix :: t m a -> t m a -> t m a
dropPrefix = error "Not implemented yet!"
-- | Placeholder for an operation taking two streams and producing a stream
-- of the same type (presumably: remove an infix sequence from a stream --
-- TODO confirm the intended argument order).
--
-- /Unimplemented/: evaluating this function raises
-- @error "Not implemented yet!"@.
{-# INLINE dropInfix #-}
dropInfix :: t m a -> t m a -> t m a
dropInfix = error "Not implemented yet!"
-- | Placeholder for an operation taking two streams and producing a stream
-- of the same type (presumably: strip a suffix sequence from a stream --
-- TODO confirm the intended argument order).
--
-- /Unimplemented/: evaluating this function raises
-- @error "Not implemented yet!"@.
{-# INLINE dropSuffix #-}
dropSuffix :: t m a -> t m a -> t m a
dropSuffix = error "Not implemented yet!"
-- | Reduce a stream of @a@ to a stream of fold results @b@ using the given
-- 'Fold'.
--
-- This is a thin 'IsStream' adapter: the input is converted with
-- 'toStreamD', handed to 'D.foldMany', and the result is converted back with
-- 'fromStreamD'. The exact chunking semantics are those of 'D.foldMany'.
{-# INLINE foldMany #-}
foldMany
    :: (IsStream t, Monad m)
    => Fold m a b
    -> t m a
    -> t m b
foldMany f m = fromStreamD $ D.foldMany f (toStreamD m)
-- | Like 'foldMany' but driven by a 'Refold' together with a monadic action
-- producing the refold's input of type @c@.
--
-- Thin 'IsStream' adapter around 'D.refoldMany': the stream is converted with
-- 'toStreamD' and the result converted back with 'fromStreamD'.
{-# INLINE refoldMany #-}
refoldMany :: (IsStream t, Monad m) =>
    Refold m c a b -> m c -> t m a -> t m b
refoldMany f action = fromStreamD . D.refoldMany f action . toStreamD
-- | Placeholder for applying a stream of folds to an input stream
-- (presumably one fold after another -- TODO confirm intended semantics).
--
-- /Unimplemented/: both arguments are ignored and the result is 'undefined'.
{-# INLINE foldSequence #-}
foldSequence
    :: t m (Fold m a b)
    -> t m a
    -> t m b
foldSequence _f _m = undefined
-- | Iterated folding: the fold to run is generated from a value of type @b@
-- by the supplied function, starting from the value produced by the initial
-- action @m b@.
--
-- Thin 'IsStream' adapter around 'D.foldIterateM'; see that function for the
-- precise iteration semantics.
{-# INLINE foldIterateM #-}
foldIterateM ::
       (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b
foldIterateM f i m = fromStreamD $ D.foldIterateM f i (toStreamD m)
-- | Iterated folding using a 'Refold' whose input and output types are both
-- @b@, seeded by the initial action @m b@.
--
-- Thin 'IsStream' adapter around 'D.refoldIterateM'; see that function for
-- the precise iteration semantics.
{-# INLINE refoldIterateM #-}
refoldIterateM :: (IsStream t, Monad m) =>
    Refold m b a b -> m b -> t m a -> t m b
refoldIterateM c i m = fromStreamD $ D.refoldIterateM c i (toStreamD m)
-- | Run a 'Parser' over the input stream, producing a stream of
-- @Either ParseError b@ results.
--
-- Thin 'IsStream' adapter around 'D.parseManyD': the input is converted with
-- 'toStreamD' and the result converted back with 'fromStreamD'.
{-# INLINE parseMany #-}
parseMany
    :: (IsStream t, Monad m)
    => Parser a m b
    -> t m a
    -> t m (Either ParseError b)
parseMany p m =
    fromStreamD $ D.parseManyD p (toStreamD m)
-- | Same implementation as 'parseMany': delegates to 'D.parseManyD' between
-- 'toStreamD' and 'fromStreamD'. The only difference visible here is that
-- the parser type is spelled via the qualified @PRD@ import.
{-# INLINE parseManyD #-}
parseManyD
    :: (IsStream t, Monad m)
    => PRD.Parser a m b
    -> t m a
    -> t m (Either ParseError b)
parseManyD p m =
    fromStreamD $ D.parseManyD p (toStreamD m)
-- | Placeholder for applying a stream of parsers to an input stream
-- (presumably one parser after another -- TODO confirm intended semantics).
--
-- /Unimplemented/: both arguments are ignored and the result is 'undefined'.
{-# INLINE parseSequence #-}
parseSequence
    :: t m (Parser a m b)
    -> t m a
    -> t m b
parseSequence _f _m = undefined
-- | Placeholder taking two parsers and an input stream (presumably: apply
-- the first parser repeatedly until the second one succeeds -- TODO confirm
-- intended semantics).
--
-- /Unimplemented/: the definition is 'undefined'.
{-# INLINE parseManyTill #-}
parseManyTill ::
       Parser a m b
    -> Parser a m x
    -> t m a
    -> t m b
parseManyTill = undefined
-- | Iterated parsing: the parser to run is generated from a value of type
-- @b@ by the supplied function, starting from the given seed value. Results
-- are emitted as @Either ParseError b@.
--
-- Thin 'IsStream' adapter around 'D.parseIterateD'; see that function for
-- the precise iteration semantics.
{-# INLINE parseIterate #-}
parseIterate
    :: (IsStream t, Monad m)
    => (b -> Parser a m b)
    -> b
    -> t m a
    -> t m (Either ParseError b)
parseIterate f i m =
    fromStreamD $ D.parseIterateD f i (toStreamD m)
-- | Group stream elements using a binary comparison function and fold each
-- group with the given 'Fold'.
--
-- Thin 'IsStream' adapter around 'D.groupsBy'; which pair of elements the
-- comparison is applied to is determined by that function.
{-# INLINE groupsBy #-}
groupsBy
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsBy cmp f m = fromStreamD $ D.groupsBy cmp f (toStreamD m)
-- | Group stream elements using a binary comparison function and fold each
-- group with the given 'Fold'.
--
-- Same shape as 'groupsBy' but delegates to 'D.groupsRollingBy' instead of
-- 'D.groupsBy' (presumably comparing each element with its immediate
-- predecessor -- confirm against 'D.groupsRollingBy').
{-# INLINE groupsByRolling #-}
groupsByRolling
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsByRolling cmp f m = fromStreamD $ D.groupsRollingBy cmp f (toStreamD m)
-- | Group elements by equality and fold each group.
--
-- > groups = groupsBy (==)
{-# INLINE groups #-}
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
groups = groupsBy (==)
-- | Split the stream at elements satisfying the predicate and fold each
-- segment with the given 'Fold'.
--
-- Built as 'foldManyPost' applied to @FL.takeEndBy_ predicate f@. It differs
-- from 'splitOnSuffix' only in using 'foldManyPost' rather than 'foldMany'.
{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn predicate f =
    foldManyPost (FL.takeEndBy_ predicate f)
-- | Split the stream at elements satisfying the predicate, treating them as
-- segment terminators, and fold each segment with the given 'Fold'.
--
-- Built as 'foldMany' applied to @FL.takeEndBy_ predicate f@. It differs
-- from 'splitWithSuffix' only in using 'FL.takeEndBy_' rather than
-- 'FL.takeEndBy'.
{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix predicate f = foldMany (FL.takeEndBy_ predicate f)
-- | Placeholder for splitting a stream on elements satisfying a predicate
-- (presumably treating them as segment prefixes -- TODO confirm intended
-- semantics).
--
-- /Unimplemented/: both arguments are ignored and the result is 'undefined'.
{-# INLINE splitOnPrefix #-}
splitOnPrefix ::
    (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix _predicate _f = undefined
-- | Split the stream into "words" delimited by elements satisfying the
-- predicate, folding each word with the given 'Fold'.
--
-- Thin 'IsStream' adapter around 'D.wordsBy'; how runs of separators are
-- treated is determined by that function.
{-# INLINE wordsBy #-}
wordsBy
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy predicate f m =
    fromStreamD $ D.wordsBy predicate f (toStreamD m)
-- | Split the stream at elements satisfying the predicate, treating them as
-- segment terminators, and fold each segment with the given 'Fold'.
--
-- Built as 'foldMany' applied to @FL.takeEndBy predicate f@. It differs from
-- 'splitOnSuffix' only in using 'FL.takeEndBy' rather than 'FL.takeEndBy_'.
{-# INLINE splitWithSuffix #-}
splitWithSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix predicate f = foldMany (FL.takeEndBy predicate f)
-- | Placeholder for splitting a stream on any one of several separator
-- sequences (TODO confirm intended semantics).
--
-- /Unimplemented/: all arguments are ignored and the result is 'undefined'.
{-# INLINE splitOnAny #-}
splitOnAny ::
    [Array a] -> Fold m a b -> t m a -> t m b
splitOnAny _subseq _f _m =
    undefined
-- | Split the stream on the pattern @patt@ using 'splitOnSeq', and
-- intersperse between the resulting outputs the value obtained by folding
-- the pattern itself with the same 'Fold'.
--
-- The interspersed value is the action @fold f (fromStreamD (A.read patt))@,
-- inserted with 'intersperseM'.
{-# INLINE splitBySeq #-}
splitBySeq
    :: (IsStream t, MonadAsync m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitBySeq patt f m =
    intersperseM (fold f (fromStreamD $ A.read patt)) $ splitOnSeq patt f m
-- | Split the stream on the sequence @patt@, treated as a segment suffix,
-- folding each segment with the given 'Fold'.
--
-- Delegates to 'D.splitOnSuffixSeq' with its 'Bool' flag set to 'False';
-- 'splitWithSuffixSeq' is the same call with the flag set to 'True'
-- (presumably the flag controls whether the separator is kept in the
-- output -- confirm against 'D.splitOnSuffixSeq').
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq patt f m =
    fromStreamD $ D.splitOnSuffixSeq False patt f (toStreamD m)
-- | Placeholder for a sequence-delimited variant of 'wordsBy' (TODO confirm
-- intended semantics).
--
-- /Unimplemented/: all arguments are ignored and the result is 'undefined'.
{-# INLINE wordsOn #-}
wordsOn ::
    Array a -> Fold m a b -> t m a -> t m b
wordsOn _subseq _f _m =
    undefined
-- | Split the stream on the sequence @patt@, treated as a segment suffix,
-- folding each segment with the given 'Fold'.
--
-- Delegates to 'D.splitOnSuffixSeq' with its 'Bool' flag set to 'True';
-- 'splitOnSuffixSeq' is the same call with the flag set to 'False'
-- (presumably the flag controls whether the separator is kept in the
-- output -- confirm against 'D.splitOnSuffixSeq').
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq patt f m =
    fromStreamD $ D.splitOnSuffixSeq True patt f (toStreamD m)
-- | Placeholder for splitting a stream on any one of several suffix
-- sequences (TODO confirm intended semantics).
--
-- /Unimplemented/: all arguments are ignored and the result is 'undefined'.
{-# INLINE splitOnSuffixSeqAny #-}
splitOnSuffixSeqAny ::
    [Array a] -> Fold m a b -> t m a -> t m b
splitOnSuffixSeqAny _subseq _f _m = undefined
-- | Group the input stream into chunks governed by the size argument @n@ and
-- fold each chunk with the given 'Fold'.
--
-- Thin 'IsStream' adapter around 'D.groupsOf'; the handling of a short final
-- chunk and of non-positive @n@ is determined by that function.
{-# INLINE chunksOf #-}
chunksOf
    :: (IsStream t, Monad m)
    => Int -> Fold m a b -> t m a -> t m b
chunksOf n f = fromStreamD . D.groupsOf n f . toStreamD
-- | Group the input stream into a stream of 'Array's, with @n@ as the size
-- argument.
--
-- Thin 'IsStream' adapter around 'A.chunksOf' (the array module's
-- @chunksOf@, not the 'chunksOf' defined in this module).
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Unbox a)
    => Int -> t m a -> t m (Array a)
arraysOf n = fromStreamD . A.chunksOf n . toStreamD
-- | Fold the elements arriving within each time interval into one output
-- value. The interval length @n@ is passed straight to 'interjectSuffix'
-- (presumably seconds -- confirm against 'interjectSuffix').
--
-- Implementation: every element is wrapped in 'Just', a 'Nothing' marker is
-- injected into the stream by 'interjectSuffix', and the stream is then split
-- with 'splitWithSuffix' on those markers. The user fold is adapted with
-- 'FL.catMaybes' so that it only sees the 'Just' payloads.
{-# INLINE intervalsOf #-}
intervalsOf
    :: (IsStream t, MonadAsync m)
    => Double -> Fold m a b -> t m a -> t m b
intervalsOf n f xs =
    splitWithSuffix isNothing (FL.catMaybes f)
        (interjectSuffix n (return Nothing) (map Just xs))
-- | Fold the input in chunks bounded both by an element count @n@ and by a
-- @timeout@.
--
-- Implementation: every element is tagged with the unit key @()@, timestamped
-- with 'Transform.timestamped', and fed to 'classifySessionsBy' as a single
-- session whose fold is @FL.take n f@. The unit key is dropped from each
-- result with @map snd@.
--
-- The remaining 'classifySessionsBy' arguments are fixed here to @0.1@,
-- 'False' and @const (return False)@; see that function for their meaning
-- (presumably timer tick interval, timer-reset flag and ejection predicate).
{-# INLINE chunksOfTimeout #-}
chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m))
    => Int -> Double -> FL.Fold m a b -> t m a -> t m b
chunksOfTimeout n timeout f =
      map snd
    . classifySessionsBy
        0.1 False (const (return False)) timeout (FL.take n f)
    . Transform.timestamped
    . map ((),)
-- | State threaded through the session classification machinery.
--
-- Type parameters: @t m@ is the output stream type, @f@ is the 'IsMap'
-- container keyed by @Key f@, @s@ is the per-key entry stored in the map and
-- @b@ is the per-session result emitted on the output stream.
data SessionState t m f s b = SessionState
    { sessionCurTime :: !AbsTime
      -- ^ Current time as tracked by the classifier (presumably the last
      -- timer tick -- confirm at the use site).
    , sessionEventTime :: !AbsTime
      -- ^ Time associated with the latest event (presumably the timestamp of
      -- the most recent input -- confirm at the use site).
    , sessionCount :: !Int
      -- ^ Session counter; 'ejectEntry' decrements it by one for each
      -- ejected session.
    , sessionTimerHeap :: H.Heap (H.Entry AbsTime (Key f))
      -- ^ Heap of (time, key) entries; 'flush' pops entries from it with
      -- 'H.uncons' to find the keys to eject.
    , sessionKeyValueMap :: f s
      -- ^ Map from session key to its entry.
    , sessionOutputStream :: t (m :: Type -> Type) (Key f, b)
      -- ^ Stream of (key, result) pairs produced by ejecting sessions.
    }
-- | An entry in the session map.
--
-- 'LiveSession' carries a timestamp and the session accumulator; in
-- 'ejectOne' the timestamp is treated as the session's expiry time and is
-- compared with the expiry stored in the timer heap entry.
--
-- 'ZombieSession' carries no data; 'flush' and 'ejectOne' simply delete such
-- an entry from the map when its heap entry is popped, without emitting any
-- output for it.
data SessionEntry s = LiveSession !AbsTime !s | ZombieSession
-- | Eject a single session.
--
-- Runs @extract@ on the session accumulator @acc@, conses the resulting
-- @(key, result)@ pair onto the front of the output stream, deletes @key@
-- from the map and decrements the session count by one. The heap argument is
-- returned unchanged; the caller is expected to have removed the
-- corresponding heap entry already.
ejectEntry :: (Monad m, IsStream t, IsMap f) =>
       (acc -> m b)
    -> heap
    -> f entry
    -> t m (Key f, b)
    -> Int
    -> acc
    -> Key f
    -> m (heap, f entry, t m (Key f, b), Int)
ejectEntry extract hp mp out cnt acc key = do
    sess <- extract acc
    let out1 = (key, sess) `cons` out
    let mp1 = IsMap.mapDelete key mp
    return (hp, mp1, out1, cnt - 1)
-- | Eject every session that is still tracked by the timer heap.
--
-- The timer heap is drained entry by entry. For each popped key:
--
-- * if the key is absent from the map, the heap entry is just dropped;
-- * if the map holds a 'ZombieSession', the key is deleted from the map;
-- * if the map holds a 'LiveSession', its accumulator is finalised with
--   @extract@ and emitted through 'ejectEntry'.
--
-- The output stream is rebuilt starting from 'IsStream.nil', so whatever was
-- in 'sessionOutputStream' before the call is replaced. Because 'ejectEntry'
-- conses onto the front, sessions ejected later appear earlier in the output.
--
-- The returned state has its count, heap, map and output stream updated; the
-- time fields are left untouched.
{-# NOINLINE flush #-}
flush :: (Monad m, IsMap f, IsStream t) =>
       (s -> m b)
    -> SessionState t m f (SessionEntry s) b
    -> m (SessionState t m f (SessionEntry s) b)
flush extract session@SessionState{..} = do
    (hp', mp', out, count) <-
        ejectAll
            ( sessionTimerHeap
            , sessionKeyValueMap
            , IsStream.nil
            , sessionCount
            )
    return $ session
        { sessionCount = count
        , sessionTimerHeap = hp'
        , sessionKeyValueMap = mp'
        , sessionOutputStream = out
        }

    where

    -- Loop until the heap is empty, threading (heap, map, output, count).
    ejectAll (hp, mp, out, !cnt) = do
        let hres = H.uncons hp
        case hres of
            Just (Entry _ key, hp1) -> do
                r <- case IsMap.mapLookup key mp of
                    -- Heap entry with no matching map entry: skip it.
                    Nothing -> return (hp1, mp, out, cnt)
                    Just ZombieSession ->
                        return (hp1, IsMap.mapDelete key mp, out, cnt)
                    Just (LiveSession _ acc) ->
                        ejectEntry extract hp1 mp out cnt acc key
                ejectAll r
            Nothing -> do
                -- Once the heap is exhausted the map is expected to be empty.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
-- Eject at most one live session to make room for a new one.
--
-- Entries are popped off the timer heap one at a time until a live session
-- has been handed to 'ejectEntry' or the heap is exhausted:
--
-- * a heap entry whose key is no longer in the map is dropped;
-- * a heap entry whose key maps to a 'ZombieSession' is dropped and the key
--   is deleted from the map;
-- * a heap entry whose key maps to a 'LiveSession' is ejected, unless
--   @reset@ is 'True' and the expiry stored in the map is later than the one
--   stored in the heap. In that case the entry is pushed back on the heap
--   with the expiry from the map and the search continues.
--
-- Arguments:
--
-- * @reset@: whether a later expiry recorded in the map should keep a
--   session alive instead of ejecting it.
-- * @extract@: passed through to 'ejectEntry' together with the session
--   accumulator.
-- * the tuple @(timer heap, key/session map, output stream, session count)@.
--
-- Returns the updated tuple in the same shape.
--
-- NOTE(review): 'ejectEntry' is defined elsewhere in this module; presumably
-- it runs @extract@, removes the key from the map, adds the result to the
-- output stream and decrements the count - confirm against its definition.
{-# NOINLINE ejectOne #-}
ejectOne :: (IsMap f, IsStream t, Monad m) =>
       Bool
    -> (acc -> m b)
    -> ( H.Heap (Entry AbsTime (Key f))
       , f (SessionEntry acc)
       , t m (Key f, b)
       , Int
       )
    -> m ( H.Heap (Entry AbsTime (Key f))
         , f (SessionEntry acc)
         , t m (Key f, b), Int
         )
ejectOne reset extract = go

    where

    go (hp, mp, out, !cnt) = do
        let hres = H.uncons hp
        case hres of
            Just (Entry expiry key, hp1) ->
                case IsMap.mapLookup key mp of
                    -- Stale heap entry: the key is gone from the map.
                    Nothing -> go (hp1, mp, out, cnt)
                    -- The session already finished; purge its map entry now
                    -- that its heap entry has been popped.
                    Just ZombieSession ->
                        go (hp1, IsMap.mapDelete key mp, out, cnt)
                    Just (LiveSession expiry1 acc) -> do
                        if not reset || expiry1 <= expiry
                        then ejectEntry extract hp1 mp out cnt acc key
                        else
                            -- The map holds a later expiry than the heap:
                            -- re-queue the key under that expiry and keep
                            -- looking for a session to eject.
                            let hp2 = H.insert (Entry expiry1 key) hp1
                            in go (hp2, mp, out, cnt)
            Nothing -> do
                -- Once the heap is empty the map is expected to be empty
                -- too.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
-- Handle a timer tick: advance the session clock to @curTime@ and eject the
-- sessions that are due.
--
-- The timer heap is walked from its minimum entry. An entry is considered
-- for ejection when @curTime >= expiry@; otherwise @ejectPred@ is consulted
-- with the current session count and, if it returns 'True', the entry is
-- ejected forcibly. The walk stops at the first entry that is neither
-- expired nor forced out, or when the heap is empty.
--
-- For an entry that is considered for ejection:
--
-- * a key missing from the map just has its heap entry dropped;
-- * a 'ZombieSession' is deleted from the map;
-- * a 'LiveSession' is passed to 'ejectEntry' when the expiry stored in the
--   map has passed, or @reset@ is 'False', or the ejection is forced;
--   otherwise the key is re-queued on the heap under the expiry stored in
--   the map.
--
-- Arguments:
--
-- * @reset@: whether a later expiry recorded in the map keeps a session
--   alive.
-- * @ejectPred@: applied to the session count to decide whether sessions
--   that have not expired yet must be ejected anyway.
-- * @extract@: passed through to 'ejectEntry' with the session accumulator.
-- * @session@: the state to update.
-- * @curTime@: the new current time.
--
-- The returned state has 'sessionCurTime' set to @curTime@ and fresh values
-- for the count, heap, map and output stream. The output stream is rebuilt
-- starting from 'IsStream.nil', so whatever was in 'sessionOutputStream'
-- before is replaced. 'sessionEventTime' is left untouched.
{-# NOINLINE ejectExpired #-}
ejectExpired :: (IsMap f, IsStream t, Monad m) =>
       Bool
    -> (Int -> m Bool)
    -> (acc -> m b)
    -> SessionState t m f (SessionEntry acc) b
    -> AbsTime
    -> m (SessionState t m f (SessionEntry acc) b)
ejectExpired reset ejectPred extract session@SessionState{..} curTime = do
    (hp', mp', out, count) <-
        ejectLoop
            sessionTimerHeap sessionKeyValueMap IsStream.nil sessionCount
    return $ session
        { sessionCurTime = curTime
        , sessionCount = count
        , sessionTimerHeap = hp'
        , sessionKeyValueMap = mp'
        , sessionOutputStream = out
        }

    where

    ejectLoop hp mp out !cnt = do
        let hres = H.uncons hp
        case hres of
            Just (Entry expiry key, hp1) -> do
                -- eject: should this entry be processed at all?
                -- force: eject even though the timeout has not fired.
                (eject, force) <-
                    if curTime >= expiry
                    then return (True, False)
                    else do
                        r <- ejectPred cnt
                        return (r, r)
                if eject
                then
                    case IsMap.mapLookup key mp of
                        -- Stale heap entry: nothing to eject.
                        Nothing -> ejectLoop hp1 mp out cnt
                        -- Finished earlier; purge the leftover map entry.
                        Just ZombieSession ->
                            ejectLoop hp1 (IsMap.mapDelete key mp) out cnt
                        Just (LiveSession expiry1 acc) -> do
                            if expiry1 <= curTime || not reset || force
                            then do
                                (hp2,mp1,out1,cnt1) <-
                                    ejectEntry extract hp1 mp out cnt acc key
                                ejectLoop hp2 mp1 out1 cnt1
                            else
                                -- The map holds a later expiry: re-queue
                                -- the key under it and continue.
                                let hp2 = H.insert (Entry expiry1 key) hp1
                                in ejectLoop hp2 mp out cnt
                -- Leave the heap as it was, including the entry just
                -- inspected.
                else return (hp, mp, out, cnt)
            Nothing -> do
                -- Once the heap is empty the map is expected to be empty
                -- too.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
-- | Split a stream of timestamped, keyed events into per-key sessions, fold
-- each session with the supplied 'Fold' and emit @(key, result)@ pairs.
--
-- The input is wrapped in 'Just' and combined, via 'interjectSuffix', with a
-- 'Nothing' marker that acts as a timer tick. The combined stream is scanned
-- with a 'SessionState': a 'Just' event feeds the fold of its key, a
-- 'Nothing' tick advances the clock by @tick@ seconds and calls
-- 'ejectExpired'. After the input ends, @flush@ is run with the fold's
-- @final@ action. The 'sessionOutputStream' of every intermediate state is
-- flattened into the result.
--
-- A session ends when its fold returns 'FL.Done', when it is ejected on a
-- timer tick, or when it is ejected by 'ejectOne' because @ejectPred@ asked
-- for room for a new key.
--
-- Calls 'error' if the fold's initial action returns 'FL.Done', i.e. the
-- fold finishes without consuming any input.
--
-- The 'Proxy' argument only selects the map type @f@; its value is ignored.
{-# INLINE classifySessionsByGeneric #-}
classifySessionsByGeneric
    :: forall t m f a b. (IsStream t, MonadAsync m, IsMap f)
    => Proxy (f :: (Type -> Type))
    -> Double          -- ^ timer tick, in seconds
    -> Bool            -- ^ passed as @reset@ to 'ejectOne' and 'ejectExpired'
    -> (Int -> m Bool) -- ^ given the session count, should a session be ejected?
    -> Double          -- ^ session timeout, in seconds
    -> Fold m a b      -- ^ fold applied to the events of each session
    -> t m (AbsTime, (Key f, a)) -- ^ timestamp, (session key, event)
    -> t m (Key f, b)  -- ^ session key, fold result
classifySessionsByGeneric _ tick reset ejectPred tmout
    (Fold step initial extract final) input =
    Expand.unfoldMany
        (Unfold.lmap (toStreamK . sessionOutputStream) Unfold.fromStreamK)
        $ scanlMAfter' sstep (return szero) (flush final)
        $ interjectSuffix tick (return Nothing)
        $ map Just input

    where

    -- Both durations are given in seconds and converted to milliseconds.
    timeoutMs = toRelTime (round (tmout * 1000) :: MilliSecond64)
    tickMs = toRelTime (round (tick * 1000) :: MilliSecond64)

    -- Initial state: clock at zero, no sessions, nothing to output.
    szero = SessionState
        { sessionCurTime = toAbsTime (0 :: MilliSecond64)
        , sessionEventTime = toAbsTime (0 :: MilliSecond64)
        , sessionCount = 0
        , sessionTimerHeap = H.empty
        , sessionKeyValueMap = IsMap.mapEmpty :: f s
        , sessionOutputStream = IsStream.nil
        }

    -- Got an input event for some session key.
    sstep session@SessionState{..} (Just (timestamp, (key, value))) = do
        -- The event clock never moves backwards: an event stamped earlier
        -- than the last one seen is treated as happening at the last seen
        -- time.
        let curTime = max sessionEventTime timestamp
            mOld = IsMap.mapLookup key sessionKeyValueMap

        -- The fold finished on this event: emit the result. The heap is left
        -- alone; a previously live session is only marked as a zombie in the
        -- map, to be purged when its heap entry is eventually popped.
        let done fb = do
                let (mp, cnt) = case mOld of
                        Just (LiveSession _ _) ->
                            ( IsMap.mapInsert
                                key ZombieSession sessionKeyValueMap
                            , sessionCount - 1
                            )
                        _ -> (sessionKeyValueMap, sessionCount)
                return $ session
                    { sessionCurTime = curTime
                    , sessionEventTime = curTime
                    , sessionCount = cnt
                    , sessionKeyValueMap = mp
                    , sessionOutputStream = fromPure (key, fb)
                    }

            -- The fold wants more input: store the new accumulator with a
            -- fresh expiry of event timestamp plus timeout.
            partial fs1 = do
                let expiry = addToAbsTime timestamp timeoutMs
                (hp1, mp1, out1, cnt1) <- do
                        let vars = (sessionTimerHeap, sessionKeyValueMap,
                                           IsStream.nil, sessionCount)
                        case mOld of
                            -- First event for this key.
                            Nothing -> do
                                -- Make room by ejecting one session if the
                                -- predicate asks for it.
                                eject <- ejectPred sessionCount
                                (hp, mp, out, cnt) <-
                                    if eject
                                    then ejectOne reset extract vars
                                    else return vars

                                -- Register the timeout for the new session.
                                let hp' = H.insert (Entry expiry key) hp
                                 in return (hp', mp, out, cnt + 1)
                            -- Key already present: only the map entry is
                            -- refreshed below, the existing heap entry is
                            -- kept as it is.
                            --
                            -- NOTE(review): a ZombieSession entry also lands
                            -- here, so reviving a key whose fold already
                            -- finished does not re-increment the count that
                            -- 'done' decremented - confirm this is intended.
                            Just _ -> return vars

                let acc = LiveSession expiry fs1
                    mp2 = IsMap.mapInsert key acc mp1
                return $ SessionState
                    { sessionCurTime = curTime
                    , sessionEventTime = curTime
                    , sessionCount = cnt1
                    , sessionTimerHeap = hp1
                    , sessionKeyValueMap = mp2
                    , sessionOutputStream = out1
                    }

        -- Resume the accumulator of a live session, or start a fresh fold.
        res0 <- do
            case mOld of
                Just (LiveSession _ acc) -> return $ FL.Partial acc
                _ -> initial
        case res0 of
            FL.Done _ ->
                error $ "classifySessionsBy: "
                    ++ "The supplied fold must consume at least one input"
            FL.Partial fs -> do
                res <- step fs value
                case res of
                    FL.Done fb -> done fb
                    FL.Partial fs1 -> partial fs1

    -- Got a timer tick: advance the clock by one tick and eject whatever is
    -- due.
    sstep sessionState@SessionState{..} Nothing =
        let curTime = addToAbsTime sessionCurTime tickMs
        in ejectExpired reset ejectPred extract sessionState curTime
-- | Specialization of 'classifySessionsByGeneric' that fixes the session
-- container to a @Map k@ (hence the 'Ord' constraint on the key), by passing
-- a @Proxy (Map k)@ as the first argument. All remaining arguments are
-- forwarded unchanged, in the same order.
--
-- NOTE(review): the meaning of the two 'Double' arguments, the 'Bool' and the
-- @Int -> m Bool@ predicate is defined by 'classifySessionsByGeneric';
-- presumably they are the timer tick, the reset-on-event flag, the ejection
-- predicate on session count and the session timeout -- confirm against that
-- definition.
{-# INLINE classifySessionsBy #-}
classifySessionsBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Double
    -> Bool
    -> (Int -> m Bool)
    -> Double
    -> Fold m a b
    -> t m (AbsTime, (k, a)) -- ^ timestamped (key, value) input events
    -> t m (k, b)            -- ^ (key, fold result) outputs
classifySessionsBy =
    -- The annotation only pins the container to 'Map'; the key type is
    -- resolved by unification with the signature above.
    classifySessionsByGeneric (Proxy :: Proxy (Map k))
-- | 'classifySessionsBy' with its first 'Double' argument fixed to @1@ and
-- its 'Bool' argument fixed to 'True'. The predicate, the remaining 'Double',
-- the fold and the input stream are passed through unchanged.
--
-- NOTE(review): presumably @1@ is the timer tick and 'True' enables resetting
-- a session's timer on every event (the "keep alive" behaviour) -- confirm
-- against 'classifySessionsByGeneric'.
{-# INLINE classifyKeepAliveSessions #-}
classifyKeepAliveSessions
    :: (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)
    -> Double
    -> Fold m a b
    -> t m (AbsTime, (k, a)) -- ^ timestamped (key, value) input events
    -> t m (k, b)            -- ^ (key, fold result) outputs
classifyKeepAliveSessions = classifySessionsBy 1 True
-- | 'classifySessionsBy' with its first 'Double' argument fixed to @1@ and
-- its 'Bool' argument fixed to 'False'. The predicate, the remaining 'Double',
-- the fold and the input stream are passed through unchanged.
--
-- NOTE(review): presumably @1@ is the timer tick and 'False' means a
-- session's timer is not reset by later events, giving fixed-length
-- sessions -- confirm against 'classifySessionsByGeneric'.
{-# INLINE classifySessionsOf #-}
classifySessionsOf
    :: (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)
    -> Double
    -> Fold m a b
    -> t m (AbsTime, (k, a)) -- ^ timestamped (key, value) input events
    -> t m (k, b)            -- ^ (key, fold result) outputs
classifySessionsOf = classifySessionsBy 1 False
-- | 'IsStream' wrapper around @D.splitInnerBy@: the input stream is converted
-- with 'toStreamD', transformed by @D.splitInnerBy splitter joiner@, and the
-- result is converted back with 'fromStreamD'.
--
-- The splitter maps a container to a pair whose second component is optional;
-- the joiner combines two containers into one. How the two are used to
-- re-segment the stream is defined by @D.splitInnerBy@.
{-# INLINE splitInnerBy #-}
splitInnerBy
    :: (IsStream t, Monad m)
    => (f a -> m (f a, Maybe (f a)))  -- ^ splitter
    -> (f a -> f a -> m (f a))        -- ^ joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBy splitter joiner xs =
    fromStreamD $ D.splitInnerBy splitter joiner $ toStreamD xs
-- | 'IsStream' wrapper around @D.splitInnerBySuffix@: the input stream is
-- converted with 'toStreamD', transformed by
-- @D.splitInnerBySuffix (== mempty) splitter joiner@, and the result is
-- converted back with 'fromStreamD'.
--
-- The first argument handed to the underlying function is the predicate
-- @(== mempty)@, which is why 'Eq' and 'Monoid' instances are required for
-- the container type.
--
-- NOTE(review): presumably the predicate is used to recognise empty
-- containers -- confirm against @D.splitInnerBySuffix@.
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
    :: (IsStream t, Monad m, Eq (f a), Monoid (f a))
    => (f a -> m (f a, Maybe (f a)))  -- ^ splitter
    -> (f a -> f a -> m (f a))        -- ^ joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBySuffix splitter joiner xs =
    fromStreamD
        $ D.splitInnerBySuffix (== mempty) splitter joiner
        $ toStreamD xs