{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Nakadi.Internal.Committer where
import Network.Nakadi.Internal.Prelude
import Conduit
import Control.Lens
import UnliftIO.STM
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Types
import Network.Nakadi.Internal.Committer.NoBuffer
import Network.Nakadi.Internal.Committer.Shared
import Network.Nakadi.Internal.Committer.SmartBuffer
import Network.Nakadi.Internal.Committer.TimeBuffer
subscriptionCommitter
:: forall b m
. (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
=> CommitBufferingStrategy
-> SubscriptionEventStream
-> TBQueue (Int, SubscriptionCursor)
-> m ()
subscriptionCommitter CommitNoBuffer = committerNoBuffer
subscriptionCommitter (CommitTimeBuffer millis) = committerTimeBuffer millis
subscriptionCommitter CommitSmartBuffer = committerSmartBuffer
subscriptionSink
:: (MonadIO m, MonadNakadi b m)
=> SubscriptionEventStream
-> ConduitM (SubscriptionEventStreamBatch a) void m ()
subscriptionSink eventStream =
awaitForever $ \batch -> lift $ commitOneCursor eventStream (batch ^. L.cursor)