{-|
Module      : Network.Nakadi.EventTypes.Events
Description : Implementation of Nakadi Events API
Copyright   : (c) Moritz Schulte 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements the
@\/event-types\/EVENT-TYPE\/events@ API.
-}

{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TupleSections         #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Nakadi.EventTypes.Events
  ( eventsProcessConduit
  , eventsProcess
  , eventsPublish
  ) where

import           Network.Nakadi.Internal.Prelude

import           Conduit
import           Data.Aeson
import qualified Data.ByteString.Lazy            as ByteString.Lazy
import           Data.Void
import           Network.HTTP.Client             (responseBody)
import           Network.Nakadi.Internal.Config
import           Network.Nakadi.Internal.Http

path :: EventTypeName -> ByteString
path eventTypeName =
  "/event-types/"
  <> encodeUtf8 (unEventTypeName eventTypeName)
  <> "/events"

eventsProcess
  :: ( MonadNakadi b m
     , FromJSON a )
  => Maybe ConsumeParameters
  -> EventTypeName
  -> Maybe [Cursor]
  -> (EventStreamBatch a -> m r)
  -> m r
eventsProcess maybeConsumeParameters eventTypeName maybeCursors processor =
  eventsProcess maybeConsumeParameters eventTypeName maybeCursors processor

eventsProcessConduit
  :: ( MonadNakadi b m
     , MonadMask m
     , FromJSON a )
  => Maybe ConsumeParameters
  -> EventTypeName
  -> Maybe [Cursor]
  -> ConduitM (EventStreamBatch a) Void m r
  -> m r
eventsProcessConduit maybeConsumeParameters eventTypeName maybeCursors consumer = do
  config <- nakadiAsk
  let consumeParams = fromMaybe defaultConsumeParameters maybeConsumeParameters
      queryParams   = buildConsumeQueryParameters consumeParams
  httpJsonBodyStream ok200 [ (status429, errorTooManyRequests)
                           , (status429, errorEventTypeNotFound) ]
    (setRequestPath (path eventTypeName)
     . includeFlowId config
     . setRequestQueryParameters queryParams
     . addCursors) $
    handler config

  where addCursors = case maybeCursors of
          Just cursors -> let cursors' = ByteString.Lazy.toStrict (encode cursors)
                          in addRequestHeader "X-Nakadi-Cursors" cursors'
          Nothing      -> identity

        handler config response =
          responseBody response
          .| linesUnboundedAsciiC
          .| conduitDecode config
          $$ consumer

-- | @POST@ to @\/event-types\/NAME\/events@. Publishes a batch of
-- events for the specified event type.
eventsPublish
  :: (MonadNakadi b m, ToJSON a)
  => EventTypeName
  -> [a]
  -> m ()
eventsPublish eventTypeName eventBatch = do
  config <- nakadiAsk
  httpJsonNoBody status200
    [ (Status 207 "Multi-Status", errorBatchPartiallySubmitted)
    , (status422, errorBatchNotSubmitted) ] $
    (setRequestMethod "POST"
     . includeFlowId config
     . setRequestPath (path eventTypeName)
     . setRequestBodyJSON eventBatch)