{-# language DataKinds #-}
{-# language NamedFieldPuns #-}
{-# language TypeApplications #-}

module Kafka.RecordBatch.Request
  ( RecordBatch(..)
  , toChunks
  ) where

import Data.Int (Int32,Int64,Int16)
import Data.Word (Word16)
import Kafka.Builder (Builder)
import Data.Bytes.Chunks (Chunks(ChunksCons))

import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes as Bytes
import qualified Data.Bytes.Chunks as Chunks
import qualified Kafka.Builder as Builder
import qualified Kafka.Builder.Bounded as Bounded
import qualified Crc32c

-- | A record batch. The following fields are not made explicit since
-- they are only for framing and checksum:
--
-- * batchLength
-- * magic (always the number 2)
-- * crc
--
-- From kafka documentation:
--
-- > baseOffset: int64
-- > batchLength: int32
-- > partitionLeaderEpoch: int32
-- > magic: int8 (current magic value is 2)
-- > crc: int32
-- > attributes: int16
-- >     bit 0~2:
-- >         0: no compression
-- >         1: gzip
-- >         2: snappy
-- >         3: lz4
-- >         4: zstd
-- >     bit 3: timestampType
-- >     bit 4: isTransactional (0 means not transactional)
-- >     bit 5: isControlBatch (0 means not a control batch)
-- >     bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
-- >     bit 7~15: unused
-- > lastOffsetDelta: int32
-- > baseTimestamp: int64
-- > maxTimestamp: int64
-- > producerId: int64
-- > producerEpoch: int16
-- > baseSequence: int32
-- > records: [Record]
--
-- A few of my own observations:
--
-- * The docs add a note that that last field @records@ is not really what
--   it looks like. The array length is always serialized in the usual way,
--   but the payload might be compressed.
-- * The field @batchLength@ includes the size of everything after it.
--   So, not itself and not @baseOffset@.
data RecordBatch = RecordBatch
  { RecordBatch -> Int64
baseOffset :: !Int64
    -- ^ When producing records, this should always be set to zero.
    -- It might make sense to just remove this field. The actual base
    -- offset assigned to the batch can be found at
    -- @Kafka.Produce.Response.V9:Partition.baseOffset@.
  , RecordBatch -> Int32
partitionLeaderEpoch :: !Int32
    -- ^ It looks like this is always set to -1 during production.
  , RecordBatch -> Word16
attributes :: !Word16
  , RecordBatch -> Int32
lastOffsetDelta :: !Int32
    -- ^ Should be set to the number of records minus one.
  , RecordBatch -> Int64
baseTimestamp :: !Int64
  , RecordBatch -> Int64
maxTimestamp :: !Int64
  , RecordBatch -> Int64
producerId :: !Int64
    -- ^ Producer ID. This comes from the @InitProducerId@ response.
  , RecordBatch -> Int16
producerEpoch :: !Int16
    -- ^ Producer epoch. This comes from the @InitProducerId@ response.
  , RecordBatch -> Int32
baseSequence :: !Int32
    -- ^ This is not documented very well anywhere, but it appears that,
    -- for a producer (identifier by a producer id), the sequence number
    -- starts as zero and is increased by 1 for every message that is produced.
    -- So, if the first base sequence number is 0, and a batch of 50 messages
    -- is produces, then next base sequence number will be 50.
  , RecordBatch -> Int32
recordsCount :: !Int32
    -- ^ Number of records.
  , RecordBatch -> Chunks
recordsPayload :: !Chunks
    -- ^ Records might be compressed. Look at @attributes@ to check for
    -- compression, and with that information, you can decode this field.
  }

toChunks :: RecordBatch -> Chunks
toChunks :: RecordBatch -> Chunks
toChunks RecordBatch
  { Int64
baseOffset :: RecordBatch -> Int64
baseOffset :: Int64
baseOffset, Int32
partitionLeaderEpoch :: RecordBatch -> Int32
partitionLeaderEpoch :: Int32
partitionLeaderEpoch, Word16
attributes :: RecordBatch -> Word16
attributes :: Word16
attributes 
  , Int32
lastOffsetDelta :: RecordBatch -> Int32
lastOffsetDelta :: Int32
lastOffsetDelta, Int64
baseTimestamp :: RecordBatch -> Int64
baseTimestamp :: Int64
baseTimestamp, Int64
maxTimestamp :: RecordBatch -> Int64
maxTimestamp :: Int64
maxTimestamp
  , Int64
producerId :: RecordBatch -> Int64
producerId :: Int64
producerId, Int16
producerEpoch :: RecordBatch -> Int16
producerEpoch :: Int16
producerEpoch, Int32
baseSequence :: RecordBatch -> Int32
baseSequence :: Int32
baseSequence, Int32
recordsCount :: RecordBatch -> Int32
recordsCount :: Int32
recordsCount
  , Chunks
recordsPayload :: RecordBatch -> Chunks
recordsPayload :: Chunks
recordsPayload
  } =
  Bytes -> Chunks -> Chunks
ChunksCons
    ( ByteArray -> Bytes
Bytes.fromByteArray (ByteArray -> Bytes) -> ByteArray -> Bytes
forall a b. (a -> b) -> a -> b
$ Nat (8 + (4 + (4 + 5))) -> Builder (8 + (4 + (4 + 5))) -> ByteArray
forall (n :: Nat). Nat n -> Builder n -> ByteArray
Bounded.run Nat (8 + (4 + (4 + 5)))
forall (n :: Nat). KnownNat n => Nat n
Nat.constant
      ( Int64 -> Builder 8
Bounded.int64 Int64
baseOffset
        Builder 8 -> Builder (4 + (4 + 5)) -> Builder (8 + (4 + (4 + 5)))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int32 -> Builder 4
Bounded.int32 (Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Chunks -> Int
Chunks.length Chunks
postCrc Int -> Int -> Int
forall a. Num a => a -> a -> a
+ (Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
4)))
        Builder 4 -> Builder (4 + 5) -> Builder (4 + (4 + 5))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int32 -> Builder 4
Bounded.int32 Int32
partitionLeaderEpoch
        Builder 4 -> Builder 5 -> Builder (4 + 5)
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Word8 -> Builder 1
Bounded.word8 Word8
0x02
        Builder 1 -> Builder 4 -> Builder (1 + 4)
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Word32 -> Builder 4
Bounded.word32 (Word32 -> Chunks -> Word32
Crc32c.chunks Word32
0 Chunks
postCrc)
      )
    ) Chunks
postCrc
  where
  postCrc :: Chunks
  postCrc :: Chunks
postCrc = Bytes -> Chunks -> Chunks
ChunksCons
    ( ByteArray -> Bytes
Bytes.fromByteArray (ByteArray -> Bytes) -> ByteArray -> Bytes
forall a b. (a -> b) -> a -> b
$ Nat (2 + (4 + (8 + (8 + (8 + (2 + 8))))))
-> Builder (2 + (4 + (8 + (8 + (8 + (2 + 8)))))) -> ByteArray
forall (n :: Nat). Nat n -> Builder n -> ByteArray
Bounded.run Nat (2 + (4 + (8 + (8 + (8 + (2 + 8))))))
forall (n :: Nat). KnownNat n => Nat n
Nat.constant
      ( Word16 -> Builder 2
Bounded.word16 Word16
attributes
        Builder 2
-> Builder (4 + (8 + (8 + (8 + (2 + 8)))))
-> Builder (2 + (4 + (8 + (8 + (8 + (2 + 8))))))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int32 -> Builder 4
Bounded.int32 Int32
lastOffsetDelta
        Builder 4
-> Builder (8 + (8 + (8 + (2 + 8))))
-> Builder (4 + (8 + (8 + (8 + (2 + 8)))))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int64 -> Builder 8
Bounded.int64 Int64
baseTimestamp
        Builder 8
-> Builder (8 + (8 + (2 + 8))) -> Builder (8 + (8 + (8 + (2 + 8))))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int64 -> Builder 8
Bounded.int64 Int64
maxTimestamp
        Builder 8 -> Builder (8 + (2 + 8)) -> Builder (8 + (8 + (2 + 8)))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int64 -> Builder 8
Bounded.int64 Int64
producerId
        Builder 8 -> Builder (2 + 8) -> Builder (8 + (2 + 8))
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int16 -> Builder 2
Bounded.int16 Int16
producerEpoch
        Builder 2 -> Builder 8 -> Builder (2 + 8)
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int32 -> Builder 4
Bounded.int32 Int32
baseSequence
        Builder 4 -> Builder 4 -> Builder (4 + 4)
forall (m :: Nat) (n :: Nat).
Builder m -> Builder n -> Builder (m + n)
`Bounded.append`
        Int32 -> Builder 4
Bounded.int32 Int32
recordsCount
      )
    ) Chunks
recordsPayload