{-# language BangPatterns #-}
{-# language NamedFieldPuns #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language DuplicateRecordFields #-}
{-# language FlexibleContexts #-}
{-# language GeneralizedNewtypeDeriving #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language TypeFamilies #-}
{-# language UnboxedTuples #-}
{-# language UndecidableInstances #-}

module Kafka.Produce.Request.V9
  ( Request(..)
  , Topic(..)
  , Partition(..)
  , encode
  , toChunks
    -- * Request Construction
  , singleton
  ) where

import Data.Int
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Kafka.RecordBatch.Request (RecordBatch(..))
import Data.Bytes.Chunks (Chunks)

import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes.Chunks as Chunks
import qualified Data.Primitive.Contiguous as C
import qualified Kafka.Acknowledgments as Acknowledgments
import qualified Kafka.Builder as Builder
import qualified Kafka.Builder.Bounded as Bounded
import qualified Kafka.RecordBatch.Request as RecordBatch

-- Description from Kafka docs:
--
-- > Produce Request (Version: 9) => transactional_id acks timeout_ms [topic_data] TAG_BUFFER 
-- >   transactional_id => COMPACT_NULLABLE_STRING
-- >   acks => INT16
-- >   timeout_ms => INT32
-- >   topic_data => name [partition_data] TAG_BUFFER 
-- >     name => COMPACT_STRING
-- >     partition_data => index records TAG_BUFFER 
-- >       index => INT32
-- >       records => COMPACT_RECORDS

-- | Create a request for producing to a single partition of a single topic.
-- Transactions are not used: the transactional id is always 'Nothing'.
--
-- The resulting request holds exactly one 'Topic', which in turn holds
-- exactly one 'Partition' carrying the supplied record batch.
singleton ::
     Acknowledgments.Acknowledgments -- ^ Acknowledgements
  -> Int32 -- ^ Timeout milliseconds
  -> Text -- ^ Topic name
  -> Int32 -- ^ Partition index
  -> RecordBatch -- ^ Records
  -> Request
singleton !acks !timeoutMs !topicName !partitionIx records = Request
  { transactionalId = Nothing
  , acks = acks
  , timeoutMilliseconds = timeoutMs
  , topicData = C.singleton $ Topic
      { name = topicName
      , partitions = C.singleton $ Partition
          { index = partitionIx
          , records = records
          }
      }
  }
       
  

-- | Serialize a request to 'Chunks' by running the builder produced
-- by 'encode'.
--
-- The literal @256@ is the size argument handed to @Builder.run@
-- (presumably the initial buffer size in bytes; confirm against
-- "Kafka.Builder").
toChunks :: Request -> Chunks
toChunks = Builder.run 256 . encode

-- | Build the wire representation of a produce request, writing the
-- fields in the order given by the schema at the top of this module:
--
-- 1. @transactional_id@ as a compact nullable string
-- 2. @acks@ (16-bit) followed by @timeout_ms@ (32-bit), emitted together
--    through a single bounded builder of 6 bytes
-- 3. @topic_data@ as a compact array, each element encoded by
--    'encodeTopicData'
-- 4. a single zero byte, occupying the trailing @TAG_BUFFER@ position
encode :: Request -> Builder.Builder
encode Request{transactionalId,acks=Acknowledgments.Acknowledgments acks,timeoutMilliseconds,topicData} =
  Builder.compactNullableString transactionalId
  <>
  -- The two fixed-width fields are combined into one bounded builder
  -- (2 + 4 = 6 bytes) before being converted to an ordinary builder.
  Builder.fromBounded Nat.constant
    ( Bounded.int16 acks
      `Bounded.append`
      Bounded.int32 timeoutMilliseconds
    )
  <>
  Builder.compactArray encodeTopicData topicData
  <>
  Builder.word8 0

-- | Build the wire representation of one @topic_data@ element: the topic
-- name as a compact string, the partitions as a compact array (each
-- encoded by 'encodePartition'), and a single zero byte occupying the
-- trailing @TAG_BUFFER@ position.
encodeTopicData :: Topic -> Builder.Builder
encodeTopicData Topic{name,partitions} =
  Builder.compactString name
  <>
  Builder.compactArray encodePartition partitions
  <>
  Builder.word8 0

-- | Build the wire representation of one @partition_data@ element: the
-- 32-bit partition index, the length-prefixed record batch, and a single
-- zero byte occupying the trailing @TAG_BUFFER@ position.
encodePartition :: Partition -> Builder.Builder
encodePartition Partition{index,records} =
  Builder.int32 index
  <>
  -- This is not documented, but in V9+, you have to add the number 1
  -- to the size of the encoded record batch. The kafka source code
  -- confirms this. The proof is in org.apache.kafka.common.message.ProduceRequestData,
  -- a generated file, on a line that says:
  --   _writable.writeUnsignedVarint(records.sizeInBytes() + 1);
  Builder.varWordNative (fromIntegral (1 + Chunks.length batchChunks))
  <>
  Builder.chunks batchChunks
  <>
  Builder.word8 0
  where
  -- The batch is serialized once and shared between the length prefix
  -- and the payload.
  batchChunks = RecordBatch.toChunks records

-- | A version 9 produce request. The fields mirror the schema quoted
-- at the top of this module.
data Request = Request
  { transactionalId :: !(Maybe Text)
    -- ^ Transactional id (@transactional_id@); 'Nothing' when
    -- transactions are not used.
  , acks :: !Acknowledgments.Acknowledgments
    -- ^ Acknowledgements setting (@acks@).
  , timeoutMilliseconds :: !Int32
    -- ^ Timeout in milliseconds (@timeout_ms@).
  , topicData :: !(SmallArray Topic)
    -- ^ Per-topic payloads (@topic_data@).
  }

-- | One @topic_data@ element of a produce request.
data Topic = Topic
  { name :: !Text
    -- ^ Topic name
  , partitions :: !(SmallArray Partition)
    -- ^ Per-partition payloads for this topic (@partition_data@).
  }

-- | One @partition_data@ element of a produce request.
data Partition = Partition
  { index :: !Int32
    -- ^ Partition index
  , records :: !RecordBatch
    -- ^ Record batch to produce. It is serialized with
    -- @RecordBatch.toChunks@ when the enclosing request is encoded.
  }