{-# LANGUAGE MultiParamTypeClasses #-}
module Network.Nakadi.Internal.Committer.Shared where
import Network.Nakadi.Internal.Prelude
import Control.Lens
import Control.Monad.Logger
import qualified Data.HashMap.Strict as HashMap
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Types
import Network.Nakadi.Subscriptions.Cursors
import UnliftIO.STM
-- | Take everything currently staged in the given 'TVar' and commit it.
--
-- The staged map is atomically swapped for an empty map; each value that
-- was staged is then turned into a 'SubscriptionCursor' with the supplied
-- extractor and handed to 'commitOneCursor', one cursor at a time.
commitAllCursors
  :: (MonadNakadi b m, MonadIO m)
  => (a -> SubscriptionCursor)  -- ^ extracts the cursor from a staged value
  -> SubscriptionEventStream    -- ^ stream the cursors are committed against
  -> TVar (StagedCursorsMap a)  -- ^ staged cursors; left empty afterwards
  -> m ()
commitAllCursors extractCursor eventStream stagedCursorsTv = do
  -- Swap rather than read-then-clear so taking the map and emptying it
  -- happen in a single transaction.
  staged <- liftIO (atomically (swapTVar stagedCursorsTv HashMap.empty))
  forM_ (HashMap.elems staged) $ \stagedItem ->
    commitOneCursor eventStream (extractCursor stagedItem)
-- | Commit a single cursor on the given event stream.
--
-- An exception caught by 'catchAny' while committing is not rethrown.
-- It is reported at 'LevelWarn' through the log function found in the
-- Nakadi configuration, or dropped if no log function is configured.
commitOneCursor
  :: (MonadIO m, MonadNakadi b m)
  => SubscriptionEventStream
  -> SubscriptionCursor
  -> m ()
commitOneCursor eventStream cursor = do
  config <- nakadiAsk
  let failureMessage exn =
        "Failed to commit cursor " <> tshow cursor <> ": " <> tshow exn
      -- Runs in the base monad; a no-op when no logger is configured.
      reportFailure exn = case config ^. L.logFunc of
        Nothing      -> pure ()
        Just logFunc ->
          logFunc "nakadi-client" LevelWarn (toLogStr (failureMessage exn))
  subscriptionCursorCommit eventStream [cursor]
    `catchAny` (nakadiLiftBase . reportFailure)
-- | Commit cursors as they arrive on the queue, forever.
--
-- Each iteration blocks until an item can be read from the queue and then
-- commits that item's cursor via 'commitOneCursor'. That helper logs a
-- commit failure (when a log function is configured) instead of rethrowing
-- it, so a failed commit does not end the loop.
--
-- The first component of each queue item is ignored here.
unbufferedCommitLoop
  :: (MonadNakadi b m, MonadIO m)
  => SubscriptionEventStream             -- ^ stream the cursors are committed against
  -> TBQueue (Int, SubscriptionCursor)   -- ^ queue of cursors to commit
  -> m ()
unbufferedCommitLoop eventStream queue = forever $ do
  (_nEvents, cursor) <- liftIO . atomically . readTBQueue $ queue
  -- Reuse the shared single-cursor commit so the failure handling and log
  -- message stay identical to the buffered commit path.
  commitOneCursor eventStream cursor
-- | Build the key for a cursor: the pair of its event type name and its
-- partition name.
cursorKey :: SubscriptionCursor -> (EventTypeName, PartitionName)
cursorKey cursor = (eventTypeName, partitionName)
  where
    eventTypeName = cursor ^. L.eventType
    partitionName = cursor ^. L.partition