{-|
Module      : Network.Nakadi.EventTypes.Events
Description : Implementation of Nakadi Events API
Copyright   : (c) Moritz Schulte 2017
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements the @\/event-types\/EVENT-TYPE\/events@ API.
-}

{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TupleSections         #-}

module Network.Nakadi.EventTypes.Events
  ( eventSource
  , eventSourceR
  , eventPublish
  , eventPublishR
  ) where

import           Network.Nakadi.Internal.Prelude

import           Conduit
import           Control.Lens
import           Data.Aeson
import qualified Data.ByteString.Lazy            as ByteString.Lazy
import           Network.Nakadi.Internal.Config
import           Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses  as L

-- | Builds the request path @\/event-types\/NAME\/events@ for the
-- given event type name (UTF-8 encoded).
path :: EventTypeName -> ByteString
path eventTypeName =
  "/event-types/"
  <> encodeUtf8 (unEventTypeName eventTypeName)
  <> "/events"

-- | @GET@ to @\/event-types\/NAME\/events@. Returns Conduit source
-- for event batch consumption.
--
-- If no 'ConsumeParameters' are given, the consume parameters stored
-- in the provided 'Config' are used. If cursors are given, they are
-- JSON-encoded and sent in the @X-Nakadi-Cursors@ request header.
eventSource :: (MonadNakadi m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
            => Config                  -- ^ Configuration parameter
            -> Maybe ConsumeParameters -- ^ Optional parameters for event consumption
            -> EventTypeName           -- ^ Name of the event type to consume
            -> Maybe [Cursor]          -- ^ Optional list of cursors; by default
                                       -- consumption begins with the most
                                       -- recent event
            -> m (ConduitM () (EventStreamBatch a) m ()) -- ^ Returns a Conduit source.
eventSource config maybeParams eventTypeName maybeCursors = do
  let consumeParams = fromMaybe (config^.L.consumeParameters) maybeParams
      queryParams   = buildSubscriptionConsumeQueryParameters consumeParams
  runReaderC () . snd <$>
    httpJsonBodyStream config ok200 (const (Right ()))
      [ (status429, errorTooManyRequests)
        -- An unknown event type is signalled with 404, not 429; the
        -- status is spelled out with the 'Status' constructor, just
        -- like the 207 status in 'eventPublish'.
      , (Status 404 "Not Found", errorEventTypeNotFound)
      ]
      (setRequestPath (path eventTypeName)
       . setRequestQueryParameters queryParams
       . maybe identity
               (addRequestHeader "X-Nakadi-Cursors" . ByteString.Lazy.toStrict . encode)
               maybeCursors)

-- | @GET@ to @\/event-types\/NAME\/events@. Returns Conduit source
-- for event batch consumption. Retrieves configuration from
-- environment.
eventSourceR :: (MonadNakadiEnv r m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
             => Maybe ConsumeParameters -- ^ Optional parameters for event consumption
             -> EventTypeName           -- ^ Name of the event type to consume
             -> Maybe [Cursor]          -- ^ Optional list of cursors; by default
                                        -- consumption begins with the most
                                        -- recent event
             -> m (ConduitM () (EventStreamBatch a) m ()) -- ^ Returns a Conduit source.
eventSourceR maybeParams eventType maybeCursors = do
  config <- asks (view L.nakadiConfig)
  eventSource config maybeParams eventType maybeCursors

-- | @POST@ to @\/event-types\/NAME\/events@. Publishes a batch of
-- events for the specified event type.
--
-- Handlers are registered for the response statuses 207
-- ('errorBatchPartiallySubmitted') and 422 ('errorBatchNotSubmitted').
eventPublish :: (MonadNakadi m, ToJSON a)
             => Config        -- ^ Configuration parameter
             -> EventTypeName -- ^ Name of the event type to publish to
             -> Maybe FlowId  -- ^ Optional flow ID; if present it is sent
                              -- in the @X-Flow-Id@ request header
             -> [a]           -- ^ Batch of events, sent as JSON request body
             -> m ()
eventPublish config eventTypeName maybeFlowId eventBatch =
  httpJsonNoBody config status200
    [ (Status 207 "Multi-Status", errorBatchPartiallySubmitted)
    , (status422, errorBatchNotSubmitted)
    ]
    (setRequestMethod "POST"
     . setRequestPath (path eventTypeName)
     . maybe identity (addRequestHeader "X-Flow-Id" . encodeUtf8 . unFlowId) maybeFlowId
     . setRequestBodyJSON eventBatch)

-- | @POST@ to @\/event-types\/NAME\/events@. Publishes a batch of
-- events for the specified event type. Uses the configuration from
-- the environment.
eventPublishR :: (MonadNakadiEnv r m, ToJSON a)
              => EventTypeName -- ^ Name of the event type to publish to
              -> Maybe FlowId  -- ^ Optional flow ID
              -> [a]           -- ^ Batch of events to publish
              -> m ()
eventPublishR eventTypeName maybeFlowId eventBatch = do
  config <- asks (view L.nakadiConfig)
  eventPublish config eventTypeName maybeFlowId eventBatch