{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Nakadi.Internal.Committer.SmartBuffer where
import Network.Nakadi.Internal.Prelude
import Control.Concurrent.Async.Timer ( Timer )
import qualified Control.Concurrent.Async.Timer
as Timer
import qualified Data.HashMap.Strict as HashMap
import Control.Concurrent.STM ( retry )
import Control.Lens
import Data.Function ( (&) )
import Network.Nakadi.Internal.Committer.Shared
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Types
import UnliftIO.Async
import UnliftIO.STM
committerSmartBuffer
:: forall b m
. (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
=> SubscriptionEventStream
-> TBQueue (Int, SubscriptionCursor)
-> m ()
committerSmartBuffer eventStream queue = do
nMaxEvents <- cursorBufferSize
let millisDefault = 1000
timerConf =
Timer.defaultConf & Timer.setInitDelay (fromIntegral millisDefault) & Timer.setInterval
(fromIntegral millisDefault)
if nMaxEvents > 1
then do
cursorsMap <- liftIO . atomically $ newTVar HashMap.empty
withAsync (cursorConsumer queue cursorsMap) $ \asyncCursorConsumer -> do
link asyncCursorConsumer
Timer.withAsyncTimer timerConf $ cursorCommitter eventStream cursorsMap nMaxEvents
else unbufferedCommitLoop eventStream queue
cursorConsumer
:: (MonadIO m)
=> TBQueue (Int, SubscriptionCursor)
-> TVar (StagedCursorsMap SubscriptionCursorWithCounter)
-> m ()
cursorConsumer queue cursorsMap = forever . liftIO . atomically $ do
(nEvents, cursor) <- readTBQueue queue
let key = cursorKey cursor
stagedCursor = SubscriptionCursorWithCounter cursor nEvents
modifyTVar cursorsMap $ HashMap.insertWith updateCursor key stagedCursor
updateCursor
:: SubscriptionCursorWithCounter -> SubscriptionCursorWithCounter -> SubscriptionCursorWithCounter
updateCursor cursorNew cursorOld = cursorNew & L.nEvents %~ (+ (cursorOld ^. L.nEvents))
cursorCommitter
:: (MonadNakadi b m, MonadUnliftIO m)
=> SubscriptionEventStream
-> TVar (StagedCursorsMap SubscriptionCursorWithCounter)
-> Int
-> Timer
-> m ()
cursorCommitter eventStream cursorsMap nMaxEvents timer =
forever
$ race (Timer.wait timer) (maxEventsReached cursorsMap nMaxEvents)
>>= \case
Left _ ->
commitAllCursors (view L.cursor) eventStream cursorsMap
Right _ -> do
Timer.reset timer
commitAllCursors (view L.cursor) eventStream cursorsMap
maxEventsReached
:: MonadIO m => TVar (StagedCursorsMap SubscriptionCursorWithCounter) -> Int -> m ()
maxEventsReached stagedCursorsTv nMaxEvents = liftIO . atomically $ do
stagedCursors <- readTVar stagedCursorsTv
let cursorsList = HashMap.elems stagedCursors
cursorsCommit = filter (shouldBeCommitted nMaxEvents) cursorsList
if null cursorsCommit then retry else pure ()
shouldBeCommitted :: Int -> SubscriptionCursorWithCounter -> Bool
shouldBeCommitted nMaxEvents stagedCursor = stagedCursor ^. L.nEvents >= nMaxEvents
cursorBufferSize :: MonadNakadi b m => m Int
cursorBufferSize = do
conf <- nakadiAsk
pure $ case conf ^. L.maxUncommittedEvents of
Nothing -> 1
Just n -> n & fromIntegral & (* safetyFactor) & round
where safetyFactor = 0.5