module Streamly.Internal.Data.Stream.IsStream.Reduce
(
dropPrefix
, dropInfix
, dropSuffix
, foldMany
, foldManyPost
, foldSequence
, foldIterateM
, chunksOf
, arraysOf
, intervalsOf
, splitOn
, splitOnSuffix
, splitOnPrefix
, splitWithSuffix
, splitBySeq
, splitOnSeq
, splitOnSuffixSeq
, splitWithSuffixSeq
, classifySessionsBy
, classifySessionsOf
, classifyKeepAliveSessions
, parseMany
, parseManyD
, parseManyTill
, parseSequence
, parseIterate
, wordsBy
, groups
, groupsBy
, groupsByRolling
, splitInnerBy
, splitInnerBySuffix
, chunksOf2
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Maybe (isNothing)
import Foreign.Storable (Storable)
import Streamly.Internal.Data.Fold.Type (Fold (..), Fold2 (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.Array.Foreign.Type (Array)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common
( concatMap
, fold
, interjectSuffix
, intersperseM
, repeatM
, scanlMAfter'
, splitOnSeq
, fromPure)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.Time.Units
( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
, toAbsTime)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser.ParserK.Type as PRK
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import Prelude hiding (concatMap)
{-# INLINE dropPrefix #-}
dropPrefix ::
t m a -> t m a -> t m a
dropPrefix :: t m a -> t m a -> t m a
dropPrefix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
{-# INLINE dropInfix #-}
dropInfix ::
t m a -> t m a -> t m a
dropInfix :: t m a -> t m a -> t m a
dropInfix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
{-# INLINE dropSuffix #-}
dropSuffix ::
t m a -> t m a -> t m a
dropSuffix :: t m a -> t m a -> t m a
dropSuffix = [Char] -> t m a -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
{-# INLINE foldManyPost #-}
foldManyPost
:: (IsStream t, Monad m)
=> Fold m a b
-> t m a
-> t m b
foldManyPost :: Fold m a b -> t m a -> t m b
foldManyPost Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldManyPost Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE foldMany #-}
foldMany
:: (IsStream t, Monad m)
=> Fold m a b
-> t m a
-> t m b
foldMany :: Fold m a b -> t m a -> t m b
foldMany Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldMany Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE foldSequence #-}
foldSequence
::
t m (Fold m a b)
-> t m a
-> t m b
foldSequence :: t m (Fold m a b) -> t m a -> t m b
foldSequence t m (Fold m a b)
_f t m a
_m = t m b
forall a. HasCallStack => a
undefined
{-# INLINE foldIterateM #-}
foldIterateM ::
(IsStream t, Monad m) => (b -> m (Fold m a b)) -> b -> t m a -> t m b
foldIterateM :: (b -> m (Fold m a b)) -> b -> t m a -> t m b
foldIterateM b -> m (Fold m a b)
f b
i t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> m (Fold m a b)) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> m (Fold m a b)) -> b -> Stream m a -> Stream m b
D.foldIterateM b -> m (Fold m a b)
f b
i (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE parseMany #-}
parseMany
:: (IsStream t, MonadThrow m)
=> Parser m a b
-> t m a
-> t m b
parseMany :: Parser m a b -> t m a -> t m b
parseMany Parser m a b
p t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Parser m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.parseMany (Parser m a b -> Parser m a b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Parser m a b
PRK.fromParserK Parser m a b
p) (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE parseManyD #-}
parseManyD
:: (IsStream t, MonadThrow m)
=> PRD.Parser m a b
-> t m a
-> t m b
parseManyD :: Parser m a b -> t m a -> t m b
parseManyD Parser m a b
p t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Parser m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Stream m a -> Stream m b
D.parseMany Parser m a b
p (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE parseSequence #-}
parseSequence
::
t m (Parser m a b)
-> t m a
-> t m b
parseSequence :: t m (Parser m a b) -> t m a -> t m b
parseSequence t m (Parser m a b)
_f t m a
_m = t m b
forall a. HasCallStack => a
undefined
{-# INLINE parseManyTill #-}
parseManyTill ::
Parser m a b
-> Parser m a x
-> t m a
-> t m b
parseManyTill :: Parser m a b -> Parser m a x -> t m a -> t m b
parseManyTill = Parser m a b -> Parser m a x -> t m a -> t m b
forall a. HasCallStack => a
undefined
{-# INLINE parseIterate #-}
parseIterate
:: (IsStream t, MonadThrow m)
=> (b -> Parser m a b)
-> b
-> t m a
-> t m b
parseIterate :: (b -> Parser m a b) -> b -> t m a -> t m b
parseIterate b -> Parser m a b
f b
i t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$
(b -> Parser m a b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
MonadThrow m =>
(b -> Parser m a b) -> b -> Stream m a -> Stream m b
D.parseIterate (Parser m a b -> Parser m a b
forall (m :: * -> *) a b.
MonadThrow m =>
Parser m a b -> Parser m a b
PRK.fromParserK (Parser m a b -> Parser m a b)
-> (b -> Parser m a b) -> b -> Parser m a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Parser m a b
f) b
i (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE groupsBy #-}
groupsBy
:: (IsStream t, Monad m)
=> (a -> a -> Bool)
-> Fold m a b
-> t m a
-> t m b
groupsBy :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
cmp Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE groupsByRolling #-}
groupsByRolling
:: (IsStream t, Monad m)
=> (a -> a -> Bool)
-> Fold m a b
-> t m a
-> t m b
groupsByRolling :: (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsByRolling a -> a -> Bool
cmp Fold m a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsRollingBy a -> a -> Bool
cmp Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE groups #-}
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
groups :: Fold m a b -> t m a -> t m b
groups = (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
forall a. Eq a => a -> a -> Bool
(==)
{-# INLINE splitOn #-}
splitOn
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn a -> Bool
predicate Fold m a b
f =
Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldManyPost ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)
{-# INLINE splitOnSuffix #-}
splitOnSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix a -> Bool
predicate Fold m a b
f = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)
{-# INLINE splitOnPrefix #-}
splitOnPrefix ::
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix a -> Bool
_predicate Fold m a b
_f = t m a -> t m b
forall a. HasCallStack => a
undefined
{-# INLINE wordsBy #-}
wordsBy
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy :: (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy a -> Bool
predicate Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.wordsBy a -> Bool
predicate Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitWithSuffix #-}
splitWithSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix :: (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix a -> Bool
predicate Fold m a b
f = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany ((a -> Bool) -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy a -> Bool
predicate Fold m a b
f)
{-# INLINE splitBySeq #-}
splitBySeq
:: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitBySeq :: Array a -> Fold m a b -> t m a -> t m b
splitBySeq Array a
patt Fold m a b
f t m a
m =
m b -> t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM (Fold m a b -> SerialT m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
f (Array a -> SerialT m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, Storable a) =>
Array a -> t m a
A.toStream Array a
patt)) (t m b -> t m b) -> t m b -> t m b
forall a b. (a -> b) -> a -> b
$ Array a -> Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
False Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq :: Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq Array a
patt Fold m a b
f t m a
m =
Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
True Array a
patt Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE chunksOf #-}
chunksOf
:: (IsStream t, Monad m)
=> Int -> Fold m a b -> t m a -> t m b
chunksOf :: Int -> Fold m a b -> t m a -> t m b
chunksOf Int
n Fold m a b
f = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> (t m a -> Stream m b) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Stream m a -> Stream m b
D.chunksOf Int
n Fold m a b
f (Stream m a -> Stream m b)
-> (t m a -> Stream m a) -> t m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD
{-# INLINE chunksOf2 #-}
chunksOf2
:: (IsStream t, Monad m)
=> Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 :: Int -> m c -> Fold2 m c a b -> t m a -> t m b
chunksOf2 Int
n m c
action Fold2 m c a b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
forall (m :: * -> *) c a b.
Monad m =>
Int -> m c -> Fold2 m c a b -> Stream m a -> Stream m b
D.groupsOf2 Int
n m c
action Fold2 m c a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m)
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
=> Int -> t m a -> t m (Array a)
arraysOf :: Int -> t m a -> t m (Array a)
arraysOf Int
n = Stream m (Array a) -> t m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> t m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> t m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.arraysOf Int
n (Stream m a -> Stream m (Array a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD
{-# INLINE intervalsOf #-}
intervalsOf
:: (IsStream t, MonadAsync m)
=> Double -> Fold m a b -> t m a -> t m b
intervalsOf :: Double -> Fold m a b -> t m a -> t m b
intervalsOf Double
n Fold m a b
f t m a
xs =
(Maybe a -> Bool) -> Fold m (Maybe a) b -> t m (Maybe a) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing (Fold m a b -> Fold m (Maybe a) b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Fold m (Maybe a) b
FL.catMaybes Fold m a b
f)
(Double -> m (Maybe a) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) ((a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map a -> Maybe a
forall a. a -> Maybe a
Just t m a
xs))
data SessionState t m k a b = SessionState
{ SessionState t m k a b -> AbsTime
sessionCurTime :: !AbsTime
, SessionState t m k a b -> AbsTime
sessionEventTime :: !AbsTime
, SessionState t m k a b -> Int
sessionCount :: !Int
, SessionState t m k a b -> Heap (Entry AbsTime k)
sessionTimerHeap :: H.Heap (H.Entry AbsTime k)
, SessionState t m k a b -> Map k a
sessionKeyValueMap :: Map.Map k a
, SessionState t m k a b -> t m (k, b)
sessionOutputStream :: t (m :: Type -> Type) (k, b)
}
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy :: Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
tick Bool
reset Int -> m Bool
ejectPred Double
tmout
(Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract) t m (AbsTime, (k, a))
str =
(SessionState t m k (Tuple' AbsTime s) b -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap SessionState t m k (Tuple' AbsTime s) b -> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionOutputStream (t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b))
-> t m (SessionState t m k (Tuple' AbsTime s) b) -> t m (k, b)
forall a b. (a -> b) -> a -> b
$
(SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> m (SessionState t m k (Tuple' AbsTime s) b)
-> (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> t m (Maybe (AbsTime, (k, a)))
-> t m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) k (t :: (* -> *) -> * -> *)
(m :: * -> *) b (m :: * -> *).
(IsStream t, Ord k) =>
SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return SessionState t m k (Tuple' AbsTime s) b
forall (m :: * -> *) k a b. SessionState t m k a b
szero) SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall k (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
(m :: * -> *) a b (m :: * -> *).
(Ord k, IsStream t) =>
SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
flush t m (Maybe (AbsTime, (k, a)))
stream
where
timeoutMs :: RelTime
timeoutMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tmout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
tickMs :: RelTime
tickMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
szero :: SessionState t m k a b
szero = SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionEventTime :: AbsTime
sessionEventTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
, sessionCount :: Int
sessionCount = Int
0
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
forall a. Heap a
H.empty
, sessionKeyValueMap :: Map k a
sessionKeyValueMap = Map k a
forall k a. Map k a
Map.empty
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
}
sstep :: SessionState t m k (Tuple' AbsTime s) b
-> Maybe (AbsTime, (k, a))
-> m (SessionState t m k (Tuple' AbsTime s) b)
sstep session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} (Just (AbsTime
timestamp, (k
key, a
value))) = do
let curTime :: AbsTime
curTime = AbsTime -> AbsTime -> AbsTime
forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
mOld :: Maybe (Tuple' AbsTime s)
mOld = k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
let done :: b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb = do
let (Map k (Tuple' AbsTime s)
mp, Int
cnt) = case Maybe (Tuple' AbsTime s)
mOld of
Maybe (Tuple' AbsTime s)
Nothing -> (Map k (Tuple' AbsTime s)
sessionKeyValueMap, Int
sessionCount)
Just Tuple' AbsTime s
_ -> (k -> Map k (Tuple' AbsTime s) -> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k (Tuple' AbsTime s)
sessionKeyValueMap
, Int
sessionCount Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp
, sessionOutputStream :: t m (k, b)
sessionOutputStream = (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure (k
key, b
fb)
}
partial :: s -> m (SessionState t m k (Tuple' AbsTime s) b)
partial s
fs1 = do
let acc :: Tuple' AbsTime s
acc = AbsTime -> s -> Tuple' AbsTime s
forall a b. a -> b -> Tuple' a b
Tuple' AbsTime
timestamp s
fs1
(Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp1, t m (k, b)
out1, Int
cnt1) <- do
let vars :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars = (Heap (Entry AbsTime k)
sessionTimerHeap, Map k (Tuple' AbsTime s)
sessionKeyValueMap,
t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil, Int
sessionCount)
case Maybe (Tuple' AbsTime s)
mOld of
Maybe (Tuple' AbsTime s)
Nothing -> do
Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
(Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt) <-
if Bool
eject
then (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall k d (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, Num d, IsStream t) =>
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
hp' :: Heap (Entry AbsTime k)
hp' = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry k
key) Heap (Entry AbsTime k)
hp
in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
Just Tuple' AbsTime s
_ -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
forall (m :: * -> *) a.
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m a, Int)
vars
let mp2 :: Map k (Tuple' AbsTime s)
mp2 = k
-> Tuple' AbsTime s
-> Map k (Tuple' AbsTime s)
-> Map k (Tuple' AbsTime s)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
key Tuple' AbsTime s
acc Map k (Tuple' AbsTime s)
mp1
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime k)
-> Map k a
-> t m (k, b)
-> SessionState t m k a b
SessionState
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
cnt1
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp1
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp2
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out1
}
Step s b
res0 <- do
case Maybe (Tuple' AbsTime s)
mOld of
Maybe (Tuple' AbsTime s)
Nothing -> m (Step s b)
initial
Just (Tuple' AbsTime
_ s
acc) -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ s -> Step s b
forall s b. s -> Step s b
FL.Partial s
acc
case Step s b
res0 of
FL.Done b
fb -> b -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b (m :: * -> *).
(Monad m, IsStream t) =>
b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb
FL.Partial s
fs -> do
Step s b
res <- s -> a -> m (Step s b)
step s
fs a
value
case Step s b
res of
FL.Done b
fb -> b -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b (m :: * -> *).
(Monad m, IsStream t) =>
b -> m (SessionState t m k (Tuple' AbsTime s) b)
done b
fb
FL.Partial s
fs1 -> s -> m (SessionState t m k (Tuple' AbsTime s) b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
IsStream t =>
s -> m (SessionState t m k (Tuple' AbsTime s) b)
partial s
fs1
sstep sessionState :: SessionState t m k (Tuple' AbsTime s) b
sessionState@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} Maybe (AbsTime, (k, a))
Nothing =
let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
in SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
forall k (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
(m :: * -> *) b (m :: * -> *).
(Ord k, IsStream t) =>
SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired SessionState t m k (Tuple' AbsTime s) b
sessionState AbsTime
curTime
flush :: SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
flush session :: SessionState t m k (Tuple' a s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' a s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' a s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} = do
(Heap (Entry AbsTime k)
hp', Map k (Tuple' a s)
mp', t m (k, b)
out, Int
count) <-
(Heap (Entry AbsTime k), Map k (Tuple' a s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' a s), t m (k, b), Int)
forall k d (t :: (* -> *) -> * -> *) p a (m :: * -> *).
(Ord k, Num d, IsStream t) =>
(Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll
( Heap (Entry AbsTime k)
sessionTimerHeap
, Map k (Tuple' a s)
sessionKeyValueMap
, t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
, Int
sessionCount
)
SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b))
-> SessionState t m k (Tuple' a s) b
-> m (SessionState t m k (Tuple' a s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' a s) b
session
{ sessionCount :: Int
sessionCount = Int
count
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp'
, sessionKeyValueMap :: Map k (Tuple' a s)
sessionKeyValueMap = Map k (Tuple' a s)
mp'
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out
}
ejectEntry :: a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry a
hp Map k a
mp t m (k, b)
out d
cnt s
acc k
key = do
b
sess <- s -> m b
extract s
acc
let out1 :: t m (k, b)
out1 = (k
key, b
sess) (k, b) -> t m (k, b) -> t m (k, b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`K.cons` t m (k, b)
out
let mp1 :: Map k a
mp1 = k -> Map k a -> Map k a
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k a
mp
(a, Map k a, t m (k, b), d) -> m (a, Map k a, t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
hp, Map k a
mp1, t m (k, b)
out1, d
cnt d -> d -> d
forall a. Num a => a -> a -> a
- d
1)
ejectAll :: (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll (Heap (Entry p k)
hp, Map k (Tuple' a s)
mp, t m (k, b)
out, !d
cnt) = do
let hres :: Maybe (Entry p k, Heap (Entry p k))
hres = Heap (Entry p k) -> Maybe (Entry p k, Heap (Entry p k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry p k)
hp
case Maybe (Entry p k, Heap (Entry p k))
hres of
Just (Entry p
_ k
key, Heap (Entry p k)
hp1) -> do
(Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
r <- case k -> Map k (Tuple' a s) -> Maybe (Tuple' a s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' a s)
mp of
Maybe (Tuple' a s)
Nothing -> (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p k)
hp1, Map k (Tuple' a s)
mp, t m (k, b)
out, d
cnt)
Just (Tuple' a
_ s
acc) -> Heap (Entry p k)
-> Map k (Tuple' a s)
-> t m (k, b)
-> d
-> s
-> k
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry p k)
hp1 Map k (Tuple' a s)
mp t m (k, b)
out d
cnt s
acc k
key
(Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
ejectAll (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
r
Maybe (Entry p k, Heap (Entry p k))
Nothing -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' a s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' a s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
-> m (Heap (Entry p k), Map k (Tuple' a s), t m (k, b), d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p k)
hp, Map k (Tuple' a s)
mp, t m (k, b)
out, d
cnt)
ejectOne :: (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, !d
cnt) = do
let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) ->
case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
Maybe (Tuple' AbsTime s)
Nothing -> (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp1, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
Just (Tuple' AbsTime
latestTS s
acc) -> do
let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
if Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
expiry
then Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> d
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out d
cnt s
acc k
key
else
let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
in (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
ejectOne (Heap (Entry AbsTime k)
hp2, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), d)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, d
cnt)
ejectExpired :: SessionState t m k (Tuple' AbsTime s) b
-> AbsTime -> m (SessionState t m k (Tuple' AbsTime s) b)
ejectExpired session :: SessionState t m k (Tuple' AbsTime s) b
session@SessionState{t m (k, b)
Int
Map k (Tuple' AbsTime s)
Heap (Entry AbsTime k)
AbsTime
sessionOutputStream :: t m (k, b)
sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionTimerHeap :: Heap (Entry AbsTime k)
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> t m (k, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Map k a
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Heap (Entry AbsTime k)
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
SessionState t m k a b -> AbsTime
..} AbsTime
curTime = do
(Heap (Entry AbsTime k)
hp', Map k (Tuple' AbsTime s)
mp', t m (k, b)
out, Int
count) <-
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall k (t :: (* -> *) -> * -> *) (m :: * -> *).
(Ord k, IsStream t) =>
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
sessionTimerHeap Map k (Tuple' AbsTime s)
sessionKeyValueMap t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil Int
sessionCount
SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b))
-> SessionState t m k (Tuple' AbsTime s) b
-> m (SessionState t m k (Tuple' AbsTime s) b)
forall a b. (a -> b) -> a -> b
$ SessionState t m k (Tuple' AbsTime s) b
session
{ sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
, sessionCount :: Int
sessionCount = Int
count
, sessionTimerHeap :: Heap (Entry AbsTime k)
sessionTimerHeap = Heap (Entry AbsTime k)
hp'
, sessionKeyValueMap :: Map k (Tuple' AbsTime s)
sessionKeyValueMap = Map k (Tuple' AbsTime s)
mp'
, sessionOutputStream :: t m (k, b)
sessionOutputStream = t m (k, b)
out
}
where
ejectLoop :: Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp Map k (Tuple' AbsTime s)
mp t m (k, b)
out !Int
cnt = do
let hres :: Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres = Heap (Entry AbsTime k)
-> Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime k)
hp
case Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
hres of
Just (Entry AbsTime
expiry k
key, Heap (Entry AbsTime k)
hp1) -> do
(Bool
eject, Bool
force) <-
if AbsTime
curTime AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
>= AbsTime
expiry
then (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Bool
False)
else do
Bool
r <- Int -> m Bool
ejectPred Int
cnt
(Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
r, Bool
r)
if Bool
eject
then
case k -> Map k (Tuple' AbsTime s) -> Maybe (Tuple' AbsTime s)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k (Tuple' AbsTime s)
mp of
Maybe (Tuple' AbsTime s)
Nothing -> Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
Just (Tuple' AbsTime
latestTS s
acc) -> do
let expiry1 :: AbsTime
expiry1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
latestTS RelTime
timeoutMs
if AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
curTime Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| Bool
force
then do
(Heap (Entry AbsTime k)
hp2,Map k (Tuple' AbsTime s)
mp1,t m (k, b)
out1,Int
cnt1) <-
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> s
-> k
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall d (t :: (* -> *) -> * -> *) k a a (m :: * -> *).
(Num d, IsStream t, Ord k) =>
a
-> Map k a
-> t m (k, b)
-> d
-> s
-> k
-> m (a, Map k a, t m (k, b), d)
ejectEntry Heap (Entry AbsTime k)
hp1 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt s
acc k
key
Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp1 t m (k, b)
out1 Int
cnt1
else
let hp2 :: Heap (Entry AbsTime k)
hp2 = Entry AbsTime k -> Heap (Entry AbsTime k) -> Heap (Entry AbsTime k)
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> k -> Entry AbsTime k
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 k
key) Heap (Entry AbsTime k)
hp1
in Heap (Entry AbsTime k)
-> Map k (Tuple' AbsTime s)
-> t m (k, b)
-> Int
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
ejectLoop Heap (Entry AbsTime k)
hp2 Map k (Tuple' AbsTime s)
mp t m (k, b)
out Int
cnt
else (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)
Maybe (Entry AbsTime k, Heap (Entry AbsTime k))
Nothing -> do
Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (Map k (Tuple' AbsTime s) -> Bool
forall k a. Map k a -> Bool
Map.null Map k (Tuple' AbsTime s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b), Int)
-> m (Heap (Entry AbsTime k), Map k (Tuple' AbsTime s), t m (k, b),
Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime k)
hp, Map k (Tuple' AbsTime s)
mp, t m (k, b)
out, Int
cnt)
stream :: t m (Maybe (AbsTime, (k, a)))
stream = ((AbsTime, (k, a)) -> Maybe (AbsTime, (k, a)))
-> t m (AbsTime, (k, a)) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Serial.map (AbsTime, (k, a)) -> Maybe (AbsTime, (k, a))
forall a. a -> Maybe a
Just t m (AbsTime, (k, a))
str t m (Maybe (AbsTime, (k, a)))
-> t m (Maybe (AbsTime, (k, a))) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`Par.parallelFst` m (Maybe (AbsTime, (k, a))) -> t m (Maybe (AbsTime, (k, a)))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m (Maybe (AbsTime, (k, a)))
forall a. m (Maybe a)
timer
timer :: m (Maybe a)
timer = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)
Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
{-# INLINABLE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
(IsStream t, MonadAsync m, Ord k)
=> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifyKeepAliveSessions :: (Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifyKeepAliveSessions = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
True
{-# INLINABLE classifySessionsOf #-}
classifySessionsOf ::
(IsStream t, MonadAsync m, Ord k)
=> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsOf :: (Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifySessionsOf = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
False
{-# INLINE splitInnerBy #-}
splitInnerBy
:: (IsStream t, Monad m)
=> (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a))
-> t m (f a)
-> t m (f a)
splitInnerBy :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
:: (IsStream t, Monad m, Eq (f a), Monoid (f a))
=> (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a))
-> t m (f a)
-> t m (f a)
splitInnerBySuffix :: (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
Stream m (f a) -> t m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (f a) -> t m (f a)) -> Stream m (f a) -> t m (f a)
forall a b. (a -> b) -> a -> b
$ (f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner (Stream m (f a) -> Stream m (f a))
-> Stream m (f a) -> Stream m (f a)
forall a b. (a -> b) -> a -> b
$ t m (f a) -> Stream m (f a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (f a)
xs