{-# language BangPatterns #-}
{-# language DataKinds #-}
{-# language DerivingStrategies #-}
{-# language LambdaCase #-}
{-# language NamedFieldPuns #-}
{-# language TypeApplications #-}

module Kafka.RecordBatch.Response
  ( RecordBatch(..)
  , parser
  , parserArray
  ) where

import Control.Monad (when)
import Data.Bytes (Bytes)
import Data.Bytes.Parser (Parser)
import Data.Int (Int32,Int64,Int16)
import Data.Primitive (SmallArray)
import Data.Word (Word16)
import Kafka.Parser.Context (Context)

import qualified Arithmetic.Nat as Nat
import qualified Crc32c
import qualified Data.Bytes as Bytes
import qualified Data.Bytes.Parser as Parser
import qualified Data.Primitive.Contiguous as C
import qualified Kafka.Parser
import qualified Kafka.Parser.Context as Ctx

-- | A record batch. The following fields are not made explicit since
-- they are only for framing and checksum:
--
-- * batchLength
-- * magic (always the number 2)
-- * crc
--
-- From the kafka documentation:
--
-- > baseOffset: int64
-- > batchLength: int32
-- > partitionLeaderEpoch: int32
-- > magic: int8 (current magic value is 2)
-- > crc: int32
-- > attributes: int16
-- >     bit 0~2:
-- >         0: no compression
-- >         1: gzip
-- >         2: snappy
-- >         3: lz4
-- >         4: zstd
-- >     bit 3: timestampType
-- >     bit 4: isTransactional (0 means not transactional)
-- >     bit 5: isControlBatch (0 means not a control batch)
-- >     bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
-- >     bit 7~15: unused
-- > lastOffsetDelta: int32
-- > baseTimestamp: int64
-- > maxTimestamp: int64
-- > producerId: int64
-- > producerEpoch: int16
-- > baseSequence: int32
-- > records: [Record]
--
-- A few of my own observations:
--
-- * The docs add a note that the last field, @records@, is not really what
--   it looks like. The array length is always serialized in the usual way,
--   but the payload might be compressed.
-- * The field @batchLength@ includes the size of everything after it.
--   So, not itself and not @baseOffset@. For example, a batch whose total
--   serialized size is @n@ bytes has @batchLength = n - 12@, since
--   @baseOffset@ (8 bytes) and @batchLength@ itself (4 bytes) are excluded.
data RecordBatch = RecordBatch
  { baseOffset :: !Int64
  , partitionLeaderEpoch :: !Int32
  , attributes :: !Word16
  , lastOffsetDelta :: !Int32
  , baseTimestamp :: !Int64
  , maxTimestamp :: !Int64
  , producerId :: !Int64
  , producerEpoch :: !Int16
  , baseSequence :: !Int32
  , recordsCount :: !Int32
  , recordsPayload :: !Bytes
    -- ^ Records might be compressed. Look at @attributes@ to check for
    -- compression, and with that information, you can decode this field.
  } deriving stock (Show)
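
-- A minimal sketch (not part of this module) of interpreting @attributes@
-- before decoding @recordsPayload@. The bit positions follow the kafka
-- documentation quoted above; the names @Compression@, @compressionCodec@,
-- and @isControlBatch@ are hypothetical, and @Data.Bits@ is assumed to be
-- imported.
--
-- > data Compression = None | Gzip | Snappy | Lz4 | Zstd
-- >
-- > compressionCodec :: Word16 -> Maybe Compression
-- > compressionCodec a = case a .&. 0x7 of
-- >   0 -> Just None
-- >   1 -> Just Gzip
-- >   2 -> Just Snappy
-- >   3 -> Just Lz4
-- >   4 -> Just Zstd
-- >   _ -> Nothing
-- >
-- > isControlBatch :: Word16 -> Bool
-- > isControlBatch a = testBit a 5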

-- This is not encoded like the other arrays in kafka. Here, the
-- record batches are just encoded and then concatenated one after
-- the next. There is no length prefix that signals how many batches
-- are present. To my knowledge, this is undocumented.
parserArray :: Context -> Parser Context s (SmallArray RecordBatch)
parserArray !ctx = go [] 0
  where
  go !acc !n = Parser.isEndOfInput >>= \case
    True -> pure $! C.unsafeFromListReverseN n acc
    False -> do
      batch <- parser (Ctx.Index n ctx)
      go (batch : acc) (n + 1)
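
-- A sketch of running 'parserArray' over a complete response payload with
-- bytesmith's 'Parser.parseBytes'. The root 'Context' argument (named
-- @rootCtx@ here) is whatever context the caller wants parse errors
-- reported under.
--
-- > decodeBatches :: Context -> Bytes -> Either Context (SmallArray RecordBatch)
-- > decodeBatches rootCtx b = case Parser.parseBytes (parserArray rootCtx) b of
-- >   Parser.Failure e -> Left e
-- >   Parser.Success (Parser.Slice _ _ batches) -> Right batches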

parser :: Context -> Parser Context s RecordBatch
parser !ctx = do
  baseOffset <- Kafka.Parser.int64 (Ctx.Field Ctx.BaseOffset ctx)
  batchLength <- Kafka.Parser.int32 (Ctx.Field Ctx.BatchLength ctx)
  when (batchLength < 0) (Parser.fail (Ctx.Field Ctx.BatchLengthNegative ctx))
  -- Everything after batchLength occupies exactly batchLength bytes,
  -- so restrict the rest of the parser to that window.
  Parser.delimit
    (Ctx.Field Ctx.BatchLengthNotEnoughBytes ctx)
    (Ctx.Field Ctx.BatchLengthLeftoverBytes ctx)
    (fromIntegral batchLength :: Int) $ do
      partitionLeaderEpoch <- Kafka.Parser.int32 (Ctx.Field Ctx.PartitionLeaderEpoch ctx)
      Parser.any (Ctx.Field Ctx.Magic ctx) >>= \case
        2 -> pure ()
        _ -> Parser.fail (Ctx.Field Ctx.Magic ctx)
      crc <- Kafka.Parser.word32 (Ctx.Field Ctx.Crc ctx)
      -- The checksum covers every byte after the crc field, so peek at
      -- the rest of the delimited window without consuming it.
      remaining <- Parser.peekRemaining
      when (Crc32c.bytes 0 remaining /= crc) $ do
        Parser.fail (Ctx.Field Ctx.CrcMismatch ctx)
      attributes <- Kafka.Parser.word16 (Ctx.Field Ctx.Attributes ctx)
      lastOffsetDelta <- Kafka.Parser.int32 (Ctx.Field Ctx.LastOffsetDelta ctx)
      baseTimestamp <- Kafka.Parser.int64 (Ctx.Field Ctx.BaseTimestamp ctx)
      maxTimestamp <- Kafka.Parser.int64 (Ctx.Field Ctx.MaxTimestamp ctx)
      producerId <- Kafka.Parser.int64 (Ctx.Field Ctx.ProducerId ctx)
      producerEpoch <- Kafka.Parser.int16 (Ctx.Field Ctx.ProducerEpoch ctx)
      baseSequence <- Kafka.Parser.int32 (Ctx.Field Ctx.BaseSequence ctx)
      recordsCount <- Kafka.Parser.int32 (Ctx.Field Ctx.RecordsCount ctx)
      -- The records payload is kept as raw bytes since it may be compressed.
      recordsPayload <- Parser.remaining
      pure RecordBatch
        { baseOffset, partitionLeaderEpoch, attributes
        , lastOffsetDelta, baseTimestamp, maxTimestamp
        , producerId, producerEpoch, baseSequence, recordsCount
        , recordsPayload
        }
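
-- A sketch of the checksum relationship that 'parser' verifies: the @crc@
-- field is a CRC-32C (seed 0) over every byte that follows it, from
-- @attributes@ through the end of the serialized records. An encoder would
-- compute the field the same way (the helper name here is hypothetical,
-- and @Word32@ from @Data.Word@ is assumed to be in scope).
--
-- > batchCrc :: Bytes -> Word32
-- > batchCrc bytesAfterCrcField = Crc32c.bytes 0 bytesAfterCrcField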