module Network.Nakadi.EventTypes.CursorsLag
( cursorsLag'
, cursorsLag
) where
import Network.Nakadi.Internal.Prelude
import Control.Arrow
import Control.Lens
import qualified Data.Map.Strict as Map
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Util
-- | Request path of the cursors-lag resource for the given event type:
-- @\/event-types\/NAME\/cursors-lag@, where @NAME@ is the UTF-8
-- encoding of the unwrapped event type name. No further escaping is
-- applied here.
path :: EventTypeName -> ByteString
path eventTypeName = "/event-types/" <> encodedName <> "/cursors-lag"
  where
    -- Raw bytes of the event type name, as placed into the path.
    encodedName = encodeUtf8 (unEventTypeName eventTypeName)
-- | Low-level cursors-lag query: issues a @POST@ request against the
-- cursors-lag path of the given event type, with the provided cursors
-- serialized as the JSON request body, and yields the list of
-- 'Partition' values obtained via 'httpJsonBody'.
--
-- NOTE(review): 'ok200' is handed to 'httpJsonBody' together with an
-- empty list; presumably this is the expected success status and an
-- empty set of special-cased error responses -- confirm against
-- 'httpJsonBody'.
cursorsLag' ::
     MonadNakadi b m
  => EventTypeName -- ^ Event type whose cursors-lag resource is queried
  -> [Cursor]      -- ^ Cursors sent as the JSON request body
  -> m [Partition]
cursorsLag' eventTypeName cursors = do
  config <- nakadiAsk
  -- Request modifiers are composed right-to-left: the JSON body is set
  -- first, then the path, then 'includeFlowId', and the method last.
  let prepareRequest =
        setRequestMethod "POST"
          . includeFlowId config
          . setRequestPath (path eventTypeName)
          . setRequestBodyJSON cursors
  httpJsonBody ok200 [] prepareRequest
-- | Map-based convenience wrapper around @cursorsLag'@.
--
-- Every @(partition name, offset)@ entry of the input map is turned
-- into a 'Cursor' and the resulting list is passed to @cursorsLag'@.
-- From each returned 'Partition' the partition name is paired with its
-- unconsumed-events field; pairs for which 'sequenceSnd' yields
-- 'Nothing' are dropped, and the remaining pairs form the result map.
cursorsLag ::
     MonadNakadi b m
  => EventTypeName                  -- ^ Event type to query
  -> Map PartitionName CursorOffset -- ^ Offset per partition
  -> m (Map PartitionName Int64)
cursorsLag eventTypeName cursorsMap = do
  let requestCursors = map (uncurry Cursor) (Map.toList cursorsMap)
  partitions <- cursorsLag' eventTypeName requestCursors
  return (toLagMap partitions)
  where
    -- Pair a partition's name with its unconsumed-events field and
    -- pull the optional second component outwards.
    lagEntry :: Partition -> Maybe (PartitionName, Int64)
    lagEntry = sequenceSnd . (view L.partition &&& view L.unconsumedEvents)

    -- Keep only the partitions that produced an entry.
    toLagMap :: [Partition] -> Map PartitionName Int64
    toLagMap = Map.fromList . catMaybes . map lagEntry