{-# language BangPatterns #-}
{-# language NamedFieldPuns #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language DuplicateRecordFields #-}
{-# language FlexibleContexts #-}
{-# language GeneralizedNewtypeDeriving #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedRecordDot #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language TypeFamilies #-}
{-# language UnboxedTuples #-}
{-# language UndecidableInstances #-}

module Kafka.Fetch.Request.V13
  ( Request(..)
  , Topic(..)
  , Partition(..)
  , toChunks
  ) where

import Prelude hiding (id)

import Data.Bytes.Builder (Builder)
import Data.Bytes.Chunks (Chunks)
import Data.Int (Int8,Int32,Int64)
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Data.WideWord (Word128)

import qualified Kafka.Builder as Builder

-- | Kafka Fetch request V13. Note: the forgotten topics array
-- is not yet implemented. Currently, we always encode this as a
-- zero-length array.
--
-- All fields are strict. The declaration order below is also the
-- order in which the fields are serialized.
data Request = Request
  { replicaId :: !Int32
  , maxWaitMilliseconds :: !Int32
  , minBytes :: !Int32
  , maxBytes :: !Int32
  , isolationLevel :: !Int8
  , sessionId :: !Int32
    -- ^ Setting session ID to 0 means that we are requesting that the
    -- broker create a new session. The broker will return a randomly
    -- generated session ID in the response. A request with a session ID
    -- of -1 indicates that we do not want to use a fetch session at all.
  , sessionEpoch :: !Int32
  , topics :: !(SmallArray Topic)
    -- ^ One entry per topic, each carrying its own array of partitions.
  , rackId :: !Text
    -- ^ Rack ID of the consumer. Often the empty string.
  }

-- | A single topic entry of a fetch request: the 128-bit topic
-- identifier together with the partitions requested from that topic.
data Topic = Topic
  { id :: {-# UNPACK #-} !Word128
    -- ^ Topic identifier. This field shadows the Prelude function of the
    -- same name, which is why the module hides @id@ from the Prelude.
  , partitions :: !(SmallArray Partition)
  }

-- | A single partition entry nested inside a 'Topic'. All fields are
-- strict, fixed-width signed integers.
data Partition = Partition
  { index :: !Int32
  , currentLeaderEpoch :: !Int32
  , fetchOffset :: !Int64
  , lastFetchedEpoch :: !Int32
    -- ^ The epoch of the last fetched record or -1 if there is none.
  , logStartOffset :: !Int64
    -- ^ Set this to -1. According to Kafka docs: "The earliest available
    -- offset of the follower replica. The field is only used when the request
    -- is sent by the follower."
  , maxBytes :: !Int32
  }

-- | Serialize a 'Request' to 'Chunks' by running the builder produced
-- by 'encode'.
toChunks :: Request -> Chunks
-- NOTE(review): 128 is the first argument of Builder.run; presumably the
-- initial chunk size in bytes. Confirm against Kafka.Builder.
toChunks = Builder.run 128 . encode

-- | Build the body of a Fetch V13 request. Fields are emitted in the
-- order of the 'Request' record, with a placeholder for the
-- unimplemented forgotten topics array inserted between the topics and
-- the rack ID, and a trailing tagged-fields byte.
encode :: Request -> Builder
encode r =
     Builder.int32 r.replicaId
  <> Builder.int32 r.maxWaitMilliseconds
  <> Builder.int32 r.minBytes
  <> Builder.int32 r.maxBytes
  <> Builder.int8 r.isolationLevel
  <> Builder.int32 r.sessionId
  <> Builder.int32 r.sessionEpoch
  <> Builder.compactArray encodeTopic r.topics
  -- The forgotten topics array is always written as the single byte 0x01,
  -- standing in for a zero-length array (compact arrays presumably store
  -- length + 1; see the Kafka protocol guide).
  <> Builder.word8 0x01 -- no forgotten topics
  <> Builder.compactString r.rackId
  <> Builder.word8 0 -- zero tagged fields

-- | Build one topic entry: the 128-bit topic ID, then the compact
-- array of its partitions (each built by 'encodePartition'), then a
-- tagged-fields byte.
encodeTopic :: Topic -> Builder
encodeTopic r =
     Builder.word128 r.id
  <> Builder.compactArray encodePartition r.partitions
  <> Builder.word8 0 -- zero tagged fields

-- | Build one partition entry. Fields are emitted in the order of the
-- 'Partition' record, followed by a tagged-fields byte.
encodePartition :: Partition -> Builder
encodePartition r =
     Builder.int32 r.index
  <> Builder.int32 r.currentLeaderEpoch
  <> Builder.int64 r.fetchOffset
  <> Builder.int32 r.lastFetchedEpoch
  <> Builder.int64 r.logStartOffset
  <> Builder.int32 r.maxBytes
  <> Builder.word8 0 -- zero tagged fields