module Network.Nakadi.Subscriptions.Cursors
( subscriptionCursorCommit'
, subscriptionCursorCommitR'
, subscriptionCommit
, subscriptionCursors
, subscriptionCursorsR
, subscriptionCursorsReset
, subscriptionCursorsResetR
) where
import Network.Nakadi.Internal.Prelude
import Data.Aeson
import qualified Control.Exception.Safe as Safe
import Control.Lens
import Control.Monad.Reader
import qualified Data.HashMap.Lazy as HashMap
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import Network.Nakadi.Internal.Lenses (HasNakadiSubscriptionCursor)
import qualified Network.Nakadi.Internal.Lenses as L
-- | Request path of the cursors endpoint of a subscription: the literal
-- prefix @\/subscriptions\/@, followed by the subscription ID as rendered
-- by 'subscriptionIdToByteString', followed by the suffix @\/cursors@.
path :: SubscriptionId -> ByteString
path sid =
  mconcat
    [ "/subscriptions/"
    , subscriptionIdToByteString sid
    , "/cursors"
    ]
-- | Commit cursors for a subscription by sending a @POST@ request to the
-- subscription's cursors path.
--
-- The stream ID is transmitted UTF-8 encoded in the @X-Nakadi-StreamId@
-- header and the cursors are sent as the JSON request body. The
-- request is run through 'httpJsonNoBody' with 'status204' as the
-- expected status; status 'ok200' is associated with
-- 'errorCursorAlreadyCommitted'.
subscriptionCursorCommit' ::
  MonadNakadi m
  => Config                   -- ^ Configuration
  -> SubscriptionId           -- ^ Subscription whose cursors are committed
  -> StreamId                 -- ^ Stream ID sent in the request header
  -> SubscriptionCursorCommit -- ^ Cursors to commit (request body)
  -> m ()
subscriptionCursorCommit' config subscriptionId streamId cursors =
  httpJsonNoBody config status204 [(ok200, errorCursorAlreadyCommitted)] prepareRequest
  where
    -- Header value carrying the stream ID.
    streamIdBytes = encodeUtf8 (unStreamId streamId)

    -- Request modifiers; composed right-to-left: path, body, header, method.
    prepareRequest =
      setRequestMethod "POST"
        . addRequestHeader "X-Nakadi-StreamId" streamIdBytes
        . setRequestBodyJSON cursors
        . setRequestPath (path subscriptionId)
-- | Variant of 'subscriptionCursorCommit'' which obtains the 'Config'
-- from the reader environment (through the 'L.nakadiConfig' lens)
-- instead of taking it as an explicit argument.
subscriptionCursorCommitR' ::
  MonadNakadiEnv r m
  => SubscriptionId           -- ^ Subscription whose cursors are committed
  -> StreamId                 -- ^ Stream ID
  -> SubscriptionCursorCommit -- ^ Cursors to commit
  -> m ()
subscriptionCursorCommitR' subscriptionId streamId cursors =
  asks (view L.nakadiConfig) >>= \config ->
    subscriptionCursorCommit' config subscriptionId streamId cursors
-- | Commit the subscription cursors contained in the given values,
-- using the configuration, subscription ID and stream ID found in the
-- surrounding 'SubscriptionEventStreamContext'.
--
-- A 'CursorAlreadyCommitted' exception raised by the commit is caught
-- and discarded; any other exception is left to propagate.
subscriptionCommit ::
  (MonadNakadi m, MonadCatch m, HasNakadiSubscriptionCursor a)
  => [a] -- ^ Values from which the cursors to commit are extracted
  -> ReaderT SubscriptionEventStreamContext m ()
subscriptionCommit as = do
  SubscriptionEventStreamContext { .. } <- ask
  Safe.catchJust
    alreadyCommitted
    (subscriptionCursorCommit' _ctxConfig _subscriptionId _streamId payload)
    (\_ -> return ())
  where
    -- Commit payload: the cursor extracted from every input element.
    payload = SubscriptionCursorCommit (map (view L.subscriptionCursor) as)

    -- Selects exactly the exception that is to be ignored.
    alreadyCommitted (CursorAlreadyCommitted _) = Just ()
    alreadyCommitted _                          = Nothing
-- | Retrieve the cursors of a subscription by sending a @GET@ request
-- to the subscription's cursors path.
--
-- The request is run through 'httpJsonBody' with 'ok200' as the
-- expected status and no additional status handlers; the result is
-- delivered as a list of 'SubscriptionCursor'.
subscriptionCursors ::
  MonadNakadi m
  => Config         -- ^ Configuration
  -> SubscriptionId -- ^ Subscription to query
  -> m [SubscriptionCursor]
subscriptionCursors config subscriptionId =
  httpJsonBody config ok200 [] prepareRequest
  where
    prepareRequest =
      setRequestMethod "GET"
        . setRequestPath (path subscriptionId)
-- | Variant of 'subscriptionCursors' which obtains the 'Config' from
-- the reader environment (through the 'L.nakadiConfig' lens) instead
-- of taking it as an explicit argument.
subscriptionCursorsR ::
  MonadNakadiEnv r m
  => SubscriptionId -- ^ Subscription to query
  -> m [SubscriptionCursor]
subscriptionCursorsR subscriptionId =
  asks (view L.nakadiConfig) >>= \config ->
    subscriptionCursors config subscriptionId
-- | Reset the cursors of a subscription by sending a @PATCH@ request
-- to the subscription's cursors path.
--
-- The request body is a JSON object with a single key @items@ holding
-- the JSON encoding of the given cursors. The request is run through
-- 'httpJsonNoBody' with 'status204' as the expected status; status
-- 'status404' is associated with 'errorSubscriptionNotFound' and
-- 'status409' with 'errorCursorResetInProgress'.
subscriptionCursorsReset ::
  MonadNakadi m
  => Config                          -- ^ Configuration
  -> SubscriptionId                  -- ^ Subscription whose cursors are reset
  -> [SubscriptionCursorWithoutToken] -- ^ Cursors to reset to
  -> m ()
subscriptionCursorsReset config subscriptionId cursors =
  httpJsonNoBody config status204
    [ (status404, errorSubscriptionNotFound)
    , (status409, errorCursorResetInProgress)
    ]
    prepareRequest
  where
    -- Built with aeson's 'object' rather than by wrapping a hand-made
    -- HashMap in the 'Object' constructor, so the code does not depend
    -- on aeson's internal representation of JSON objects.
    requestBody = object [("items", toJSON cursors)]

    prepareRequest =
      setRequestMethod "PATCH"
        . setRequestPath (path subscriptionId)
        . setRequestBodyJSON requestBody
-- | Variant of 'subscriptionCursorsReset' which obtains the 'Config'
-- from the reader environment (through the 'L.nakadiConfig' lens)
-- instead of taking it as an explicit argument.
subscriptionCursorsResetR ::
  MonadNakadiEnv r m
  => SubscriptionId                  -- ^ Subscription whose cursors are reset
  -> [SubscriptionCursorWithoutToken] -- ^ Cursors to reset to
  -> m ()
subscriptionCursorsResetR subscriptionId cursors =
  asks (view L.nakadiConfig) >>= \config ->
    subscriptionCursorsReset config subscriptionId cursors