{-# language BangPatterns #-}
{-# language DerivingStrategies #-}
{-# language LambdaCase #-}
{-# language DuplicateRecordFields #-}
{-# language NamedFieldPuns #-}

module Kafka.Fetch.Response.V13
  ( Response(..)
  , Topic(..)
  , Partition(..)
  , parser
  , decode
  , decodeHeaded
  ) where

import Prelude hiding (id)

import Control.Applicative (liftA2)
import Data.Bytes (Bytes)
import Data.Bytes.Parser (Parser)
import Data.Int (Int16,Int32,Int64)
import Data.Primitive (SmallArray,PrimArray)
import Data.Text (Text)
import Data.WideWord (Word128)
import Data.Word (Word32)
import Kafka.ErrorCode (ErrorCode)
import Kafka.Parser.Context (Context)
import Kafka.RecordBatch.Response (RecordBatch)
import Kafka.TaggedField (TaggedField)

import qualified Data.Bytes.Parser as Parser
import qualified Kafka.Parser.Context as Ctx
import qualified Kafka.TaggedField as TaggedField
import qualified Kafka.Parser
import qualified Kafka.Header.Response.V1 as Header
import qualified Kafka.RecordBatch.Response as RecordBatch

data Response = Response
  { Response -> Int32
throttleTimeMilliseconds :: !Int32
  , Response -> ErrorCode
errorCode :: !ErrorCode
  , Response -> Int32
sessionId :: !Int32
  , Response -> SmallArray Topic
topics :: !(SmallArray Topic)
  , Response -> SmallArray TaggedField
taggedFields :: !(SmallArray TaggedField)
  } deriving stock (Int -> Response -> ShowS
[Response] -> ShowS
Response -> String
(Int -> Response -> ShowS)
-> (Response -> String) -> ([Response] -> ShowS) -> Show Response
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Response -> ShowS
showsPrec :: Int -> Response -> ShowS
$cshow :: Response -> String
show :: Response -> String
$cshowList :: [Response] -> ShowS
showList :: [Response] -> ShowS
Show)

data Topic = Topic
  { Topic -> Word128
id :: {-# UNPACK #-} !Word128
  , Topic -> SmallArray Partition
partitions :: !(SmallArray Partition)
  , Topic -> SmallArray TaggedField
taggedFields :: !(SmallArray TaggedField)
  } deriving stock (Int -> Topic -> ShowS
[Topic] -> ShowS
Topic -> String
(Int -> Topic -> ShowS)
-> (Topic -> String) -> ([Topic] -> ShowS) -> Show Topic
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Topic -> ShowS
showsPrec :: Int -> Topic -> ShowS
$cshow :: Topic -> String
show :: Topic -> String
$cshowList :: [Topic] -> ShowS
showList :: [Topic] -> ShowS
Show)

data Partition = Partition
  { Partition -> Int32
index :: !Int32
  , Partition -> ErrorCode
errorCode :: !ErrorCode
  , Partition -> Int64
highWatermark :: !Int64
  , Partition -> Int64
lastStableOffset :: !Int64
  , Partition -> Int64
logStartOffset :: !Int64
  , Partition -> Int32
preferredReadReplica :: !Int32
  , Partition -> SmallArray RecordBatch
records :: !(SmallArray RecordBatch)
  , Partition -> SmallArray TaggedField
taggedFields :: !(SmallArray TaggedField)
  } deriving stock (Int -> Partition -> ShowS
[Partition] -> ShowS
Partition -> String
(Int -> Partition -> ShowS)
-> (Partition -> String)
-> ([Partition] -> ShowS)
-> Show Partition
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> Partition -> ShowS
showsPrec :: Int -> Partition -> ShowS
$cshow :: Partition -> String
show :: Partition -> String
$cshowList :: [Partition] -> ShowS
showList :: [Partition] -> ShowS
Show)

decodeHeaded :: Bytes -> Either Context (Header.Headed Response)
decodeHeaded :: Bytes -> Either Context (Headed Response)
decodeHeaded !Bytes
b = (forall s. Parser Context s (Headed Response))
-> Bytes -> Either Context (Headed Response)
forall e a. (forall s. Parser e s a) -> Bytes -> Either e a
Parser.parseBytesEither
  ((Header -> Response -> Headed Response)
-> Parser Context s Header
-> Parser Context s Response
-> Parser Context s (Headed Response)
forall a b c.
(a -> b -> c)
-> Parser Context s a -> Parser Context s b -> Parser Context s c
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 Header -> Response -> Headed Response
forall a. Header -> a -> Headed a
Header.Headed
    (Context -> Parser Context s Header
forall s. Context -> Parser Context s Header
Header.parser Context
Ctx.Top)
    (Context -> Parser Context s Response
forall s. Context -> Parser Context s Response
parser Context
Ctx.Top Parser Context s Response
-> Parser Context s () -> Parser Context s Response
forall a b.
Parser Context s a -> Parser Context s b -> Parser Context s a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* Context -> Parser Context s ()
forall e s. e -> Parser e s ()
Parser.endOfInput Context
Ctx.End)
  ) Bytes
b

decode :: Bytes -> Either Context Response
decode :: Bytes -> Either Context Response
decode !Bytes
b = (forall s. Parser Context s Response)
-> Bytes -> Either Context Response
forall e a. (forall s. Parser e s a) -> Bytes -> Either e a
Parser.parseBytesEither (Context -> Parser Context s Response
forall s. Context -> Parser Context s Response
parser Context
Ctx.Top Parser Context s Response
-> Parser Context s () -> Parser Context s Response
forall a b.
Parser Context s a -> Parser Context s b -> Parser Context s a
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* Context -> Parser Context s ()
forall e s. e -> Parser e s ()
Parser.endOfInput Context
Ctx.End) Bytes
b

parser :: Context -> Parser Context s Response
parser :: forall s. Context -> Parser Context s Response
parser Context
ctx = do
  Int32
throttleTimeMilliseconds <- Context -> Parser Context s Int32
forall e s. e -> Parser e s Int32
Kafka.Parser.int32 (Field -> Context -> Context
Ctx.Field Field
Ctx.ThrottleTimeMilliseconds Context
ctx)
  ErrorCode
errorCode <- Context -> Parser Context s ErrorCode
forall e s. e -> Parser e s ErrorCode
Kafka.Parser.errorCode (Field -> Context -> Context
Ctx.Field Field
Ctx.ErrorCode Context
ctx)
  Int32
sessionId <- Context -> Parser Context s Int32
forall e s. e -> Parser e s Int32
Kafka.Parser.int32 (Field -> Context -> Context
Ctx.Field Field
Ctx.SessionId Context
ctx)
  SmallArray Topic
topics <- (Context -> Parser Context s Topic)
-> Context -> Parser Context s (SmallArray Topic)
forall s a.
(Context -> Parser Context s a)
-> Context -> Parser Context s (SmallArray a)
Kafka.Parser.compactArray Context -> Parser Context s Topic
forall s. Context -> Parser Context s Topic
parserTopic (Field -> Context -> Context
Ctx.Field Field
Ctx.Topics Context
ctx)
  SmallArray TaggedField
taggedFields <- Context -> Parser Context s (SmallArray TaggedField)
forall s. Context -> Parser Context s (SmallArray TaggedField)
TaggedField.parserMany (Field -> Context -> Context
Ctx.Field Field
Ctx.TagBuffer Context
ctx)
  Response -> Parser Context s Response
forall a. a -> Parser Context s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Response{Int32
$sel:throttleTimeMilliseconds:Response :: Int32
throttleTimeMilliseconds :: Int32
throttleTimeMilliseconds,ErrorCode
$sel:errorCode:Response :: ErrorCode
errorCode :: ErrorCode
errorCode,Int32
$sel:sessionId:Response :: Int32
sessionId :: Int32
sessionId,SmallArray Topic
$sel:topics:Response :: SmallArray Topic
topics :: SmallArray Topic
topics,SmallArray TaggedField
$sel:taggedFields:Response :: SmallArray TaggedField
taggedFields :: SmallArray TaggedField
taggedFields}

parserTopic :: Context -> Parser Context s Topic
parserTopic :: forall s. Context -> Parser Context s Topic
parserTopic Context
ctx = do
  Word128
id <- Context -> Parser Context s Word128
forall e s. e -> Parser e s Word128
Kafka.Parser.word128 (Field -> Context -> Context
Ctx.Field Field
Ctx.Id Context
ctx)
  SmallArray Partition
partitions <- (Context -> Parser Context s Partition)
-> Context -> Parser Context s (SmallArray Partition)
forall s a.
(Context -> Parser Context s a)
-> Context -> Parser Context s (SmallArray a)
Kafka.Parser.compactArray Context -> Parser Context s Partition
forall s. Context -> Parser Context s Partition
parserPartition (Field -> Context -> Context
Ctx.Field Field
Ctx.Partitions Context
ctx)
  SmallArray TaggedField
taggedFields <- Context -> Parser Context s (SmallArray TaggedField)
forall s. Context -> Parser Context s (SmallArray TaggedField)
TaggedField.parserMany (Field -> Context -> Context
Ctx.Field Field
Ctx.TagBuffer Context
ctx)
  Topic -> Parser Context s Topic
forall a. a -> Parser Context s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Topic{Word128
$sel:id:Topic :: Word128
id :: Word128
id,SmallArray Partition
$sel:partitions:Topic :: SmallArray Partition
partitions :: SmallArray Partition
partitions,SmallArray TaggedField
$sel:taggedFields:Topic :: SmallArray TaggedField
taggedFields :: SmallArray TaggedField
taggedFields}

parserPartition :: Context -> Parser Context s Partition
parserPartition :: forall s. Context -> Parser Context s Partition
parserPartition Context
ctx = do
  Int32
index <- Context -> Parser Context s Int32
forall e s. e -> Parser e s Int32
Kafka.Parser.int32 (Field -> Context -> Context
Ctx.Field Field
Ctx.Ix Context
ctx)
  ErrorCode
errorCode <- Context -> Parser Context s ErrorCode
forall e s. e -> Parser e s ErrorCode
Kafka.Parser.errorCode (Field -> Context -> Context
Ctx.Field Field
Ctx.ErrorCode Context
ctx)
  Int64
highWatermark <- Context -> Parser Context s Int64
forall e s. e -> Parser e s Int64
Kafka.Parser.int64 (Field -> Context -> Context
Ctx.Field Field
Ctx.HighWatermark Context
ctx)
  Int64
lastStableOffset <- Context -> Parser Context s Int64
forall e s. e -> Parser e s Int64
Kafka.Parser.int64 (Field -> Context -> Context
Ctx.Field Field
Ctx.LastStableOffset Context
ctx)
  Int64
logStartOffset <- Context -> Parser Context s Int64
forall e s. e -> Parser e s Int64
Kafka.Parser.int64 (Field -> Context -> Context
Ctx.Field Field
Ctx.LogStartOffset Context
ctx)
  -- I do not have an example of a log with an aborted transaction, so
  -- I'm not sure how this would look if it were present.
  Context -> Parser Context s Word8
forall e s. e -> Parser e s Word8
Parser.any (Field -> Context -> Context
Ctx.Field Field
Ctx.AbortedTransactions Context
ctx) Parser Context s Word8
-> (Word8 -> Parser Context s ()) -> Parser Context s ()
forall a b.
Parser Context s a
-> (a -> Parser Context s b) -> Parser Context s b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Word8
0 -> () -> Parser Context s ()
forall a. a -> Parser Context s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Word8
_ -> Context -> Parser Context s ()
forall e s a. e -> Parser e s a
Parser.fail (Field -> Context -> Context
Ctx.Field Field
Ctx.AbortedTransactions Context
ctx)
  Int32
preferredReadReplica <- Context -> Parser Context s Int32
forall e s. e -> Parser e s Int32
Kafka.Parser.int32 (Field -> Context -> Context
Ctx.Field Field
Ctx.PreferredReadReplica Context
ctx)
  Word
sizeSucc <- Context -> Parser Context s Word
forall e s. e -> Parser e s Word
Kafka.Parser.varWordNative (Field -> Context -> Context
Ctx.Field Field
Ctx.RecordBatchLength Context
ctx)
  Int
size <- case Word
sizeSucc of
    Word
0 -> Context -> Parser Context s Int
forall e s a. e -> Parser e s a
Kafka.Parser.fail (Field -> Context -> Context
Ctx.Field Field
Ctx.RecordBatchLength Context
ctx)
    Word
_ -> Int -> Parser Context s Int
forall a. a -> Parser Context s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word
sizeSucc Word -> Word -> Word
forall a. Num a => a -> a -> a
- Word
1) :: Int)
  SmallArray RecordBatch
records <- Context
-> Context
-> Int
-> Parser Context s (SmallArray RecordBatch)
-> Parser Context s (SmallArray RecordBatch)
forall e s a. e -> e -> Int -> Parser e s a -> Parser e s a
Parser.delimit
    (Field -> Context -> Context
Ctx.Field Field
Ctx.RecordBatchNotEnoughBytes Context
ctx)
    (Field -> Context -> Context
Ctx.Field Field
Ctx.RecordBatchLeftoverBytes Context
ctx)
    Int
size
    (Context -> Parser Context s (SmallArray RecordBatch)
forall s. Context -> Parser Context s (SmallArray RecordBatch)
RecordBatch.parserArray (Field -> Context -> Context
Ctx.Field Field
Ctx.RecordBatch Context
ctx))
  SmallArray TaggedField
taggedFields <- Context -> Parser Context s (SmallArray TaggedField)
forall s. Context -> Parser Context s (SmallArray TaggedField)
TaggedField.parserMany (Field -> Context -> Context
Ctx.Field Field
Ctx.TagBuffer Context
ctx)
  Partition -> Parser Context s Partition
forall a. a -> Parser Context s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Partition
    { Int32
$sel:index:Partition :: Int32
index :: Int32
index
    , ErrorCode
$sel:errorCode:Partition :: ErrorCode
errorCode :: ErrorCode
errorCode
    , Int64
$sel:highWatermark:Partition :: Int64
highWatermark :: Int64
highWatermark
    , Int64
$sel:lastStableOffset:Partition :: Int64
lastStableOffset :: Int64
lastStableOffset
    , Int64
$sel:logStartOffset:Partition :: Int64
logStartOffset :: Int64
logStartOffset
    , Int32
$sel:preferredReadReplica:Partition :: Int32
preferredReadReplica :: Int32
preferredReadReplica
    , SmallArray RecordBatch
$sel:records:Partition :: SmallArray RecordBatch
records :: SmallArray RecordBatch
records
    , SmallArray TaggedField
$sel:taggedFields:Partition :: SmallArray TaggedField
taggedFields :: SmallArray TaggedField
taggedFields
    }