{-|
Module      : Network.Nakadi.Internal.Committer
Description : Implementation of Cursor Committing Strategies
Copyright   : (c) Moritz Clasmeier 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This internal module implements cursor committing strategies to be
used by the subscription API.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables   #-}

module Network.Nakadi.Internal.Committer where

import           Network.Nakadi.Internal.Prelude

import           Conduit
import           Control.Lens
import           UnliftIO.STM

import qualified Network.Nakadi.Internal.Lenses
                                               as L
import           Network.Nakadi.Internal.Types

import           Network.Nakadi.Internal.Committer.NoBuffer
import           Network.Nakadi.Internal.Committer.Shared
import           Network.Nakadi.Internal.Committer.SmartBuffer
import           Network.Nakadi.Internal.Committer.TimeBuffer

-- | Main function for the cursor committer thread. Logic depends on
-- the provided buffering strategy. This function dispatches to the
-- actual commit strategy implementations:
--
-- * 'committerNoBuffer' defined in @Network.Nakadi.Internal.Committer.NoBuffer@,
-- * 'committerTimeBuffer' defined in @Network.Nakadi.Internal.Committer.TimeBuffer@ and
-- * 'committerSmartBuffer' defined in @Network.Nakadi.Internal.Committer.SmartBuffer@.
subscriptionCommitter
  :: forall b m
   . (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
  => CommitBufferingStrategy
  -> SubscriptionEventStream
  -> TBQueue (Int, SubscriptionCursor)
  -> m ()
subscriptionCommitter CommitNoBuffer            = committerNoBuffer
subscriptionCommitter (CommitTimeBuffer millis) = committerTimeBuffer millis
subscriptionCommitter CommitSmartBuffer         = committerSmartBuffer

-- | Sink which can be used as sink for Conduits processing
-- subscription batches. This sink takes care of committing
-- all cursors synchronously, one after the other.
subscriptionSink
  :: (MonadIO m, MonadNakadi b m)
  => SubscriptionEventStream
  -> ConduitM (SubscriptionEventStreamBatch a) void m ()
subscriptionSink eventStream =
  awaitForever $ \batch -> lift $ commitOneCursor eventStream (batch ^. L.cursor)