{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Network.Nakadi.Internal.Committer.TimeBuffer where
import Network.Nakadi.Internal.Prelude
import qualified Control.Concurrent.Async.Timer
as Timer
import qualified Data.HashMap.Strict as HashMap
import Data.Function ( (&) )
import Network.Nakadi.Internal.Committer.Shared
import Network.Nakadi.Internal.Types
import UnliftIO.Async
import UnliftIO.STM
-- | Committer that buffers incoming cursors and commits them on a timer.
--
-- A background consumer drains @queue@ and stores each cursor in a shared
-- 'TVar'-held map under its 'cursorKey', so a later cursor with the same key
-- replaces the earlier one. The calling thread waits on a periodic timer and
-- invokes 'commitAllCursors' on that map at every tick.
--
-- Both loops are wrapped in 'forever', so this action does not return
-- normally; it ends only when an exception is raised.
committerTimeBuffer
  :: (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
  => Int32
  -- ^ Used as both the timer's initial delay and its interval
  -- (presumably milliseconds, going by the parameter name).
  -> SubscriptionEventStream
  -- ^ Passed through unchanged to 'commitAllCursors'.
  -> TBQueue (Int, SubscriptionCursor)
  -- ^ Source of cursors; the 'Int' component of each item is ignored here.
  -> m ()
committerTimeBuffer millis eventStream queue = do
  let timerConf =
        Timer.defaultConf
          & Timer.setInitDelay (fromIntegral millis)
          & Timer.setInterval (fromIntegral millis)
  cursorsMap <- liftIO $ newTVarIO (HashMap.empty :: StagedCursorsMap SubscriptionCursor)
  withAsync (cursorConsumer cursorsMap) $ \asyncCursorConsumer -> do
    -- Link the consumer so that a failure in it is re-raised in this thread
    -- instead of going unnoticed while the timer loop keeps running.
    link asyncCursorConsumer
    Timer.withAsyncTimer timerConf $ \timer -> forever $ do
      Timer.wait timer
      commitAllCursors identity eventStream cursorsMap
 where
  -- Moves cursors from the queue into the staged map, one STM transaction
  -- per cursor.
  cursorConsumer cursorsMap = forever . liftIO . atomically $ do
    (_, cursor) <- readTBQueue queue
    -- Strict update: the lazy 'modifyTVar' would pile up a chain of
    -- unevaluated 'HashMap.insert' thunks (each retaining its cursor) until
    -- the map is next read.
    modifyTVar' cursorsMap (HashMap.insert (cursorKey cursor) cursor)