-- |
-- Module      : Streamly.Internal.Data.Stream.Time
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Time
    (
    -- * Imports for Examples
    -- $setup

    -- * Timers
      periodic -- XXX Should go to Streamly.Data.Stream
    , ticks -- XXX Should go to Streamly.Data.Stream
    , ticksRate
    , interject

    -- * Trimming
    , takeInterval
    , takeLastInterval
    , dropInterval
    , dropLastInterval

    -- * Chunking
    , intervalsOf
    , groupsOfTimeout

    -- * Sampling
    , sampleIntervalEnd
    , sampleIntervalStart
    , sampleBurst
    , sampleBurstEnd
    , sampleBurstStart

    -- * Windowed Sessions
    , classifySessionsByGeneric
    , classifySessionsBy
    , classifySessionsOf
    , classifyKeepAliveSessions

    -- XXX This should go in the concurrent module
    -- * Buffering
    -- | Evaluate strictly using a buffer of results.  When the buffer becomes
    -- full we can block, drop the new elements, drop the oldest element and
    -- insert the new at the end.
    , bufferLatest
    , bufferLatestN
    , bufferOldestN
    )
where

import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Map (Map)
import Data.Maybe (isNothing)
import Data.Proxy (Proxy(..))
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Time.Units
    ( AbsTime
    , MilliSecond64(..)
    , addToAbsTime
    , toAbsTime
    , toRelTime
    )
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)

import qualified Data.Heap as H
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Fold as Fold (Step(..))
import qualified Streamly.Internal.Data.IsMap as IsMap
import qualified Streamly.Internal.Data.Stream as Stream
    ( scanlMAfter'
    , timeIndexed
    , timestamped
    )

import Streamly.Internal.Data.Stream.Concurrent

-- $setup
--
-- Imports for example snippets in this module.
--
-- >>> :m
-- >>> import Control.Concurrent (threadDelay)
-- >>> import qualified Streamly.Data.Array as Array
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Parser as Parser
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Stream.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream as Stream (delayPost, timestamped)
-- >>> import qualified Streamly.Internal.Data.Stream.Concurrent as Stream (parListEagerFst)
-- >>> import qualified Streamly.Internal.Data.Stream.Time as Stream
-- >>> import Prelude hiding (concatMap, concat)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

--------------------------------------------------------------------------------
-- Combinators
--------------------------------------------------------------------------------

-- | Generate a stream by running an action periodically at the specified time
-- interval.
--
{-# INLINE periodic #-}
periodic :: MonadIO m => m a -> Double -> Stream m a
periodic :: forall (m :: * -> *) a. MonadIO m => m a -> Double -> Stream m a
periodic m a
action Double
n = m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a
Stream.repeatM m a
timed

    where

    timed :: m a
timed = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)) m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
action

-- | Generate a tick stream consisting of '()' elements, each tick is generated
-- after the specified time delay given in seconds.
--
-- >>> ticks = Stream.periodic (return ())
--
{-# INLINE ticks #-}
ticks :: MonadIO m => Double -> Stream m ()
ticks :: forall (m :: * -> *). MonadIO m => Double -> Stream m ()
ticks = m () -> Double -> Stream m ()
forall (m :: * -> *) a. MonadIO m => m a -> Double -> Stream m a
periodic (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

-- | Generate a tick stream, ticks are generated at the specified 'Rate'. The
-- rate is adaptive, the tick generation speed can be increased or decreased at
-- different times to achieve the specified rate.  The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'.  The
-- effective maximum rate achieved by a stream is governed by the processor
-- speed.
--
-- >>> tickStream = Stream.repeatM (return ())
-- >>> ticksRate r = Stream.parEval (Stream.rate (Just r)) tickStream
--
{-# INLINE ticksRate #-}
ticksRate :: MonadAsync m => Rate -> Stream m ()
ticksRate :: forall (m :: * -> *). MonadAsync m => Rate -> Stream m ()
ticksRate Rate
r = (Config -> Config) -> Stream m () -> Stream m ()
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> Stream m a -> Stream m a
parEval (Maybe Rate -> Config -> Config
rate (Rate -> Maybe Rate
forall a. a -> Maybe a
Just Rate
r)) (Stream m () -> Stream m ()) -> Stream m () -> Stream m ()
forall a b. (a -> b) -> a -> b
$ m () -> Stream m ()
forall (m :: * -> *) a. Monad m => m a -> Stream m a
Stream.repeatM (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

-- XXX The case when the interval is 0, we should run only the stream being
-- interjected.

-- | Intersperse a monadic action into the input stream after every @n@
-- seconds.
--
-- Definition:
--
-- >>> interject n f xs = Stream.parListEagerFst [xs, Stream.periodic f n]
--
-- Example:
--
-- >>> s = Stream.fromList "hello"
-- >>> input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
-- >>> Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
-- h,e,l,l,o
--
{-# INLINE interject #-}
interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a
interject :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject m a
f Double
n Stream m a
xs = [Stream m a] -> Stream m a
forall (m :: * -> *) a. MonadAsync m => [Stream m a] -> Stream m a
parListEagerFst [Stream m a
xs, m a -> Double -> Stream m a
forall (m :: * -> *) a. MonadIO m => m a -> Double -> Stream m a
periodic m a
f Double
n]

-- XXX No element should be yielded if the duration is zero.

-- | @takeInterval interval@ runs the stream only upto the specified time
-- @interval@ in seconds.
--
-- The interval starts when the stream is evaluated for the first time.
--
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
takeInterval :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
takeInterval Double
d =
    Stream m (Maybe a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
Stream.catMaybes
        (Stream m (Maybe a) -> Stream m a)
-> (Stream m a -> Stream m (Maybe a)) -> Stream m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a -> Bool) -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
Stream.takeWhile Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing
        (Stream m (Maybe a) -> Stream m (Maybe a))
-> (Stream m a -> Stream m (Maybe a))
-> Stream m a
-> Stream m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Maybe a) -> Double -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) Double
d (Stream m (Maybe a) -> Stream m (Maybe a))
-> (Stream m a -> Stream m (Maybe a))
-> Stream m a
-> Stream m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just

-- | Take time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number elements taken.
--
-- /Unimplemented/
{-# INLINE takeLastInterval #-}
takeLastInterval :: -- MonadAsync m =>
    Double -> Stream m a -> Stream m a
takeLastInterval :: forall (m :: * -> *) a. Double -> Stream m a -> Stream m a
takeLastInterval = Double -> Stream m a -> Stream m a
forall a. HasCallStack => a
undefined

-- XXX All elements should be yielded if the duration is zero.

-- | @dropInterval interval@ drops all the stream elements that are generated
-- before the specified @interval@ in seconds has passed.
--
-- The interval begins when the stream is evaluated for the first time.
--
{-# INLINE dropInterval #-}
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
dropInterval :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
dropInterval Double
d =
    Stream m (Maybe a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
Stream.catMaybes
        (Stream m (Maybe a) -> Stream m a)
-> (Stream m a -> Stream m (Maybe a)) -> Stream m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a -> Bool) -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
Stream.dropWhile Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing
        (Stream m (Maybe a) -> Stream m (Maybe a))
-> (Stream m a -> Stream m (Maybe a))
-> Stream m a
-> Stream m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Maybe a) -> Double -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) Double
d (Stream m (Maybe a) -> Stream m (Maybe a))
-> (Stream m a -> Stream m (Maybe a))
-> Stream m a
-> Stream m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just

-- | Drop time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number elements dropped.
--
-- /Unimplemented/
{-# INLINE dropLastInterval #-}
dropLastInterval :: -- MonadAsync m =>
    Int -> Stream m a -> Stream m a
dropLastInterval :: forall (m :: * -> *) a. Int -> Stream m a -> Stream m a
dropLastInterval = Int -> Stream m a -> Stream m a
forall a. HasCallStack => a
undefined

-- | Group the input stream into windows of @n@ second each and then fold each
-- group using the provided fold function.
--
-- >>> twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
-- >>> intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
-- >>> Stream.fold Fold.toList $ Stream.take 2 intervals
-- [...,...]
--
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b
intervalsOf :: forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Stream m a -> Stream m b
intervalsOf Double
n Fold m a b
f Stream m a
xs =
    Fold m (Maybe a) b -> Stream m (Maybe a) -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
Stream.foldMany
        ((Maybe a -> Bool) -> Fold m (Maybe a) b -> Fold m (Maybe a) b
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
Fold.takeEndBy Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing (Fold m a b -> Fold m (Maybe a) b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Fold m (Maybe a) b
Fold.catMaybes Fold m a b
f))
        (m (Maybe a) -> Double -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) Double
n ((a -> Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just Stream m a
xs))

-- XXX This can be implemented more efficiently by sharing a Clock.Timer across
-- parallel threads and resetting it whenever a span is emitted.

-- | Like 'chunksOf' but if the chunk is not completed within the specified
-- time interval then emit whatever we have collected till now. The chunk
-- timeout is reset whenever a chunk is emitted. The granularity of the clock
-- is 100 ms.
--
-- >>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
-- >>> f = Stream.fold (Fold.drainMapM print) $ Stream.groupsOfTimeout 5 1 Fold.toList s
--
-- /Pre-release/
{-# INLINE groupsOfTimeout #-}
groupsOfTimeout :: MonadAsync m
    => Int -> Double -> Fold m a b -> Stream m a -> Stream m b
groupsOfTimeout :: forall (m :: * -> *) a b.
MonadAsync m =>
Int -> Double -> Fold m a b -> Stream m a -> Stream m b
groupsOfTimeout Int
n Double
timeout Fold m a b
f =
      (((), b) -> b) -> Stream m ((), b) -> Stream m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((), b) -> b
forall a b. (a, b) -> b
snd
    (Stream m ((), b) -> Stream m b)
-> (Stream m a -> Stream m ((), b)) -> Stream m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, ((), a))
-> Stream m ((), b)
forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifySessionsBy
        Double
0.1 Bool
False (m Bool -> Int -> m Bool
forall a b. a -> b -> a
const (Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False)) Double
timeout (Int -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Fold m a b
Fold.take Int
n Fold m a b
f)
    (Stream m (AbsTime, ((), a)) -> Stream m ((), b))
-> (Stream m a -> Stream m (AbsTime, ((), a)))
-> Stream m a
-> Stream m ((), b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m ((), a) -> Stream m (AbsTime, ((), a))
forall (m :: * -> *) a.
MonadIO m =>
Stream m a -> Stream m (AbsTime, a)
Stream.timestamped
    (Stream m ((), a) -> Stream m (AbsTime, ((), a)))
-> (Stream m a -> Stream m ((), a))
-> Stream m a
-> Stream m (AbsTime, ((), a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> ((), a)) -> Stream m a -> Stream m ((), a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((),)

------------------------------------------------------------------------------
-- Windowed classification
------------------------------------------------------------------------------

-- TODO: To mark the position in space or time we can have Indexed or
-- TimeStamped types. That can make it easy to deal with the position indices
-- or timestamps.

------------------------------------------------------------------------------
-- Keyed Sliding Windows
------------------------------------------------------------------------------

{-
{-# INLINABLE classifySlidingChunks #-}
classifySlidingChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Int              -- ^ window slide
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifySlidingChunks wsize wslide (Fold step initial extract) str
    = undefined

-- XXX Another variant could be to slide the window on an event, e.g. in TCP we
-- slide the send window when an ack is received and we slide the receive
-- window when a sequence is complete. Sliding is stateful in case of TCP,
-- sliding releases the send buffer or makes data available to the user from
-- the receive buffer.
{-# INLINABLE classifySlidingSessions #-}
classifySlidingSessions
    :: (IsStream t, MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Double         -- ^ time window size
    -> Double         -- ^ window slide
    -> Fold m a b     -- ^ Fold to be applied to window events
    -> t m (k, a, Bool, AbsTime) -- ^ window key, data, close flag, timestamp
    -> t m (k, b)
classifySlidingSessions tick interval slide (Fold step initial extract) str
    = undefined
-}

------------------------------------------------------------------------------
-- Sliding Window Buffers
------------------------------------------------------------------------------

-- These buffered versions could be faster than concurrent incremental folds of
-- all overlapping windows as in many cases we may not need all the values to
-- compute the fold, we can just compute the result using the old value and new
-- value.  However, we may need the buffer once in a while, for example for
-- string search we usually compute the hash incrementally but when the hash
-- matches the hash of the pattern we need to compare the whole string.
--
-- XXX we should be able to implement sequence based splitting combinators
-- using this combinator.

{-
-- | Buffer n elements of the input in a ring buffer. When t new elements are
-- collected, slide the window to remove the same number of oldest elements,
-- insert the new elements, and apply an incremental fold on the sliding
-- window, supplying the outgoing elements, the new ring buffer as arguments.
slidingChunkBuffer
    :: (IsStream t, Monad m, Ord a, Unboxed a)
    => Int -- window size
    -> Int -- window slide
    -> Fold m (Ring a, Array a) b
    -> t m a
    -> t m b
slidingChunkBuffer = undefined

-- Buffer n seconds worth of stream elements of the input in a radix tree.
-- Every t seconds, remove the items that are older than n seconds, and apply
-- an incremental fold on the sliding window, supplying the outgoing elements,
-- and the new radix tree buffer as arguments.
slidingSessionBuffer
    :: (IsStream t, Monad m, Ord a, Unboxed a)
    => Int    -- window size
    -> Int    -- tick size
    -> Fold m (RTree a, Array a) b
    -> t m a
    -> t m b
slidingSessionBuffer = undefined
-}

------------------------------------------------------------------------------
-- Keyed Session Windows
------------------------------------------------------------------------------

{-
-- | Keyed variable size space windows. Close the window if we do not receive a
-- window event in the next "spaceout" elements.
{-# INLINABLE classifyChunksBy #-}
classifyChunksBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Bool  -- ^ reset the spaceout when a chunk window element is received
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyChunksBy spanout reset (Fold step initial extract) str = undefined

-- | Like 'classifyChunksOf' but the chunk size is reset if an element is
-- received within the chunk size window. The chunk gets closed only if no
-- element is received within the chunk window.
--
{-# INLINABLE classifyKeepAliveChunks #-}
classifyKeepAliveChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyKeepAliveChunks spanout = classifyChunksBy spanout True
-}

data SessionState t m f s b = SessionState
    { forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: !AbsTime  -- ^ time since last event
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionEventTime :: !AbsTime -- ^ time as per last event
    -- We can use the Map size instead of maintaining a count, but if we have
    -- to switch to HashMap then it can be useful.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionCount :: !Int -- ^ total number of sessions in progress
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionTimerHeap :: H.Heap (H.Entry AbsTime (Key f)) -- ^ heap for timeouts
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionKeyValueMap :: f s -- ^ Stored sessions for keys
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionOutputStream :: t (m :: Type -> Type) (Key f, b) -- ^ Completed sessions
    }

data SessionEntry s = LiveSession !AbsTime !s | ZombieSession

-- delete from map and output the fold accumulator
ejectEntry :: (Monad m, IsMap f) =>
    (acc -> m b)
    -> heap
    -> f entry
    -> Stream m (Key f, b)
    -> Int
    -> acc
    -> Key f
    -> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry :: forall (m :: * -> *) (f :: * -> *) acc b heap entry.
(Monad m, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry acc -> m b
extract heap
hp f entry
mp Stream m (Key f, b)
out Int
cnt acc
acc Key f
key = do
    b
sess <- acc -> m b
extract acc
acc
    let out1 :: Stream m (Key f, b)
out1 = (Key f, b) -> Stream m (Key f, b) -> Stream m (Key f, b)
forall (m :: * -> *) a.
Applicative m =>
a -> Stream m a -> Stream m a
Stream.cons (Key f
key, b
sess) Stream m (Key f, b)
out
    let mp1 :: f entry
mp1 = Key f -> f entry -> f entry
forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f entry
mp
    (heap, f entry, Stream m (Key f, b), Int)
-> m (heap, f entry, Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (heap
hp, f entry
mp1, Stream m (Key f, b)
out1, Int
cnt Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)

{-# NOINLINE flush #-}
flush :: (Monad m, IsMap f) =>
       (s -> m b)
    -> SessionState Stream m f (SessionEntry s) b
    -> m (SessionState Stream m f (SessionEntry s) b)
flush :: forall (m :: * -> *) (f :: * -> *) s b.
(Monad m, IsMap f) =>
(s -> m b)
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
flush s -> m b
extract session :: SessionState Stream m f (SessionEntry s) b
session@SessionState{f (SessionEntry s)
Int
Heap (Entry AbsTime (Key f))
Stream m (Key f, b)
AbsTime
sessionOutputStream :: Stream m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} = do
    (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry s)
mp', Stream m (Key f, b)
out, Int
count) <-
        (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall {f :: * -> *} {p}.
IsMap f =>
(Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
ejectAll
            ( Heap (Entry AbsTime (Key f))
sessionTimerHeap
            , f (SessionEntry s)
sessionKeyValueMap
            , Stream m (Key f, b)
forall (m :: * -> *) a. Applicative m => Stream m a
Stream.nil
            , Int
sessionCount
            )
    SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState Stream m f (SessionEntry s) b
 -> m (SessionState Stream m f (SessionEntry s) b))
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall a b. (a -> b) -> a -> b
$ SessionState Stream m f (SessionEntry s) b
session
        { sessionCount :: Int
sessionCount = Int
count
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp'
        , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp'
        , sessionOutputStream :: Stream m (Key f, b)
sessionOutputStream = Stream m (Key f, b)
out
        }

    where

    ejectAll :: (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
ejectAll (Heap (Entry p (Key f))
hp, f (SessionEntry s)
mp, Stream m (Key f, b)
out, !Int
cnt) = do
        let hres :: Maybe (Entry p (Key f), Heap (Entry p (Key f)))
hres = Heap (Entry p (Key f))
-> Maybe (Entry p (Key f), Heap (Entry p (Key f)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry p (Key f))
hp
        case Maybe (Entry p (Key f), Heap (Entry p (Key f)))
hres of
            Just (Entry p
_ Key f
key, Heap (Entry p (Key f))
hp1) -> do
                (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
r <- case Key f -> f (SessionEntry s) -> Maybe (SessionEntry s)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry s)
mp of
                    Maybe (SessionEntry s)
Nothing -> (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp1, f (SessionEntry s)
mp, Stream m (Key f, b)
out, Int
cnt)
                    Just SessionEntry s
ZombieSession ->
                        (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp1, Key f -> f (SessionEntry s) -> f (SessionEntry s)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry s)
mp, Stream m (Key f, b)
out, Int
cnt)
                    Just (LiveSession AbsTime
_ s
acc) ->
                        (s -> m b)
-> Heap (Entry p (Key f))
-> f (SessionEntry s)
-> Stream m (Key f, b)
-> Int
-> s
-> Key f
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) (f :: * -> *) acc b heap entry.
(Monad m, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry s -> m b
extract Heap (Entry p (Key f))
hp1 f (SessionEntry s)
mp Stream m (Key f, b)
out Int
cnt s
acc Key f
key
                (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
ejectAll (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
r
            Maybe (Entry p (Key f), Heap (Entry p (Key f)))
Nothing -> do
                Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (f (SessionEntry s) -> Bool
forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry s)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                (Heap (Entry p (Key f)), f (SessionEntry s), Stream m (Key f, b),
 Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp, f (SessionEntry s)
mp, Stream m (Key f, b)
out, Int
cnt)

{-# NOINLINE ejectOne #-}
ejectOne :: (IsMap f, Monad m) =>
       Bool
    -> (acc -> m b)
    -> ( H.Heap (Entry AbsTime (Key f))
       , f (SessionEntry acc)
       , Stream m (Key f, b)
       , Int
       )
    -> m ( H.Heap (Entry AbsTime (Key f))
         , f (SessionEntry acc)
         , Stream m (Key f, b), Int
         )
ejectOne :: forall (f :: * -> *) (m :: * -> *) acc b.
(IsMap f, Monad m) =>
Bool
-> (acc -> m b)
-> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
    Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectOne Bool
reset acc -> m b
extract = (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall {f :: * -> *}.
IsMap f =>
(Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
go

    where

    go :: (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, !Int
cnt) = do
        let hres :: Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres = Heap (Entry AbsTime (Key f))
-> Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime (Key f))
hp
        case Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres of
            Just (Entry AbsTime
expiry Key f
key, Heap (Entry AbsTime (Key f))
hp1) ->
                case Key f -> f (SessionEntry acc) -> Maybe (SessionEntry acc)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry acc)
mp of
                    Maybe (SessionEntry acc)
Nothing -> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp1, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)
                    Just SessionEntry acc
ZombieSession ->
                        (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp1, Key f -> f (SessionEntry acc) -> f (SessionEntry acc)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)
                    Just (LiveSession AbsTime
expiry1 acc
acc) -> do
                        if Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
expiry
                        then (acc -> m b)
-> Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall (m :: * -> *) (f :: * -> *) acc b heap entry.
(Monad m, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry acc -> m b
extract Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp Stream m (Key f, b)
out Int
cnt acc
acc Key f
key
                        else
                            -- reset the session timeout and continue
                            let hp2 :: Heap (Entry AbsTime (Key f))
hp2 = Entry AbsTime (Key f)
-> Heap (Entry AbsTime (Key f)) -> Heap (Entry AbsTime (Key f))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> Key f -> Entry AbsTime (Key f)
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 Key f
key) Heap (Entry AbsTime (Key f))
hp1
                            in (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp2, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)
            Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
Nothing -> do
                Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (f (SessionEntry acc) -> Bool
forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry acc)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)

{-# NOINLINE ejectExpired #-}
ejectExpired :: (IsMap f, Monad m) =>
       Bool
    -> (Int -> m Bool)
    -> (acc -> m b)
    -> SessionState Stream m f (SessionEntry acc) b
    -> AbsTime
    -> m (SessionState Stream m f (SessionEntry acc) b)
ejectExpired :: forall (f :: * -> *) (m :: * -> *) acc b.
(IsMap f, Monad m) =>
Bool
-> (Int -> m Bool)
-> (acc -> m b)
-> SessionState Stream m f (SessionEntry acc) b
-> AbsTime
-> m (SessionState Stream m f (SessionEntry acc) b)
ejectExpired Bool
reset Int -> m Bool
ejectPred acc -> m b
extract session :: SessionState Stream m f (SessionEntry acc) b
session@SessionState{f (SessionEntry acc)
Int
Heap (Entry AbsTime (Key f))
Stream m (Key f, b)
AbsTime
sessionOutputStream :: Stream m (Key f, b)
sessionKeyValueMap :: f (SessionEntry acc)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} AbsTime
curTime = do
    (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry acc)
mp', Stream m (Key f, b)
out, Int
count) <-
        Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall {f :: * -> *}.
IsMap f =>
Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop
            Heap (Entry AbsTime (Key f))
sessionTimerHeap f (SessionEntry acc)
sessionKeyValueMap Stream m (Key f, b)
forall (m :: * -> *) a. Applicative m => Stream m a
Stream.nil Int
sessionCount
    SessionState Stream m f (SessionEntry acc) b
-> m (SessionState Stream m f (SessionEntry acc) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState Stream m f (SessionEntry acc) b
 -> m (SessionState Stream m f (SessionEntry acc) b))
-> SessionState Stream m f (SessionEntry acc) b
-> m (SessionState Stream m f (SessionEntry acc) b)
forall a b. (a -> b) -> a -> b
$ SessionState Stream m f (SessionEntry acc) b
session
        { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
        , sessionCount :: Int
sessionCount = Int
count
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp'
        , sessionKeyValueMap :: f (SessionEntry acc)
sessionKeyValueMap = f (SessionEntry acc)
mp'
        , sessionOutputStream :: Stream m (Key f, b)
sessionOutputStream = Stream m (Key f, b)
out
        }

    where

    ejectLoop :: Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp f (SessionEntry acc)
mp Stream m (Key f, b)
out !Int
cnt = do
        let hres :: Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres = Heap (Entry AbsTime (Key f))
-> Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime (Key f))
hp
        case Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres of
            Just (Entry AbsTime
expiry Key f
key, Heap (Entry AbsTime (Key f))
hp1) -> do
                (Bool
eject, Bool
force) <-
                    if AbsTime
curTime AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
>= AbsTime
expiry
                    then (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Bool
False)
                    else do
                        Bool
r <- Int -> m Bool
ejectPred Int
cnt
                        (Bool, Bool) -> m (Bool, Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
r, Bool
r)
                if Bool
eject
                then
                    case Key f -> f (SessionEntry acc) -> Maybe (SessionEntry acc)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry acc)
mp of
                        Maybe (SessionEntry acc)
Nothing -> Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp Stream m (Key f, b)
out Int
cnt
                        Just SessionEntry acc
ZombieSession ->
                            Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp1 (Key f -> f (SessionEntry acc) -> f (SessionEntry acc)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry acc)
mp) Stream m (Key f, b)
out Int
cnt
                        Just (LiveSession AbsTime
expiry1 acc
acc) -> do
                            if AbsTime
expiry1 AbsTime -> AbsTime -> Bool
forall a. Ord a => a -> a -> Bool
<= AbsTime
curTime Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| Bool
force
                            then do
                                (Heap (Entry AbsTime (Key f))
hp2,f (SessionEntry acc)
mp1,Stream m (Key f, b)
out1,Int
cnt1) <-
                                    (acc -> m b)
-> Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall (m :: * -> *) (f :: * -> *) acc b heap entry.
(Monad m, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> Stream m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry acc -> m b
extract Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp Stream m (Key f, b)
out Int
cnt acc
acc Key f
key
                                Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp2 f (SessionEntry acc)
mp1 Stream m (Key f, b)
out1 Int
cnt1
                            else
                                -- reset the session timeout and continue
                                let hp2 :: Heap (Entry AbsTime (Key f))
hp2 = Entry AbsTime (Key f)
-> Heap (Entry AbsTime (Key f)) -> Heap (Entry AbsTime (Key f))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> Key f -> Entry AbsTime (Key f)
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 Key f
key) Heap (Entry AbsTime (Key f))
hp1
                                in Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> Stream m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp2 f (SessionEntry acc)
mp Stream m (Key f, b)
out Int
cnt
                else (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)
            Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
Nothing -> do
                Bool -> m () -> m ()
forall a. HasCallStack => Bool -> a -> a
assert (f (SessionEntry acc) -> Bool
forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry acc)
mp) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
                (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, Stream m (Key f, b)
out, Int
cnt)

-- XXX Use mutable IORef in accumulator
{-# INLINE classifySessionsByGeneric #-}
classifySessionsByGeneric
    :: forall m f a b. (MonadAsync m, IsMap f)
    => Proxy (f :: (Type -> Type))
    -> Double         -- ^ timer tick in seconds
    -> Bool           -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double         -- ^ session timeout in seconds
    -> Fold m a b  -- ^ Fold to be applied to session data
    -> Stream m (AbsTime, (Key f, a)) -- ^ timestamp, (session key, session
                                      -- data)
    -> Stream m (Key f, b) -- ^ session key, fold result
classifySessionsByGeneric :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadAsync m, IsMap f) =>
Proxy f
-> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (Key f, a))
-> Stream m (Key f, b)
classifySessionsByGeneric Proxy f
_ Double
tick Bool
reset Int -> m Bool
ejectPred Double
tmout
    (Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract s -> m b
final) Stream m (AbsTime, (Key f, a))
input =
    Unfold m (SessionState Stream m f (SessionEntry s) b) (Key f, b)
-> Stream m (SessionState Stream m f (SessionEntry s) b)
-> Stream m (Key f, b)
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
Stream.unfoldMany ((SessionState Stream m f (SessionEntry s) b -> Stream m (Key f, b))
-> Unfold m (Stream m (Key f, b)) (Key f, b)
-> Unfold m (SessionState Stream m f (SessionEntry s) b) (Key f, b)
forall a c (m :: * -> *) b.
(a -> c) -> Unfold m c b -> Unfold m a b
Unfold.lmap SessionState Stream m f (SessionEntry s) b -> Stream m (Key f, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionOutputStream Unfold m (Stream m (Key f, b)) (Key f, b)
forall (m :: * -> *) a. Applicative m => Unfold m (Stream m a) a
Unfold.fromStream)
        (Stream m (SessionState Stream m f (SessionEntry s) b)
 -> Stream m (Key f, b))
-> Stream m (SessionState Stream m f (SessionEntry s) b)
-> Stream m (Key f, b)
forall a b. (a -> b) -> a -> b
$ (SessionState Stream m f (SessionEntry s) b
 -> Maybe (AbsTime, (Key f, a))
 -> m (SessionState Stream m f (SessionEntry s) b))
-> m (SessionState Stream m f (SessionEntry s) b)
-> (SessionState Stream m f (SessionEntry s) b
    -> m (SessionState Stream m f (SessionEntry s) b))
-> Stream m (Maybe (AbsTime, (Key f, a)))
-> Stream m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
Stream.scanlMAfter' SessionState Stream m f (SessionEntry s) b
-> Maybe (AbsTime, (Key f, a))
-> m (SessionState Stream m f (SessionEntry s) b)
forall {f :: * -> *}.
IsMap f =>
SessionState Stream m f (SessionEntry s) b
-> Maybe (AbsTime, (Key f, a))
-> m (SessionState Stream m f (SessionEntry s) b)
sstep (SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return SessionState Stream m f (SessionEntry s) b
forall {s} {b}. SessionState Stream m f s b
szero) ((s -> m b)
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) (f :: * -> *) s b.
(Monad m, IsMap f) =>
(s -> m b)
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
flush s -> m b
final)
        (Stream m (Maybe (AbsTime, (Key f, a)))
 -> Stream m (SessionState Stream m f (SessionEntry s) b))
-> Stream m (Maybe (AbsTime, (Key f, a)))
-> Stream m (SessionState Stream m f (SessionEntry s) b)
forall a b. (a -> b) -> a -> b
$ m (Maybe (AbsTime, (Key f, a)))
-> Double
-> Stream m (Maybe (AbsTime, (Key f, a)))
-> Stream m (Maybe (AbsTime, (Key f, a)))
forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject (Maybe (AbsTime, (Key f, a)) -> m (Maybe (AbsTime, (Key f, a)))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (AbsTime, (Key f, a))
forall a. Maybe a
Nothing) Double
tick
        (Stream m (Maybe (AbsTime, (Key f, a)))
 -> Stream m (Maybe (AbsTime, (Key f, a))))
-> Stream m (Maybe (AbsTime, (Key f, a)))
-> Stream m (Maybe (AbsTime, (Key f, a)))
forall a b. (a -> b) -> a -> b
$ ((AbsTime, (Key f, a)) -> Maybe (AbsTime, (Key f, a)))
-> Stream m (AbsTime, (Key f, a))
-> Stream m (Maybe (AbsTime, (Key f, a)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (AbsTime, (Key f, a)) -> Maybe (AbsTime, (Key f, a))
forall a. a -> Maybe a
Just Stream m (AbsTime, (Key f, a))
input

    where

    timeoutMs :: RelTime
timeoutMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tmout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    tickMs :: RelTime
tickMs = MilliSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime (Double -> MilliSecond64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    szero :: SessionState Stream m f s b
szero = SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime (Key f))
-> f s
-> t m (Key f, b)
-> SessionState t m f s b
SessionState
        { sessionCurTime :: AbsTime
sessionCurTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionEventTime :: AbsTime
sessionEventTime = MilliSecond64 -> AbsTime
forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionCount :: Int
sessionCount = Int
0
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
forall a. Heap a
H.empty
        , sessionKeyValueMap :: f s
sessionKeyValueMap = forall {s}. f s
forall (f :: * -> *) a. IsMap f => f a
IsMap.mapEmpty :: f s
        , sessionOutputStream :: Stream m (Key f, b)
sessionOutputStream = Stream m (Key f, b)
forall (m :: * -> *) a. Applicative m => Stream m a
Stream.nil
        }

    -- We can eject sessions based on the current session count to limit
    -- memory consumption. There are two possible strategies:
    --
    -- 1) Eject old sessions or sessions beyond a certain/lower timeout
    -- threshold even before timeout, effectively reduce the timeout.
    -- 2) Drop creation of new sessions but keep accepting new events for the
    -- old ones.
    --
    -- We use the first strategy as of now.

    -- Got a new stream input element
    sstep :: SessionState Stream m f (SessionEntry s) b
-> Maybe (AbsTime, (Key f, a))
-> m (SessionState Stream m f (SessionEntry s) b)
sstep session :: SessionState Stream m f (SessionEntry s) b
session@SessionState{f (SessionEntry s)
Int
Heap (Entry AbsTime (Key f))
Stream m (Key f, b)
AbsTime
sessionOutputStream :: Stream m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} (Just (AbsTime
timestamp, (Key f
key, a
value))) = do
        -- XXX instead of a heap we could use a timer wheel.
        --
        -- XXX if the key is an Int, we can also use an IntMap for slightly
        -- better performance.
        --
        -- How it works:
        --
        -- Values for each key are collected in a map using the supplied fold.
        -- When we insert a key in the Map we insert an entry into the heap as
        -- well with the session expiry as the sort key.  The Map entry
        -- consists of the fold result, and the expiry time of the session. If
        -- "reset" is True the expiry time is readjusted whenever a new event
        -- is processed. If the fold terminates and a new session is started
        -- for the same key the expiry time is set to the first timestamp of
        -- the new session.
        --
        -- The heap must have at most one entry for any given key. The heap is
        -- processed periodically to remove the expired entries.  We pick up an
        -- expired entry from the top of the heap and if the session has
        -- expired based on the expiry time in the Map entry then we remove the
        -- session from the Map and yield its fold output. Otherwise, we
        -- reinsert the entry into the heap based on the current expiry in the
        -- Map entry.
        --
        -- If an entry is removed from the Map and not removed from the heap
        -- and in the meantime it is inserted again in the Map (using the same
        -- key) then how do we avoid inserting multiple entries in the heap?
        -- For this reason we maintain the invariant that the Map entry is
        -- removed only when the heap entry is removed. Even if the fold has
        -- finished we still keep a dummy Map entry (ZombieSession) until the
        -- heap entry is removed. That way if we have a Map entry we do not
        -- insert a heap entry because we know it is already there.
        -- XXX The ZombieSession mechanism does not work as expected as we
        -- ignore ZombieSession when inserting a new entry. Anyway, we can
        -- remove this mechanism as at most only two heap entries may be
        -- created and they will be ultimately cleaned up.
        --
        -- Heap processing needs the map and map processing needs the heap,
        -- therefore we cannot separate the two for modularity unless we have a
        -- way to achieve mutual recursion.
        --
        let curTime :: AbsTime
curTime = AbsTime -> AbsTime -> AbsTime
forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
            mOld :: Maybe (SessionEntry s)
mOld = Key f -> f (SessionEntry s) -> Maybe (SessionEntry s)
forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry s)
sessionKeyValueMap
        let done :: b -> m (SessionState Stream m f (SessionEntry s) b)
done b
fb = do
                -- deleting a key from the heap is expensive, so we never
                -- delete a key from heap, we just purge it from the Map and it
                -- gets purged from the heap on timeout. We just need an extra
                -- lookup in the Map when the key is purged from the heap, that
                -- should not be expensive.
                --
                let (f (SessionEntry s)
mp, Int
cnt) = case Maybe (SessionEntry s)
mOld of
                        Just (LiveSession AbsTime
_ s
_) ->
                            ( Key f -> SessionEntry s -> f (SessionEntry s) -> f (SessionEntry s)
forall (f :: * -> *) a. IsMap f => Key f -> a -> f a -> f a
IsMap.mapInsert
                                Key f
key SessionEntry s
forall s. SessionEntry s
ZombieSession f (SessionEntry s)
sessionKeyValueMap
                            , Int
sessionCount Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
                            )
                        Maybe (SessionEntry s)
_ -> (f (SessionEntry s)
sessionKeyValueMap, Int
sessionCount)
                SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState Stream m f (SessionEntry s) b
 -> m (SessionState Stream m f (SessionEntry s) b))
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall a b. (a -> b) -> a -> b
$ SessionState Stream m f (SessionEntry s) b
session
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt
                    , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp
                    , sessionOutputStream :: Stream m (Key f, b)
sessionOutputStream = (Key f, b) -> Stream m (Key f, b)
forall (m :: * -> *) a. Applicative m => a -> Stream m a
Stream.fromPure (Key f
key, b
fb)
                    }
            partial :: s -> m (SessionState Stream m f (SessionEntry s) b)
partial s
fs1 = do
                let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
                (Heap (Entry AbsTime (Key f))
hp1, f (SessionEntry s)
mp1, Stream m (Key f, b)
out1, Int
cnt1) <- do
                        let vars :: (Heap (Entry AbsTime (Key f)), f (SessionEntry s), Stream m a, Int)
vars = (Heap (Entry AbsTime (Key f))
sessionTimerHeap, f (SessionEntry s)
sessionKeyValueMap,
                                           Stream m a
forall (m :: * -> *) a. Applicative m => Stream m a
Stream.nil, Int
sessionCount)
                        case Maybe (SessionEntry s)
mOld of
                            -- inserting new entry
                            Maybe (SessionEntry s)
Nothing -> do
                                -- Eject a session from heap and map if needed
                                Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
                                (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry s)
mp, Stream m (Key f, b)
out, Int
cnt) <-
                                    if Bool
eject
                                    then Bool
-> (s -> m b)
-> (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
    Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (f :: * -> *) (m :: * -> *) acc b.
(IsMap f, Monad m) =>
Bool
-> (acc -> m b)
-> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
    Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      Stream m (Key f, b), Int)
ejectOne Bool
reset s -> m b
extract (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
forall {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), Stream m a, Int)
vars
                                    else (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
forall {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), Stream m a, Int)
vars

                                -- Insert the new session in heap
                                let hp' :: Heap (Entry AbsTime (Key f))
hp' = Entry AbsTime (Key f)
-> Heap (Entry AbsTime (Key f)) -> Heap (Entry AbsTime (Key f))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (AbsTime -> Key f -> Entry AbsTime (Key f)
forall p a. p -> a -> Entry p a
Entry AbsTime
expiry Key f
key) Heap (Entry AbsTime (Key f))
hp
                                 in (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry s)
mp, Stream m (Key f, b)
out, Int
cnt Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                            -- updating old entry
                            Just SessionEntry s
_ -> (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
      Stream m (Key f, b), Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f)), f (SessionEntry s),
 Stream m (Key f, b), Int)
forall {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), Stream m a, Int)
vars

                let acc :: SessionEntry s
acc = AbsTime -> s -> SessionEntry s
forall s. AbsTime -> s -> SessionEntry s
LiveSession AbsTime
expiry s
fs1
                    mp2 :: f (SessionEntry s)
mp2 = Key f -> SessionEntry s -> f (SessionEntry s) -> f (SessionEntry s)
forall (f :: * -> *) a. IsMap f => Key f -> a -> f a -> f a
IsMap.mapInsert Key f
key SessionEntry s
acc f (SessionEntry s)
mp1
                SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (SessionState Stream m f (SessionEntry s) b
 -> m (SessionState Stream m f (SessionEntry s) b))
-> SessionState Stream m f (SessionEntry s) b
-> m (SessionState Stream m f (SessionEntry s) b)
forall a b. (a -> b) -> a -> b
$ SessionState :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
AbsTime
-> AbsTime
-> Int
-> Heap (Entry AbsTime (Key f))
-> f s
-> t m (Key f, b)
-> SessionState t m f s b
SessionState
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt1
                    , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp1
                    , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp2
                    , sessionOutputStream :: Stream m (Key f, b)
sessionOutputStream = Stream m (Key f, b)
out1
                    }
        Step s b
res0 <- do
            case Maybe (SessionEntry s)
mOld of
                Just (LiveSession AbsTime
_ s
acc) -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ s -> Step s b
forall s b. s -> Step s b
Fold.Partial s
acc
                Maybe (SessionEntry s)
_ -> m (Step s b)
initial
        case Step s b
res0 of
            Fold.Done b
_ ->
                [Char] -> m (SessionState Stream m f (SessionEntry s) b)
forall a. HasCallStack => [Char] -> a
error ([Char] -> m (SessionState Stream m f (SessionEntry s) b))
-> [Char] -> m (SessionState Stream m f (SessionEntry s) b)
forall a b. (a -> b) -> a -> b
$ [Char]
"classifySessionsBy: "
                    [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"The supplied fold must consume at least one input"
            Fold.Partial s
fs -> do
                Step s b
res <- s -> a -> m (Step s b)
step s
fs a
value
                case Step s b
res of
                    Fold.Done b
fb -> b -> m (SessionState Stream m f (SessionEntry s) b)
forall {m :: * -> *} {m :: * -> *} {b}.
(Monad m, Applicative m) =>
b -> m (SessionState Stream m f (SessionEntry s) b)
done b
fb
                    Fold.Partial s
fs1 -> s -> m (SessionState Stream m f (SessionEntry s) b)
partial s
fs1

    -- Got a timer tick event
    sstep sessionState :: SessionState Stream m f (SessionEntry s) b
sessionState@SessionState{f (SessionEntry s)
Int
Heap (Entry AbsTime (Key f))
Stream m (Key f, b)
AbsTime
sessionOutputStream :: Stream m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} Maybe (AbsTime, (Key f, a))
Nothing =
        let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
        in Bool
-> (Int -> m Bool)
-> (s -> m b)
-> SessionState Stream m f (SessionEntry s) b
-> AbsTime
-> m (SessionState Stream m f (SessionEntry s) b)
forall (f :: * -> *) (m :: * -> *) acc b.
(IsMap f, Monad m) =>
Bool
-> (Int -> m Bool)
-> (acc -> m b)
-> SessionState Stream m f (SessionEntry acc) b
-> AbsTime
-> m (SessionState Stream m f (SessionEntry acc) b)
ejectExpired Bool
reset Int -> m Bool
ejectPred s -> m b
extract SessionState Stream m f (SessionEntry s) b
sessionState AbsTime
curTime

-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of  @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
-- timeout is measured from the first event in the session.  If the @keepalive@
-- option is set to 'True' the timeout is reset to 0 whenever an event is
-- received.
--
-- The @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input event was generated.  The notion of
-- current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time.  When no new events are seen, a timer
-- is started with a clock resolution of @tick@ seconds. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- To ensure an upper bound on the memory used the number of sessions can be
-- limited to an upper bound. If the ejection @predicate@ returns 'True', the
-- oldest session is ejected before inserting a new session.
--
-- When the stream ends any buffered sessions are ejected immediately.
--
-- If a session key is received even after a session has finished, another
-- session is created for that key.
--
-- >>> :{
-- Stream.fold (Fold.drainMapM print)
--     $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
--     $ Stream.timestamped
--     $ Stream.delay 0.1
--     $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
-- :}
-- (1,"abc")
-- (2,"abc")
-- (3,"abc")
--
-- /Pre-release/
{-# INLINE classifySessionsBy #-}
classifySessionsBy
    :: (MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Bool           -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double         -- ^ session timeout in seconds
    -> Fold m a b  -- ^ Fold to be applied to session data
    -> Stream m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> Stream m (k, b) -- ^ session key, fold result
classifySessionsBy :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifySessionsBy = Proxy (Map k)
-> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (Key (Map k), a))
-> Stream m (Key (Map k), b)
forall (m :: * -> *) (f :: * -> *) a b.
(MonadAsync m, IsMap f) =>
Proxy f
-> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (Key f, a))
-> Stream m (Key f, b)
classifySessionsByGeneric (forall {k}. Proxy (Map k)
forall {k} (t :: k). Proxy t
Proxy :: Proxy (Map k))

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'True'.
--
-- @
-- classifyKeepAliveSessions = classifySessionsBy 1 True
-- @
--
-- /Pre-release/
--
{-# INLINE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
       (MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ session inactive timeout
    -> Fold m a b -- ^ Fold to be applied to session payload data
    -> Stream m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> Stream m (k, b)
classifyKeepAliveSessions :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifyKeepAliveSessions = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifySessionsBy Double
1 Bool
True

------------------------------------------------------------------------------
-- Keyed tumbling windows
------------------------------------------------------------------------------

-- Tumbling windows is a special case of sliding windows where the window slide
-- is the same as the window size. Or it can be a special case of session
-- windows where the reset flag is set to False.

-- XXX instead of using the early termination flag in the stream, we can use an
-- early terminating fold instead.

{-
-- | Split the stream into fixed size chunks of specified size. Within each
-- such chunk fold the elements in buckets identified by the keys. A particular
-- bucket fold can be terminated early if a closing flag is encountered in an
-- element for that key.
--
-- @since 0.7.0
{-# INLINABLE classifyChunksOf #-}
classifyChunksOf
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifyChunksOf wsize = classifyChunksBy wsize False
-}

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'False'.
--
-- >>> classifySessionsOf = Stream.classifySessionsBy 1 False
--
-- /Pre-release/
--
{-# INLINE classifySessionsOf #-}
classifySessionsOf ::
       (MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ time window size
    -> Fold m a b -- ^ Fold to be applied to session data
    -> Stream m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> Stream m (k, b)
classifySessionsOf :: forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
(Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifySessionsOf = Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
forall (m :: * -> *) k a b.
(MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> Stream m (AbsTime, (k, a))
-> Stream m (k, b)
classifySessionsBy Double
1 Bool
False

------------------------------------------------------------------------------
-- Sampling
------------------------------------------------------------------------------

-- | Continuously evaluate the input stream and sample the last event in each
-- time window of @n@ seconds.
--
-- This is also known as @throttle@ in some libraries.
--
-- >>> sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest
--
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleIntervalEnd :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
sampleIntervalEnd Double
n = Stream m (Maybe a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
Stream.catMaybes (Stream m (Maybe a) -> Stream m a)
-> (Stream m a -> Stream m (Maybe a)) -> Stream m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Stream m a -> Stream m b
intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.latest

-- | Like 'sampleInterval' but samples at the beginning of the time window.
--
-- >>> sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
--
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleIntervalStart :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
sampleIntervalStart Double
n = Stream m (Maybe a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
Stream.catMaybes (Stream m (Maybe a) -> Stream m a)
-> (Stream m a -> Stream m (Maybe a)) -> Stream m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Stream m a -> Stream m b
intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.one

data BurstState t x =
      BurstNone
    | BurstWait !t !x
    | BurstDone !x
    | BurstDoneNext !x !t !x

{-# INLINE sampleBurst #-}
sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a
sampleBurst :: forall (m :: * -> *) a.
MonadAsync m =>
Bool -> Double -> Stream m a -> Stream m a
sampleBurst Bool
sampleAtEnd Double
gap Stream m a
xs =
    -- XXX Ideally we should schedule a timer event exactly after gap time,
    -- but the tick stream should work well as long as the timer
    -- granularity is small enough compared to the gap.
    (BurstState RelTime64 a -> Maybe a)
-> Stream m (BurstState RelTime64 a) -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
Stream.mapMaybe BurstState RelTime64 a -> Maybe a
forall {t} {a}. BurstState t a -> Maybe a
extract
        (Stream m (BurstState RelTime64 a) -> Stream m a)
-> Stream m (BurstState RelTime64 a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Fold m (RelTime64, Maybe a) (BurstState RelTime64 a)
-> Stream m (RelTime64, Maybe a)
-> Stream m (BurstState RelTime64 a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
Stream.scan ((BurstState RelTime64 a
 -> (RelTime64, Maybe a) -> BurstState RelTime64 a)
-> BurstState RelTime64 a
-> Fold m (RelTime64, Maybe a) (BurstState RelTime64 a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Fold m a b
Fold.foldl' BurstState RelTime64 a
-> (RelTime64, Maybe a) -> BurstState RelTime64 a
forall {x}.
BurstState RelTime64 x
-> (RelTime64, Maybe x) -> BurstState RelTime64 x
step BurstState RelTime64 a
forall t x. BurstState t x
BurstNone)
        (Stream m (RelTime64, Maybe a)
 -> Stream m (BurstState RelTime64 a))
-> Stream m (RelTime64, Maybe a)
-> Stream m (BurstState RelTime64 a)
forall a b. (a -> b) -> a -> b
$ Stream m (Maybe a) -> Stream m (RelTime64, Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
Stream m a -> Stream m (RelTime64, a)
Stream.timeIndexed
        (Stream m (Maybe a) -> Stream m (RelTime64, Maybe a))
-> Stream m (Maybe a) -> Stream m (RelTime64, Maybe a)
forall a b. (a -> b) -> a -> b
$ m (Maybe a) -> Double -> Stream m (Maybe a) -> Stream m (Maybe a)
forall (m :: * -> *) a.
MonadAsync m =>
m a -> Double -> Stream m a -> Stream m a
interject (Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing) Double
0.01 ((a -> Maybe a) -> Stream m a -> Stream m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just Stream m a
xs)

    where

    gap1 :: RelTime64
gap1 = NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 (Int64 -> NanoSecond64
NanoSecond64 (Double -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
gap Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
10Double -> Int -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^(Int
9::Int))))

    {-# INLINE step #-}
    step :: BurstState RelTime64 x
-> (RelTime64, Maybe x) -> BurstState RelTime64 x
step BurstState RelTime64 x
BurstNone (RelTime64
t1, Just x
x1) = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
    step BurstState RelTime64 x
BurstNone (RelTime64, Maybe x)
_ = BurstState RelTime64 x
forall t x. BurstState t x
BurstNone

    step (BurstDone x
_) (RelTime64
t1, Just x
x1) = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
    step (BurstDone x
_) (RelTime64, Maybe x)
_ = BurstState RelTime64 x
forall t x. BurstState t x
BurstNone

    step old :: BurstState RelTime64 x
old@(BurstWait RelTime64
t0 x
x0) (RelTime64
t1, Maybe x
Nothing)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> BurstState RelTime64 x
forall t x. x -> BurstState t x
BurstDone x
x0
        | Bool
otherwise = BurstState RelTime64 x
old
    -- This can happen due to scheduling delays, if we received back to
    -- back events spaced by more than the timeout without an
    -- intervening timeout event then we emit the old event instead of
    -- replacing it by the new.
    step (BurstWait RelTime64
t0 x
x0) (RelTime64
t1, Just x
x1)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> RelTime64 -> x -> BurstState RelTime64 x
forall t x. x -> t -> x -> BurstState t x
BurstDoneNext x
x0 RelTime64
t1 x
x1
        | Bool
sampleAtEnd = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
        | Bool
otherwise = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x0

    step (BurstDoneNext x
_ RelTime64
t0 x
x0) (RelTime64
t1, Maybe x
Nothing)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> BurstState RelTime64 x
forall t x. x -> BurstState t x
BurstDone x
x0
        | Bool
otherwise =  RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t0 x
x0
    step (BurstDoneNext x
_ RelTime64
t0 x
x0) (RelTime64
t1, Just x
x1)
        | RelTime64
t1 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t0 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= RelTime64
gap1 = x -> RelTime64 -> x -> BurstState RelTime64 x
forall t x. x -> t -> x -> BurstState t x
BurstDoneNext x
x0 RelTime64
t1 x
x1
        | Bool
sampleAtEnd = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x1
        | Bool
otherwise = RelTime64 -> x -> BurstState RelTime64 x
forall t x. t -> x -> BurstState t x
BurstWait RelTime64
t1 x
x0

    {-# INLINE extract #-}
    extract :: BurstState t a -> Maybe a
extract (BurstDoneNext a
x t
_ a
_) = a -> Maybe a
forall a. a -> Maybe a
Just a
x
    extract (BurstDone a
x) = a -> Maybe a
forall a. a -> Maybe a
Just a
x
    extract BurstState t a
_ = Maybe a
forall a. Maybe a
Nothing

-- | Sample one event at the end of each burst of events.  A burst is a group
-- of events close together in time, it ends when an event is spaced by more
-- than the specified time interval (in seconds) from the previous event.
--
-- This is known as @debounce@ in some libraries.
--
-- The clock granularity is 10 ms.
--
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleBurstEnd :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
sampleBurstEnd = Bool -> Double -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Bool -> Double -> Stream m a -> Stream m a
sampleBurst Bool
True

-- | Like 'sampleBurstEnd' but samples the event at the beginning of the burst
-- instead of at the end of it.
--
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleBurstStart :: forall (m :: * -> *) a.
MonadAsync m =>
Double -> Stream m a -> Stream m a
sampleBurstStart = Bool -> Double -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
Bool -> Double -> Stream m a -> Stream m a
sampleBurst Bool
False

------------------------------------------------------------------------------
-- Lossy Buffering
------------------------------------------------------------------------------

-- XXX We could use 'maxBuffer Block/Drop/Rotate/Sample' instead. However we
-- may want to have the evaluation rate independent of the sampling rate. To
-- support that we can decouple evaluation and sampling in independent stages.
-- The sampling stage would strictly evaluate and sample, the evaluation stage
-- would control the evaluation rate.
--
-- bufferLatest - evaluate retaining only the latest element. Fork a thread
-- that keeps writing the stream to an IORef, another parallel thread reads
-- from the IORef.
--
-- bufferLatestN - Fork a thread that keeps writing to a sliding ring buffer,
-- and the stream unfolds the entire buffer when it is read, and the empty
-- buffer starts filling again in the same way.
--
-- bufferOldestN - Fork a thread that keeps writing to a buffer until it is
-- filled and keeps dropping once it is filled. When the stream is read the
-- buffer is unfolded to a stream and the empty buffer starts filling again.

-- | Evaluate the input stream continuously and keep only the oldest @n@
-- elements in the buffer, discard the new ones when the buffer is full.  When
-- the output stream is evaluated the collected buffer is streamed and the
-- buffer starts filling again.
--
-- /Unimplemented/
--
{-# INLINE bufferOldestN #-}
bufferOldestN :: -- MonadAsync m =>
    Int -> Stream m a -> Stream m a
bufferOldestN :: forall (m :: * -> *) a. Int -> Stream m a -> Stream m a
bufferOldestN = Int -> Stream m a -> Stream m a
forall a. HasCallStack => a
undefined

-- | Evaluate the input stream continuously and keep only the latest @n@
-- elements in a ring buffer, keep discarding the older ones to make space for
-- the new ones.  When the output stream is evaluated the buffer collected till
-- now is streamed and it starts filling again.
--
-- /Unimplemented/
--
{-# INLINE bufferLatestN #-}
bufferLatestN :: -- MonadAsync m =>
    Int -> Stream m a -> Stream m a
bufferLatestN :: forall (m :: * -> *) a. Int -> Stream m a -> Stream m a
bufferLatestN = Int -> Stream m a -> Stream m a
forall a. HasCallStack => a
undefined

-- | Always produce the latest available element from the stream without any
-- delay. The stream is continuously evaluated at the highest possible rate and
-- only the latest element is retained for sampling.
--
-- /Unimplemented/
--
{-# INLINE bufferLatest #-}
bufferLatest :: -- MonadAsync m =>
    Stream m a -> Stream m (Maybe a)
bufferLatest :: forall (m :: * -> *) a. Stream m a -> Stream m (Maybe a)
bufferLatest = Stream m a -> Stream m (Maybe a)
forall a. HasCallStack => a
undefined