{-# language BangPatterns #-}
{-# language NamedFieldPuns #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language DuplicateRecordFields #-}
{-# language FlexibleContexts #-}
{-# language GeneralizedNewtypeDeriving #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language TypeFamilies #-}
{-# language UnboxedTuples #-}
{-# language UndecidableInstances #-}
module Kafka.Produce.Request.V9
( Request(..)
, Topic(..)
, Partition(..)
, encode
, toChunks
, singleton
) where
import Data.Int
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Kafka.RecordBatch.Request (RecordBatch(..))
import Data.Bytes.Chunks (Chunks)
import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes.Chunks as Chunks
import qualified Data.Primitive.Contiguous as C
import qualified Kafka.Acknowledgments as Acknowledgments
import qualified Kafka.Builder as Builder
import qualified Kafka.Builder.Bounded as Bounded
import qualified Kafka.RecordBatch.Request as RecordBatch
-- | Build a 'Request' holding exactly one 'Topic', which in turn holds
-- exactly one 'Partition' carrying the given record batch. The
-- @transactionalId@ field is always set to 'Nothing'.
singleton ::
     Acknowledgments.Acknowledgments -- ^ Stored in the @acks@ field
  -> Int32 -- ^ Stored in the @timeoutMilliseconds@ field
  -> Text -- ^ Topic name
  -> Int32 -- ^ Partition index
  -> RecordBatch -- ^ Records stored in the single partition
  -> Request
singleton !acks !timeoutMs !topicName !partitionIx records = Request
  { transactionalId = Nothing
  , acks = acks
  , timeoutMilliseconds = timeoutMs
  , topicData = C.singleton $ Topic
      { name = topicName
      , partitions = C.singleton $ Partition
          { index = partitionIx
          , records = records
          }
      }
  }
-- | Serialize a 'Request' to 'Chunks' by running the builder produced by
-- 'encode' through 'Builder.run'.
--
-- NOTE(review): 256 is the @Int@ argument handed to 'Builder.run';
-- presumably an initial buffer/chunk size in bytes -- confirm against
-- "Kafka.Builder".
toChunks :: Request -> Chunks
toChunks = Builder.run 256 . encode
-- | Encode a 'Request' as a 'Builder.Builder'. Fields are emitted in this
-- order:
--
-- 1. @transactionalId@ via 'Builder.compactNullableString'
-- 2. @acks@ (the 'Int16' inside 'Acknowledgments.Acknowledgments')
--    followed by @timeoutMilliseconds@ (an 'Int32'), written together
--    through one bounded builder
-- 3. @topicData@ via 'Builder.compactArray', each element encoded with
--    'encodeTopicData'
-- 4. a single zero byte
--
-- NOTE(review): the trailing zero byte is presumably the empty
-- tagged-field section of Kafka's flexible message format -- confirm
-- against the protocol specification.
encode :: Request -> Builder.Builder
encode Request
  { transactionalId
  , acks = Acknowledgments.Acknowledgments acks
  , timeoutMilliseconds
  , topicData
  } =
  Builder.compactNullableString transactionalId
  <>
  -- The bounded builder's size is the sum of its parts (2 + 4 = 6);
  -- Nat.constant supplies that size as a type-level natural.
  Builder.fromBounded Nat.constant
    ( Bounded.int16 acks
      `Bounded.append`
      Bounded.int32 timeoutMilliseconds
    )
  <>
  Builder.compactArray encodeTopicData topicData
  <>
  Builder.word8 0
-- | Encode one 'Topic': its @name@ via 'Builder.compactString', then its
-- @partitions@ via 'Builder.compactArray' (each element encoded with
-- 'encodePartition'), then a single zero byte.
--
-- NOTE(review): the trailing zero byte is presumably an empty
-- tagged-field section -- confirm against the protocol specification.
encodeTopicData :: Topic -> Builder.Builder
encodeTopicData Topic{name, partitions} =
  Builder.compactString name
  <>
  Builder.compactArray encodePartition partitions
  <>
  Builder.word8 0
-- | Encode one 'Partition': its @index@ as an 'Int32', then the serialized
-- record batch prefixed by its length plus one (written with
-- 'Builder.varWordNative'), then a single zero byte.
--
-- NOTE(review): writing @1 + length@ ahead of the payload presumably
-- follows Kafka's compact-bytes convention, and the trailing zero byte is
-- presumably an empty tagged-field section -- confirm both against the
-- protocol specification.
encodePartition :: Partition -> Builder.Builder
encodePartition Partition{index, records} =
  Builder.int32 index
  <>
  Builder.varWordNative (fromIntegral (1 + Chunks.length batchChunks))
  <>
  Builder.chunks batchChunks
  <>
  Builder.word8 0
  where
    -- Bound once so the length prefix and the payload come from the same
    -- serialized batch.
    batchChunks :: Chunks
    batchChunks = RecordBatch.toChunks records
-- | A produce request. All fields are strict.
data Request = Request
  { transactionalId :: !(Maybe Text)
    -- ^ Optional transactional id; 'singleton' sets this to 'Nothing'.
  , acks :: !Acknowledgments.Acknowledgments
    -- ^ Acknowledgment setting; 'encode' writes its wrapped 'Int16'.
  , timeoutMilliseconds :: !Int32
    -- ^ Timeout, in milliseconds as the field name states.
  , topicData :: !(SmallArray Topic)
    -- ^ Topics included in this request.
  }
-- | One topic entry of a 'Request'. All fields are strict.
data Topic = Topic
  { name :: !Text
    -- ^ Topic name.
  , partitions :: !(SmallArray Partition)
    -- ^ Partitions of this topic included in the request.
  }
-- | One partition entry of a 'Topic'. All fields are strict.
data Partition = Partition
  { index :: !Int32
    -- ^ Partition index.
  , records :: !RecordBatch
    -- ^ Record batch for this partition; serialized with
    -- 'RecordBatch.toChunks' when the partition is encoded.
  }