{-|
Module      : Network.Nakadi.Internal.Committer.Shared
Description : Shared code for Committing Strategies
Copyright   : (c) Moritz Clasmeier 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX
-}

{-# LANGUAGE MultiParamTypeClasses #-}

module Network.Nakadi.Internal.Committer.Shared where

import           Network.Nakadi.Internal.Prelude

import           Control.Lens
import           Control.Monad.Logger
import qualified Data.HashMap.Strict           as HashMap

import qualified Network.Nakadi.Internal.Lenses
                                               as L
import           Network.Nakadi.Internal.Types
import           Network.Nakadi.Subscriptions.Cursors

import           UnliftIO.STM

-- | Commit every cursor currently held in the provided map of staged
-- cursors.
--
-- The staged map is atomically swapped for an empty one; the entries that
-- were removed are converted with the supplied extractor and then committed
-- one at a time via 'commitOneCursor'.
commitAllCursors
  :: (MonadNakadi b m, MonadIO m)
  => (a -> SubscriptionCursor) -- ^ Projects the cursor out of a staged entry.
  -> SubscriptionEventStream   -- ^ Event stream handed to 'commitOneCursor'.
  -> TVar (StagedCursorsMap a) -- ^ Staged cursors; reset to the empty map.
  -> m ()
commitAllCursors extractCursor eventStream stagedCursorsTv = do
  -- Fetch the staged cursors and reset the shared map within a single STM
  -- transaction.
  stagedCursors <- liftIO . atomically $ swapTVar stagedCursorsTv HashMap.empty
  let cursors = map extractCursor $ HashMap.elems stagedCursors
  forM_ cursors (commitOneCursor eventStream)

-- | This function takes care of committing a single cursor. Exceptions
-- raised by the commit are caught (via 'catchAny') and logged, but the
-- failure will NOT be propagated. This means that Nakadi itself is in
-- control of disconnecting us.
--
-- The warning is emitted through the log function found in the Nakadi
-- configuration; if no log function is configured, the exception is
-- dropped without any output.
commitOneCursor
  :: (MonadIO m, MonadNakadi b m)
  => SubscriptionEventStream -- ^ Event stream passed to the commit call.
  -> SubscriptionCursor      -- ^ The cursor to commit.
  -> m ()
commitOneCursor eventStream cursor = do
  config <- nakadiAsk
  catchAny (subscriptionCursorCommit eventStream [cursor])
    $ \exn -> nakadiLiftBase $ case config ^. L.logFunc of
        Just logFunc ->
          logFunc "nakadi-client" LevelWarn
            $  toLogStr
            $  "Failed to commit cursor "
            <> tshow cursor
            <> ": "
            <> tshow exn
        -- No logger configured: the failure is intentionally ignored.
        Nothing -> pure ()

-- | Naive cursor commit loop: We simply read every cursor from the queue
-- and commit it individually, in the order in which it is dequeued.
--
-- The loop runs forever. Each commit is delegated to 'commitOneCursor',
-- which logs commit failures instead of propagating them.
unbufferedCommitLoop
  :: (MonadNakadi b m, MonadIO m)
  => SubscriptionEventStream           -- ^ Event stream passed to the commit.
  -> TBQueue (Int, SubscriptionCursor) -- ^ Queue of cursors to commit.
  -> m ()
unbufferedCommitLoop eventStream queue = forever $ do
  -- The first tuple component is not used by this strategy.
  (_nEvents, cursor) <- liftIO . atomically . readTBQueue $ queue
  commitOneCursor eventStream cursor

-- | Build the pair of event type name and partition name stored in the
-- given subscription cursor.
cursorKey :: SubscriptionCursor -> (EventTypeName, PartitionName)
cursorKey cursor = (typeName, partitionName)
 where
  typeName      = cursor ^. L.eventType
  partitionName = cursor ^. L.partition