module Network.Nakadi.EventTypes.Events
( eventSource
, eventSourceR
, eventPublish
, eventPublishR
) where
import Network.Nakadi.Internal.Prelude
import Conduit
import Control.Lens
import Data.Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses as L
-- | Build the request path for the events resource of the given event
-- type: @\/event-types\/NAME\/events@, where @NAME@ is the UTF-8
-- encoded event type name.
path :: EventTypeName -> ByteString
path eventTypeName = "/event-types/" <> encodedName <> "/events"
  where
    encodedName = encodeUtf8 (unEventTypeName eventTypeName)
-- | Open an event stream for the given event type and return it as a
-- Conduit source of 'EventStreamBatch' values.
--
-- The request targets the path produced by 'path' for the event type.
-- Query parameters are derived from the supplied 'ConsumeParameters';
-- when 'Nothing' is given, the consume parameters stored in the
-- 'Config' are used instead. When cursors are supplied they are
-- JSON-encoded and sent in the @X-Nakadi-Cursors@ request header.
--
-- Error handlers are registered for status 429 (too many requests)
-- and status 404 (event type not found).
eventSource ::
  (MonadNakadi m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
  => Config                  -- ^ Client configuration
  -> Maybe ConsumeParameters -- ^ Optional override for the configured consume parameters
  -> EventTypeName           -- ^ Event type to consume from
  -> Maybe [Cursor]          -- ^ Optional cursors to send along with the request
  -> m (ConduitM () (EventStreamBatch a) m ())
eventSource config maybeParams eventTypeName maybeCursors = do
  let consumeParams = fromMaybe (config ^. L.consumeParameters) maybeParams
      queryParams   = buildSubscriptionConsumeQueryParameters consumeParams
  -- Only the second component of the result pair (the stream) is
  -- needed; it is then run through 'runReaderC' with a unit environment.
  runReaderC () . snd <$>
    httpJsonBodyStream config ok200 (const (Right ()))
      [ (status429, errorTooManyRequests)
        -- An unknown event type is reported with 404; keying this
        -- handler on 429 would merely duplicate the entry above.
      , (status404, errorEventTypeNotFound)
      ]
      ( setRequestPath (path eventTypeName)
        . setRequestQueryParameters queryParams
        . case maybeCursors of
            Just cursors ->
              let cursors' = ByteString.Lazy.toStrict (encode cursors)
              in addRequestHeader "X-Nakadi-Cursors" cursors'
            Nothing -> identity
      )
-- | Variant of 'eventSource' that reads the 'Config' from the reader
-- environment (via the @nakadiConfig@ lens) instead of taking it as
-- an explicit argument. All other arguments are passed through
-- unchanged.
eventSourceR
  :: (MonadNakadiEnv r m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
  => Maybe ConsumeParameters
  -> EventTypeName
  -> Maybe [Cursor]
  -> m (ConduitM () (EventStreamBatch a) m ())
eventSourceR maybeParams eventType maybeCursors =
  asks (view L.nakadiConfig) >>= \config ->
    eventSource config maybeParams eventType maybeCursors
-- | Publish a batch of events to the given event type by sending a
-- @POST@ request, with the batch as JSON body, to the path produced
-- by 'path'.
--
-- When a 'FlowId' is supplied it is sent UTF-8 encoded in the
-- @X-Flow-Id@ request header. Status 200 is the expected response
-- status; dedicated error handlers are registered for status 207
-- (batch partially submitted) and status 422 (batch not submitted).
eventPublish
  :: (MonadNakadi m, ToJSON a)
  => Config
  -> EventTypeName
  -> Maybe FlowId
  -> [a]
  -> m ()
eventPublish config eventTypeName maybeFlowId eventBatch =
  httpJsonNoBody config status200 errorHandlers prepareRequest
  where
    -- Response statuses that are mapped to specific errors.
    errorHandlers =
      [ (Status 207 "Multi-Status", errorBatchPartiallySubmitted)
      , (status422, errorBatchNotSubmitted)
      ]

    -- Adds the flow ID header only when a flow ID was provided.
    attachFlowId = case maybeFlowId of
      Just flowId -> addRequestHeader "X-Flow-Id" (encodeUtf8 (unFlowId flowId))
      Nothing     -> identity

    -- Request modifier; composed right-to-left, so the body is set first.
    prepareRequest =
      setRequestMethod "POST"
      . setRequestPath (path eventTypeName)
      . attachFlowId
      . setRequestBodyJSON eventBatch
-- | Variant of 'eventPublish' that reads the 'Config' from the reader
-- environment (via the @nakadiConfig@ lens) instead of taking it as
-- an explicit argument. All other arguments are passed through
-- unchanged.
eventPublishR
  :: (MonadNakadiEnv r m, ToJSON a)
  => EventTypeName
  -> Maybe FlowId
  -> [a]
  -> m ()
eventPublishR eventTypeName maybeFlowId eventBatch = do
  envConfig <- asks (^. L.nakadiConfig)
  eventPublish envConfig eventTypeName maybeFlowId eventBatch