{-|
Module      : Network.Nakadi.Internal.Committer.TimeBuffer
Description : Implementation of TimeBuffer based Cursor Committing Strategy
Copyright   : (c) Moritz Clasmeier 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}

module Network.Nakadi.Internal.Committer.TimeBuffer where

import           Network.Nakadi.Internal.Prelude

import qualified Control.Concurrent.Async.Timer
                                               as Timer
import qualified Data.HashMap.Strict           as HashMap

import           Data.Function                  ( (&) )
import           Network.Nakadi.Internal.Committer.Shared
import           Network.Nakadi.Internal.Types

import           UnliftIO.Async
import           UnliftIO.STM

-- | Implementation of the 'CommitTimeBuffer' strategy: an async timer
-- triggers a commit of all staged cursors at a fixed interval.
--
-- A background consumer thread drains the cursor queue into a shared
-- map keyed by 'cursorKey', so a newer cursor replaces an older one
-- stored under the same key. On every timer tick the staged cursors
-- map is handed to 'commitAllCursors'.
--
-- The 'StagedCursor's in the 'CommitTimeBuffer' case carry no
-- additional information, just the subscription cursors.
--
-- This action loops forever; it only terminates via an exception.
committerTimeBuffer
  :: (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
  => Int32
  -- ^ Used as both the initial delay and the interval of the commit
  -- timer (presumably milliseconds, as the name @millis@ suggests).
  -> SubscriptionEventStream
  -- ^ Event stream passed through to 'commitAllCursors'.
  -> TBQueue (Int, SubscriptionCursor)
  -- ^ Queue of incoming cursors; the 'Int' component is ignored by
  -- this strategy.
  -> m ()
committerTimeBuffer millis eventStream queue = do
  let timerConf = Timer.defaultConf & Timer.setInitDelay (fromIntegral millis) & Timer.setInterval
        (fromIntegral millis)
  cursorsMap <- liftIO $ newTVarIO (HashMap.empty :: StagedCursorsMap SubscriptionCursor)
  withAsync (cursorConsumer cursorsMap) $ \asyncCursorConsumer -> do
    -- Link the consumer so that an exception in it is propagated to
    -- this thread instead of silently stopping the queue draining.
    link asyncCursorConsumer
    Timer.withAsyncTimer timerConf $ \timer -> forever $ do
      Timer.wait timer
      commitAllCursors identity eventStream cursorsMap
 where
  -- The cursorConsumer drains the cursors queue and adds each cursor
  -- to the provided cursorsMap.
  cursorConsumer cursorsMap = forever . liftIO . atomically $ do
    (_, cursor) <- readTBQueue queue
    -- Use the strict variant: the lazy 'modifyTVar' would leave a
    -- growing chain of unevaluated inserts in the TVar until the map
    -- is next forced, retaining every intermediate cursor.
    modifyTVar' cursorsMap (HashMap.insert (cursorKey cursor) cursor)