module Streamly.Internal.Data.Stream.IsStream.Reduce
(
dropPrefix
, dropInfix
, dropSuffix
, foldMany
, foldManyPost
, refoldMany
, foldSequence
, foldIterateM
, refoldIterateM
, chunksOf
, arraysOf
, intervalsOf
, chunksOfTimeout
, splitOn
, splitOnSuffix
, splitOnPrefix
, splitWithSuffix
, splitBySeq
, splitOnSeq
, splitOnSuffixSeq
, splitWithSuffixSeq
, classifySessionsBy
, classifySessionsOf
, classifyKeepAliveSessions
, parseMany
, parseManyD
, parseManyTill
, parseSequence
, parseIterate
, wordsBy
, groups
, groupsBy
, groupsByRolling
, splitInnerBy
, splitInnerBySuffix
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Maybe (isNothing)
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Refold.Type (Refold (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.Array.Foreign.Type (Array)
import Streamly.Internal.Data.Stream.IsStream.Common
( concatMap
, fold
, interjectSuffix
, intersperseM
, map
, parallelFst
, repeatM
, scanlMAfter'
, splitOnSeq
, fromPure)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, cons)
import Streamly.Internal.Data.Time.Units
( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
, toAbsTime)
import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser.ParserK.Type as PRK
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Transform
import Prelude hiding (concatMap, map)
-- | Placeholder that is not implemented: evaluating it raises an 'error'
-- with the message @"Not implemented yet!"@.
--
-- NOTE(review): judging by the name, this is presumably meant to remove a
-- leading occurrence of the first stream from the second - confirm before
-- implementing.
--
-- /Unimplemented/
{-# INLINE dropPrefix #-}
dropPrefix :: t m a -> t m a -> t m a
dropPrefix = error "Not implemented yet!"
-- | Placeholder that is not implemented: evaluating it raises an 'error'
-- with the message @"Not implemented yet!"@.
--
-- NOTE(review): judging by the name, this is presumably meant to remove
-- occurrences of the first stream found inside the second - confirm before
-- implementing.
--
-- /Unimplemented/
{-# INLINE dropInfix #-}
dropInfix :: t m a -> t m a -> t m a
dropInfix = error "Not implemented yet!"
-- | Placeholder that is not implemented: evaluating it raises an 'error'
-- with the message @"Not implemented yet!"@.
--
-- NOTE(review): judging by the name, this is presumably meant to remove a
-- trailing occurrence of the first stream from the second - confirm before
-- implementing.
--
-- /Unimplemented/
{-# INLINE dropSuffix #-}
dropSuffix :: t m a -> t m a -> t m a
dropSuffix = error "Not implemented yet!"
-- | Repeatedly apply a fold to the input stream, producing a stream of fold
-- results. This is a thin wrapper: the input is converted to the direct
-- style stream representation, handed to 'D.foldManyPost', and the result is
-- converted back.
--
-- NOTE(review): how this differs from 'foldMany' at the end of the input is
-- decided by 'D.foldManyPost', which is not visible here.
{-# INLINE foldManyPost #-}
foldManyPost
    :: (IsStream t, Monad m)
    => Fold m a b -- ^ fold applied repeatedly
    -> t m a      -- ^ input stream
    -> t m b
foldManyPost fld strm = fromStreamD (D.foldManyPost fld (toStreamD strm))
-- | Repeatedly apply a fold to the input stream, producing a stream of fold
-- results. This is a thin wrapper: the input is converted to the direct
-- style stream representation, handed to 'D.foldMany', and the result is
-- converted back.
{-# INLINE foldMany #-}
foldMany
    :: (IsStream t, Monad m)
    => Fold m a b -- ^ fold applied repeatedly
    -> t m a      -- ^ input stream
    -> t m b
foldMany fld strm = fromStreamD (D.foldMany fld (toStreamD strm))
-- | Like 'foldMany' but driven by a 'Refold' together with an action that
-- supplies the value the refold is started from. Delegates to
-- 'D.refoldMany' on the direct style stream representation.
{-# INLINE refoldMany #-}
refoldMany
    :: (IsStream t, Monad m)
    => Refold m c a b -- ^ refold applied repeatedly
    -> m c            -- ^ action producing the refold's initial input
    -> t m a          -- ^ input stream
    -> t m b
refoldMany rfld seed = fromStreamD . D.refoldMany rfld seed . toStreamD
-- | Placeholder that is not implemented: applying it to both arguments
-- raises an 'error' naming this function, instead of the anonymous
-- @Prelude.undefined@ failure.
--
-- NOTE(review): judging by the signature, this is presumably meant to apply
-- the folds from the first stream, one after another, to the second stream
-- and emit each result - confirm before implementing.
--
-- /Unimplemented/
{-# INLINE foldSequence #-}
foldSequence
    :: t m (Fold m a b) -- ^ stream of folds (currently ignored)
    -> t m a            -- ^ input stream (currently ignored)
    -> t m b
foldSequence _f _m = error "foldSequence: Not implemented yet!"
-- | Fold the input stream repeatedly, where each fold to run is generated
-- from a value of the result type @b@; the first such value comes from the
-- supplied action. Delegates to 'D.foldIterateM' on the direct style stream
-- representation.
{-# INLINE foldIterateM #-}
foldIterateM
    :: (IsStream t, Monad m)
    => (b -> m (Fold m a b)) -- ^ generates the fold to run from a @b@
    -> m b                   -- ^ action producing the initial @b@
    -> t m a                 -- ^ input stream
    -> t m b
foldIterateM mkFold seed strm =
    fromStreamD (D.foldIterateM mkFold seed (toStreamD strm))
-- | Like 'foldIterateM' but uses a single 'Refold' whose input and result
-- types are both @b@, in place of a fold-generating function. Delegates to
-- 'D.refoldIterateM' on the direct style stream representation.
{-# INLINE refoldIterateM #-}
refoldIterateM
    :: (IsStream t, Monad m)
    => Refold m b a b -- ^ refold applied repeatedly
    -> m b            -- ^ action producing the initial @b@
    -> t m a          -- ^ input stream
    -> t m b
refoldIterateM rfld seed strm =
    fromStreamD (D.refoldIterateM rfld seed (toStreamD strm))
-- | Apply a parser repeatedly to the input stream, producing a stream of
-- parse results. The parser is first converted with 'PRK.fromParserK' and
-- then run through 'D.parseMany' on the direct style stream representation.
{-# INLINE parseMany #-}
parseMany
    :: (IsStream t, MonadThrow m)
    => Parser m a b -- ^ parser applied repeatedly
    -> t m a        -- ^ input stream
    -> t m b
parseMany parser strm =
    fromStreamD (D.parseMany (PRK.fromParserK parser) (toStreamD strm))
-- | Same as 'parseMany' but takes a 'PRD.Parser' directly, so no parser
-- conversion is performed before calling 'D.parseMany'.
{-# INLINE parseManyD #-}
parseManyD
    :: (IsStream t, MonadThrow m)
    => PRD.Parser m a b -- ^ parser applied repeatedly
    -> t m a            -- ^ input stream
    -> t m b
parseManyD parser strm = fromStreamD (D.parseMany parser (toStreamD strm))
-- | Placeholder that is not implemented: applying it to both arguments
-- raises an 'error' naming this function, instead of the anonymous
-- @Prelude.undefined@ failure.
--
-- NOTE(review): judging by the signature, this is presumably meant to apply
-- the parsers from the first stream, one after another, to the second
-- stream and emit each result - confirm before implementing.
--
-- /Unimplemented/
{-# INLINE parseSequence #-}
parseSequence
    :: t m (Parser m a b) -- ^ stream of parsers (currently ignored)
    -> t m a              -- ^ input stream (currently ignored)
    -> t m b
parseSequence _f _m = error "parseSequence: Not implemented yet!"
-- | Placeholder that is not implemented: evaluating it raises an 'error'
-- naming this function, instead of the anonymous @Prelude.undefined@
-- failure.
--
-- NOTE(review): judging by the name and signature, this is presumably meant
-- to apply the first parser repeatedly until the second parser succeeds -
-- confirm before implementing.
--
-- /Unimplemented/
{-# INLINE parseManyTill #-}
parseManyTill
    :: Parser m a b
    -> Parser m a x
    -> t m a
    -> t m b
parseManyTill = error "parseManyTill: Not implemented yet!"
-- | Parse the input stream repeatedly, where each parser to run is generated
-- from a value of the result type @b@, starting with the supplied initial
-- value. Every generated parser is converted with 'PRK.fromParserK' before
-- being handed to 'D.parseIterate'.
{-# INLINE parseIterate #-}
parseIterate
    :: (IsStream t, MonadThrow m)
    => (b -> Parser m a b) -- ^ generates the parser to run from a @b@
    -> b                   -- ^ initial @b@
    -> t m a               -- ^ input stream
    -> t m b
parseIterate mkParser seed strm =
    fromStreamD
        (D.parseIterate (PRK.fromParserK . mkParser) seed (toStreamD strm))
-- | Group the elements of the input stream using the supplied comparison
-- function and fold each group with the supplied fold. Delegates to
-- 'D.groupsBy' on the direct style stream representation.
{-# INLINE groupsBy #-}
groupsBy
    :: (IsStream t, Monad m)
    => (a -> a -> Bool) -- ^ comparison deciding group membership
    -> Fold m a b       -- ^ fold applied to each group
    -> t m a            -- ^ input stream
    -> t m b
groupsBy eq fld strm = fromStreamD (D.groupsBy eq fld (toStreamD strm))
-- | Variant of 'groupsBy' that delegates to 'D.groupsRollingBy' on the
-- direct style stream representation.
--
-- NOTE(review): presumably the comparison is applied to successive
-- neighbouring elements, not against the first element of the group -
-- confirm against 'D.groupsRollingBy'.
{-# INLINE groupsByRolling #-}
groupsByRolling
    :: (IsStream t, Monad m)
    => (a -> a -> Bool) -- ^ comparison deciding group membership
    -> Fold m a b       -- ^ fold applied to each group
    -> t m a            -- ^ input stream
    -> t m b
groupsByRolling eq fld strm =
    fromStreamD (D.groupsRollingBy eq fld (toStreamD strm))
-- | 'groupsBy' specialised to the 'Eq' instance of the element type:
--
-- > groups = groupsBy (==)
{-# INLINE groups #-}
groups
    :: (IsStream t, Monad m, Eq a)
    => Fold m a b -- ^ fold applied to each group
    -> t m a      -- ^ input stream
    -> t m b
groups = groupsBy (==)
-- | Split the input stream using a predicate and fold each segment with the
-- supplied fold. Implemented as 'foldManyPost' over the fold wrapped with
-- 'FL.takeEndBy_'.
{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, Monad m)
    => (a -> Bool) -- ^ predicate identifying a separator element
    -> Fold m a b  -- ^ fold applied to each segment
    -> t m a
    -> t m b
splitOn isSep fld = foldManyPost segment
  where
    segment = FL.takeEndBy_ isSep fld
-- | Split the input stream using a predicate and fold each segment with the
-- supplied fold. Implemented as 'foldMany' over the fold wrapped with
-- 'FL.takeEndBy_'; it differs from 'splitOn' only in using 'foldMany' where
-- that one uses 'foldManyPost'.
{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -- ^ predicate identifying a separator element
    -> Fold m a b  -- ^ fold applied to each segment
    -> t m a
    -> t m b
splitOnSuffix isSep fld = foldMany segment
  where
    segment = FL.takeEndBy_ isSep fld
-- | Placeholder that is not implemented: applying it to a predicate and a
-- fold raises an 'error' naming this function, instead of the anonymous
-- @Prelude.undefined@ failure.
--
-- NOTE(review): judging by the name, this is presumably the counterpart of
-- 'splitOnSuffix' for separators that start a segment - confirm before
-- implementing.
--
-- /Unimplemented/
{-# INLINE splitOnPrefix #-}
splitOnPrefix
    :: (a -> Bool) -- ^ separator predicate (currently ignored)
    -> Fold m a b  -- ^ segment fold (currently ignored)
    -> t m a
    -> t m b
splitOnPrefix _predicate _f = error "splitOnPrefix: Not implemented yet!"
-- | Split the input stream into words using a separator predicate and fold
-- each word with the supplied fold. Delegates to 'D.wordsBy' on the direct
-- style stream representation.
{-# INLINE wordsBy #-}
wordsBy
    :: (IsStream t, Monad m)
    => (a -> Bool) -- ^ predicate identifying a separator element
    -> Fold m a b  -- ^ fold applied to each word
    -> t m a       -- ^ input stream
    -> t m b
wordsBy isSep fld strm = fromStreamD (D.wordsBy isSep fld (toStreamD strm))
-- | Split the input stream using a predicate and fold each segment with the
-- supplied fold. Implemented as 'foldMany' over the fold wrapped with
-- 'FL.takeEndBy'; it differs from 'splitOnSuffix' only in using
-- 'FL.takeEndBy' where that one uses 'FL.takeEndBy_'.
{-# INLINE splitWithSuffix #-}
splitWithSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -- ^ predicate identifying a separator element
    -> Fold m a b  -- ^ fold applied to each segment
    -> t m a
    -> t m b
splitWithSuffix isSep fld = foldMany segment
  where
    segment = FL.takeEndBy isSep fld
-- | Split the input stream on a pattern sequence with 'splitOnSeq', then
-- intersperse between the results the value obtained by running the same
-- fold over the pattern itself, so the separators appear in the output in
-- folded form.
{-# INLINE splitBySeq #-}
splitBySeq
    :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
    => Array a    -- ^ separator pattern
    -> Fold m a b -- ^ fold applied to each segment and to the pattern
    -> t m a      -- ^ input stream
    -> t m b
splitBySeq patt fld strm = intersperseM foldedSep (splitOnSeq patt fld strm)
  where
    -- The pattern folded with the user's fold; run once per interspersal.
    foldedSep = fold fld (A.toStream patt)
-- | Split the input stream on a pattern sequence in suffix position and
-- fold each segment. Delegates to 'D.splitOnSuffixSeq' with its leading
-- 'Bool' flag set to 'False'.
--
-- NOTE(review): the flag presumably selects whether the separator is kept
-- in the folded segment ('splitWithSuffixSeq' passes 'True') - confirm
-- against 'D.splitOnSuffixSeq'.
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
    => Array a    -- ^ separator pattern
    -> Fold m a b -- ^ fold applied to each segment
    -> t m a      -- ^ input stream
    -> t m b
splitOnSuffixSeq patt fld strm =
    fromStreamD (D.splitOnSuffixSeq False patt fld (toStreamD strm))
-- | Split the input stream on a pattern sequence in suffix position and
-- fold each segment. Delegates to 'D.splitOnSuffixSeq' with its leading
-- 'Bool' flag set to 'True'.
--
-- NOTE(review): the flag presumably selects whether the separator is kept
-- in the folded segment ('splitOnSuffixSeq' passes 'False') - confirm
-- against 'D.splitOnSuffixSeq'.
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
    => Array a    -- ^ separator pattern
    -> Fold m a b -- ^ fold applied to each segment
    -> t m a      -- ^ input stream
    -> t m b
splitWithSuffixSeq patt fld strm =
    fromStreamD (D.splitOnSuffixSeq True patt fld (toStreamD strm))
-- | Fold the input stream in chunks governed by a size argument, producing
-- a stream of fold results. Delegates to 'D.chunksOf' on the direct style
-- stream representation.
{-# INLINE chunksOf #-}
chunksOf
    :: (IsStream t, Monad m)
    => Int        -- ^ chunk size passed to 'D.chunksOf'
    -> Fold m a b -- ^ fold applied to each chunk
    -> t m a
    -> t m b
chunksOf size fld = fromStreamD . D.chunksOf size fld . toStreamD
-- | Group the elements of the input stream into a stream of 'Array's.
-- Delegates to 'A.arraysOf' on the direct style stream representation.
{-# INLINE arraysOf #-}
arraysOf
    :: (IsStream t, MonadIO m, Storable a)
    => Int -- ^ array size passed to 'A.arraysOf'
    -> t m a
    -> t m (Array a)
arraysOf size = fromStreamD . A.arraysOf size . toStreamD
-- | Fold the input stream in time windows. Every element is wrapped in
-- 'Just', a 'Nothing' marker is injected into the stream by
-- 'interjectSuffix' using the supplied interval, and the marked stream is
-- then split on the 'Nothing' markers with 'splitWithSuffix'. The user's
-- fold is adapted with 'FL.catMaybes' so that it only sees the real
-- elements.
{-# INLINE intervalsOf #-}
intervalsOf
    :: (IsStream t, MonadAsync m)
    => Double     -- ^ interval passed to 'interjectSuffix'
    -> Fold m a b -- ^ fold applied to each window
    -> t m a      -- ^ input stream
    -> t m b
intervalsOf n f xs = splitWithSuffix isNothing (FL.catMaybes f) marked
  where
    -- Input elements tagged with Just, with Nothing markers interjected.
    marked = interjectSuffix n (return Nothing) (map Just xs)
-- | Fold the input stream in chunks bounded both by element count and by a
-- timeout, built on top of 'classifySessionsBy'.
--
-- Every element is paired with the unit key @()@ and timestamped with
-- 'Transform.timestamped', so the whole stream forms a single session key.
-- 'classifySessionsBy' is then run with the timeout used both as its tick
-- interval and as its session timeout, the reset flag set to 'False', an
-- ejection predicate that always returns 'False', and the fold limited to
-- @n@ inputs with 'FL.take'. Finally the unit key is dropped from each
-- result.
{-# INLINE chunksOfTimeout #-}
chunksOfTimeout
    :: (IsStream t, MonadAsync m, Functor (t m))
    => Int           -- ^ maximum number of elements per chunk
    -> Double        -- ^ timeout, used as both tick and session timeout
    -> FL.Fold m a b -- ^ fold applied to each chunk
    -> t m a
    -> t m b
chunksOfTimeout n timeout f =
      map snd
    . classifySessionsBy timeout False neverEject timeout (FL.take n f)
    . Transform.timestamped
    . map ((,) ())
  where
    -- Ejection predicate that never asks for a session to be ejected.
    neverEject = const (return False)
-- | State threaded through the scan that implements 'classifySessionsBy'.
data SessionState t m k a b = SessionState
    { -- | The scan's notion of the current time; set to the event time
      -- whenever a keyed input is processed.
      sessionCurTime :: !AbsTime
      -- | Latest event time seen so far: the maximum of the previous value
      -- and the timestamp of the incoming event.
    , sessionEventTime :: !AbsTime
      -- | Number of sessions being tracked; incremented when a new key is
      -- added and decremented when a live session's fold completes.
    , sessionCount :: !Int
      -- | Heap of entries pairing an expiry time with a session key.
    , sessionTimerHeap :: H.Heap (H.Entry AbsTime k)
      -- | Per-key session data.
    , sessionKeyValueMap :: Map.Map k a
      -- | Results produced by the current step; the outputs of all steps
      -- are concatenated to form the final stream.
    , sessionOutputStream :: t (m :: Type -> Type) (k, b)
    }
-- | Per-key entry stored in the session map of 'classifySessionsBy'.
data SessionEntry a b
    -- | An active session: 'classifySessionsBy' stores the session's expiry
    -- time in the first field and the fold's accumulator in the second.
    -- Both fields are strict.
    = LiveSession !a !b
    -- | Marker that replaces a live session once its fold has finished.
    | ZombieSession
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
tick Bool
reset Int -> m Bool
ejectPred Double
tmout
(Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract) t m (AbsTime, (k, a))
str =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionOutputStream forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' forall {t :: (* -> *) -> * -> *} {a} {t :: (* -> *) -> * -> *}
{m :: * -> *} {b} {m :: * -> *}.
(IsStream t, Ord a) =>
SessionState t m a (SessionEntry AbsTime s) b
-> Maybe (AbsTime, (a, a))
-> m (SessionState t m a (SessionEntry AbsTime s) b)
sstep (forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {k} {a} {b}. SessionState t m k a b
szero) forall {k} {t :: (* -> *) -> * -> *} {t :: (* -> *) -> * -> *}
{m :: * -> *} {a} {b} {m :: * -> *}.
(Ord k, IsStream t) =>
SessionState t m k (SessionEntry a s) b
-> m (SessionState t m k (SessionEntry a s) b)
flush t m (Maybe (AbsTime, (k, a)))
stream
where
timeoutMs :: RelTime
timeoutMs = forall a. TimeUnit a => a -> RelTime
toRelTime (forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tmout forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
tickMs :: RelTime
tickMs = forall a. TimeUnit a => a -> RelTime
toRelTime (forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
szero :: SessionState t m k a b
szero = SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionEventTime :: AbsTime
sessionEventTime = forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionCount :: Int
sessionCount = Int
0
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = forall a. Heap a
H.empty
, sessionKeyValueMap :: Map k a
sessionKeyValueMap = forall k a. Map k a
Map.empty
, sessionOutputStream :: t m (k, b)
sessionOutputStream = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil
}
sstep :: SessionState t m a (SessionEntry AbsTime s) b
-> Maybe (AbsTime, (a, a))
-> m (SessionState t m a (SessionEntry AbsTime s) b)
sstep session :: SessionState t m a (SessionEntry AbsTime s) b
session@SessionState{t m (a, b)
Int
Map a (SessionEntry AbsTime s)
Heap (Entry AbsTime a)
AbsTime
sessionOutputStream :: t m (a, b)
sessionKeyValueMap :: Map a (SessionEntry AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime a)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} (Just (AbsTime
timestamp, (a
key, a
value))) = do
let curTime :: AbsTime
curTime = forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
mOld :: Maybe (SessionEntry AbsTime s)
mOld = forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup a
key Map a (SessionEntry AbsTime s)
sessionKeyValueMap
let done :: b -> m (SessionState t m a (SessionEntry AbsTime s) b)
done b
fb = do
let (Map a (SessionEntry AbsTime s)
mp, Int
cnt) = case Maybe (SessionEntry AbsTime s)
mOld of
Just (LiveSession AbsTime
_ s
_) ->
( forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert a
key forall a b. SessionEntry a b
ZombieSession Map a (SessionEntry AbsTime s)
sessionKeyValueMap
, Int
sessionCount forall a. Num a => a -> a -> a
- Int
1
)
Maybe (SessionEntry AbsTime s)
_ -> (Map a (SessionEntry AbsTime s)
sessionKeyValueMap, Int
sessionCount)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState t m a (SessionEntry AbsTime s) b
session
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt
, sessionKeyValueMap :: Map a (SessionEntry AbsTime s)
sessionKeyValueMap = Map a (SessionEntry AbsTime s)
mp
, sessionOutputStream :: t m (a, b)
sessionOutputStream = forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure (a
key, b
fb)
}
partial :: s -> m (SessionState t m a (SessionEntry AbsTime s) b)
partial s
fs1 = do
let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
(Heap (Entry AbsTime a)
hp1, Map a (SessionEntry AbsTime s)
mp1, t m (a, b)
out1, Int
cnt1) <- do
let vars :: (Heap (Entry AbsTime a), Map a (SessionEntry AbsTime s), t m a,
Int)
vars = (Heap (Entry AbsTime a)
sessionTimerHeap, Map a (SessionEntry AbsTime s)
sessionKeyValueMap,
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil, Int
sessionCount)
case Maybe (SessionEntry AbsTime s)
mOld of
Maybe (SessionEntry AbsTime s)
Nothing -> do
Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
(Heap (Entry AbsTime a)
hp, Map a (SessionEntry AbsTime s)
mp, t m (a, b)
out, Int
cnt) <-
if Bool
eject
then forall {k} {a} {d} {t :: (* -> *) -> * -> *} {m :: * -> *}.
(Ord k, Num d, IsStream t, Ord a) =>
(Heap (Entry a k), Map k (SessionEntry a s), t m (k, b), d)
-> m (Heap (Entry a k), Map k (SessionEntry a s), t m (k, b), d)
ejectOne forall {m :: * -> *} {a}.
(Heap (Entry AbsTime a), Map a (SessionEntry AbsTime s), t m a,
Int)
vars
else forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {a}.
(Heap (Entry AbsTime a), Map a (SessionEntry AbsTime s), t m a,
Int)
vars
let hp' :: Heap (Entry AbsTime a)
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry AbsTime
expiry a
key) Heap (Entry AbsTime a)
hp
in forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime a)
hp', Map a (SessionEntry AbsTime s)
mp, t m (a, b)
out, Int
cnt forall a. Num a => a -> a -> a
+ Int
1)
Just SessionEntry AbsTime s
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {a}.
(Heap (Entry AbsTime a), Map a (SessionEntry AbsTime s), t m a,
Int)
vars
let acc :: SessionEntry AbsTime s
acc = forall a b. a -> b -> SessionEntry a b
LiveSession AbsTime
expiry s
fs1
mp2 :: Map a (SessionEntry AbsTime s)
mp2 = forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert a
key SessionEntry AbsTime s
acc Map a (SessionEntry AbsTime s)
mp1
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt1
, sessionTimerHeap :: Heap (Entry AbsTime a)
sessionTimerHeap = Heap (Entry AbsTime a)
hp1
, sessionKeyValueMap :: Map a (SessionEntry AbsTime s)
sessionKeyValueMap = Map a (SessionEntry AbsTime s)
mp2
, sessionOutputStream :: t m (a, b)
sessionOutputStream = t m (a, b)
out1
}
Step s b
res0 <- do
case Maybe (SessionEntry AbsTime s)
mOld of
Just (LiveSession AbsTime
_ s
acc) -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial s
acc
Maybe (SessionEntry AbsTime s)
_ -> m (Step s b)
initial
case Step s b
res0 of
FL.Done b
_ ->
forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"classifySessionsBy: "
forall a. [a] -> [a] -> [a]
++ [Char]
"The supplied fold must consume at least one input"
FL.Partial s
fs -> do
Step s b
res <- s -> a -> m (Step s b)
step s
fs a
value
case Step s b
res of
FL.Done b
fb -> forall {m :: * -> *} {t :: (* -> *) -> * -> *} {b} {m :: * -> *}.
(Monad m, IsStream t) =>
b -> m (SessionState t m a (SessionEntry AbsTime s) b)
done b
fb
FL.Partial s
fs1 -> forall {t :: (* -> *) -> * -> *} {m :: * -> *}.
IsStream t =>
s -> m (SessionState t m a (SessionEntry AbsTime s) b)
partial s
fs1
sstep sessionState :: SessionState t m a (SessionEntry AbsTime s) b
sessionState@SessionState{t m (a, b)
Int
Map a (SessionEntry AbsTime s)
Heap (Entry AbsTime a)
AbsTime
sessionOutputStream :: t m (a, b)
sessionKeyValueMap :: Map a (SessionEntry AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime a)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} Maybe (AbsTime, (a, a))
Nothing =
let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
in forall {k} {t :: (* -> *) -> * -> *} {t :: (* -> *) -> * -> *}
{m :: * -> *} {b} {m :: * -> *}.
(Ord k, IsStream t) =>
SessionState t m k (SessionEntry AbsTime s) b
-> AbsTime -> m (SessionState t m k (SessionEntry AbsTime s) b)
ejectExpired SessionState t m a (SessionEntry AbsTime s) b
sessionState AbsTime
curTime
-- Eject every session that is still tracked in the given state.
--
-- Runs 'ejectAll' over the state's timer heap and key/value map, starting
-- from an empty output stream, and returns the state updated with the
-- resulting heap, map, output stream and session count. The remaining fields
-- of the state are left as they were.
flush session@SessionState
        { sessionTimerHeap = timerHeap
        , sessionKeyValueMap = keyValueMap
        , sessionCount = liveCount
        } = do
    (heap', sessions', emitted, count') <-
        ejectAll (timerHeap, keyValueMap, IsStream.nil, liveCount)
    pure $ session
        { sessionCount = count'
        , sessionTimerHeap = heap'
        , sessionKeyValueMap = sessions'
        , sessionOutputStream = emitted
        }
-- Finalize a single session and account for its removal.
--
-- Runs the fold's @extract@ on the session accumulator, prepends the
-- resulting @(key, result)@ pair to the output stream, deletes the key from
-- the session map and decrements the session count. The heap argument is
-- passed through untouched.
ejectEntry heap sessions emitted count foldState key = do
    result <- extract foldState
    pure
        ( heap
        , Map.delete key sessions
        , (key, result) `cons` emitted
        , count - 1
        )
-- Drain the timer heap completely, ejecting every live session on the way.
--
-- For each heap entry, in heap order:
--
-- * key absent from the map: the heap entry is simply dropped;
-- * zombie session: the key is deleted from the map;
-- * live session: the session is ejected via 'ejectEntry'.
--
-- Once the heap is empty the map is asserted to be empty as well and the
-- accumulated (heap, map, output stream, count) tuple is returned.
ejectAll (heap, sessions, emitted, !count) =
    case H.uncons heap of
        Nothing -> do
            assert (Map.null sessions) (pure ())
            pure (heap, sessions, emitted, count)
        Just (Entry _ key, rest) -> do
            next <-
                case Map.lookup key sessions of
                    Just (LiveSession _ foldState) ->
                        ejectEntry rest sessions emitted count foldState key
                    Just ZombieSession ->
                        pure (rest, Map.delete key sessions, emitted, count)
                    Nothing ->
                        pure (rest, sessions, emitted, count)
            ejectAll next
-- Eject at most one live session, taking candidates from the timer heap.
--
-- Heap entries whose key is missing from the map, or refers to a zombie
-- session (which is then deleted from the map), are discarded and the search
-- continues. For a live session:
--
-- * if @reset@ is off, or the expiry recorded in the session is not later
--   than the expiry of the heap entry, the session is ejected via
--   'ejectEntry' and the search stops;
-- * otherwise the key is re-inserted into the heap with the session's own
--   expiry and the search continues.
--
-- If the heap runs out, the map is asserted to be empty and the inputs are
-- returned unchanged.
ejectOne (heap, sessions, emitted, !count) =
    case H.uncons heap of
        Nothing -> do
            assert (Map.null sessions) (pure ())
            pure (heap, sessions, emitted, count)
        Just (Entry expiry key, rest) ->
            case Map.lookup key sessions of
                Nothing -> ejectOne (rest, sessions, emitted, count)
                Just ZombieSession ->
                    ejectOne (rest, Map.delete key sessions, emitted, count)
                Just (LiveSession latest foldState)
                    | not reset || latest <= expiry ->
                        ejectEntry rest sessions emitted count foldState key
                    | otherwise ->
                        ejectOne
                            ( H.insert (Entry latest key) rest
                            , sessions
                            , emitted
                            , count
                            )
-- Eject the sessions that are due at the given current time.
--
-- Runs 'ejectLoop' over the state's timer heap and key/value map, starting
-- from an empty output stream, then returns the state with the current time
-- set to @curTime@ and the heap, map, output stream and count replaced by
-- the loop's results.
ejectExpired session@SessionState
        { sessionTimerHeap = timerHeap
        , sessionKeyValueMap = keyValueMap
        , sessionCount = liveCount
        } curTime = do
    (heap', sessions', emitted, count') <-
        ejectLoop timerHeap keyValueMap IsStream.nil liveCount
    pure $ session
        { sessionCurTime = curTime
        , sessionCount = count'
        , sessionTimerHeap = heap'
        , sessionKeyValueMap = sessions'
        , sessionOutputStream = emitted
        }

    where

    -- Examine the earliest heap entry repeatedly until one is found that
    -- should not be ejected, or the heap is empty.
    ejectLoop heap sessions emitted !count =
        case H.uncons heap of
            Nothing -> do
                assert (Map.null sessions) (pure ())
                pure (heap, sessions, emitted, count)
            Just (Entry expiry key, rest) -> do
                let expired = curTime >= expiry
                -- The eject predicate is consulted only for entries that
                -- have not expired yet; a True answer forces the ejection.
                forced <- if expired then pure False else ejectPred count
                if expired || forced
                then
                    case Map.lookup key sessions of
                        Nothing -> ejectLoop rest sessions emitted count
                        Just ZombieSession ->
                            ejectLoop
                                rest (Map.delete key sessions) emitted count
                        Just (LiveSession latest foldState)
                            | latest <= curTime || not reset || forced ->
                                ejectEntry
                                    rest sessions emitted count foldState key
                                    >>= \(h, m, o, c) -> ejectLoop h m o c
                            | otherwise ->
                                -- The session's own expiry is later than
                                -- the heap entry: queue it again under
                                -- that expiry.
                                ejectLoop
                                    (H.insert (Entry latest key) rest)
                                    sessions
                                    emitted
                                    count
                else pure (heap, sessions, emitted, count)
-- The input stream with every element wrapped in 'Just', combined via
-- 'parallelFst' with an endless stream of 'timer' events.
stream = parallelFst (map Just str) (repeatM timer)
-- Sleep for @tick * 1000000@ microseconds (rounded), then yield 'Nothing'.
timer = liftIO (threadDelay delayMicros) >> pure Nothing

    where

    delayMicros = round (tick * 1000000)
-- | 'classifySessionsBy' with its first two arguments fixed to @1@ and
-- 'True'.
--
-- NOTE(review): judging by how @tick@ and @reset@ are used inside
-- 'classifySessionsBy', these look like the timer tick in seconds and the
-- flag that lets a session's own expiry postpone its ejection; confirm
-- against the parameter list of 'classifySessionsBy'.
{-# INLINE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)
    -> Double
    -> Fold m a b
    -> t m (AbsTime, (k, a))
    -> t m (k, b)
classifyKeepAliveSessions = classifySessionsBy tickInterval resetFlag

    where

    tickInterval :: Double
    tickInterval = 1

    resetFlag :: Bool
    resetFlag = True
-- | 'classifySessionsBy' with its first two arguments fixed to @1@ and
-- 'False'.
--
-- NOTE(review): judging by how @tick@ and @reset@ are used inside
-- 'classifySessionsBy', these look like the timer tick in seconds and the
-- flag that lets a session's own expiry postpone its ejection (disabled
-- here); confirm against the parameter list of 'classifySessionsBy'.
{-# INLINE classifySessionsOf #-}
classifySessionsOf ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool)
    -> Double
    -> Fold m a b
    -> t m (AbsTime, (k, a))
    -> t m (k, b)
classifySessionsOf = classifySessionsBy tickInterval resetFlag

    where

    tickInterval :: Double
    tickInterval = 1

    resetFlag :: Bool
    resetFlag = False
-- | 'IsStream' wrapper around @D.splitInnerBy@: the input is converted with
-- 'toStreamD', processed by @D.splitInnerBy splitter joiner@, and the result
-- is converted back with 'fromStreamD'.
{-# INLINE splitInnerBy #-}
splitInnerBy
    :: (IsStream t, Monad m)
    => (f a -> m (f a, Maybe (f a)))
    -> (f a -> f a -> m (f a))
    -> t m (f a)
    -> t m (f a)
splitInnerBy splitter joiner xs = fromStreamD resplit

    where

    direct = toStreamD xs
    resplit = D.splitInnerBy splitter joiner direct
-- | 'IsStream' wrapper around @D.splitInnerBySuffix@: the input is converted
-- with 'toStreamD', processed by @D.splitInnerBySuffix splitter joiner@, and
-- the result is converted back with 'fromStreamD'.
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
    :: (IsStream t, Monad m, Eq (f a), Monoid (f a))
    => (f a -> m (f a, Maybe (f a)))
    -> (f a -> f a -> m (f a))
    -> t m (f a)
    -> t m (f a)
splitInnerBySuffix splitter joiner xs = fromStreamD resplit

    where

    direct = toStreamD xs
    resplit = D.splitInnerBySuffix splitter joiner direct