{-# language BangPatterns #-}
{-# language DerivingStrategies #-}
{-# language LambdaCase #-}
{-# language NumericUnderscores #-}
{-# language DataKinds #-}
{-# language DuplicateRecordFields #-}
{-# language NamedFieldPuns #-}
{-# language TypeApplications #-}

module Kafka.Record.Response
  ( Record(..)
  , Header(..)
    -- * Decode
    -- ** Relative Offsets
  , decodeArray
    -- ** Absolute Offsets
  , decodeArrayAbsolute
  ) where

import Control.Monad (when)
import Data.Bytes (Bytes)
import Data.Bytes.Chunks (Chunks(ChunksCons,ChunksNil))
import Data.Int (Int32,Int64)
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Data.Bytes.Parser (Parser)
import Data.Word (Word8)

import qualified Data.Primitive.Contiguous as C
import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes as Bytes
import qualified Data.Bytes.Chunks as Chunks
import qualified Data.Bytes.Text.Utf8 as Utf8
import qualified Data.Text.Short as TS
import qualified Data.Bytes.Parser as Parser
import qualified Data.Primitive as PM
import qualified Kafka.Parser

-- | A single record inside a record batch. Information about @Record@
-- from Kafka documentation:
--
-- > length: varint
-- > attributes: int8
-- >     bit 0~7: unused
-- > timestampDelta: varlong
-- > offsetDelta: varint
-- > keyLength: varint
-- > key: byte[]
-- > valueLen: varint
-- > value: byte[]
-- > Headers => [Header]
--
data Record = Record
  { attributes :: !Word8
    -- ^ To the author's understanding, record attributes are unused,
    -- and this will always be zero. But just to be safe, this library
    -- parses it from the encoded record.
  , timestampDelta :: !Int64
    -- ^ Timestamp delta plus the base timestamp that the decoder was
    -- given. 'decodeArray' uses a base of zero, so the field is a true
    -- delta there; 'decodeArrayAbsolute' turns it into an absolute
    -- timestamp.
  , offsetDelta :: !Int64
    -- ^ We deviate from the spec here by making this a 64-bit integer instead
    -- of a 32-bit integer. We do this so that the integral type is wide enough
    -- to hold an absolute offset.
  , key :: {-# UNPACK #-} !Bytes
    -- ^ We decode a null key to the empty byte sequence.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Record payload.
  , headers :: {-# UNPACK #-} !(SmallArray Header)
    -- ^ Record headers, in the order in which they were encoded.
  } deriving stock (Show)

-- | A key-value pair attached to a 'Record'. Information about @Header@
-- from Kafka documentation:
--
-- > headerKeyLength: varint
-- > headerKey: String
-- > headerValueLength: varint
-- > value: byte[]
data Header = Header
  { key :: {-# UNPACK #-} !Text
    -- ^ Header key.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Header value.
  } deriving stock (Show)

-- | Decode records that are laid out back to back, keeping the timestamp
-- and offset deltas relative (the base timestamp and the base offset are
-- both taken to be zero).
--
-- This consumes the entire input. If it succeeds, there were no
-- leftovers. Returns 'Nothing' if any record fails to parse.
decodeArray :: Bytes -> Maybe (SmallArray Record)
{-# noinline decodeArray #-}
decodeArray = Parser.parseBytesMaybe (parserArrayOf (parser 0 0))

-- | Variant of 'decodeArray' that converts the timestamp and offset
-- deltas to absolute timestamps and offsets. In the resulting records,
-- the fields 'timestampDelta' and 'offsetDelta' are misnomers.
--
-- Like 'decodeArray', this consumes the entire input and returns
-- 'Nothing' if any record fails to parse.
decodeArrayAbsolute ::
     Int64 -- ^ Base timestamp (from record batch)
  -> Int64 -- ^ Base offset (from record batch)
  -> Bytes
  -> Maybe (SmallArray Record)
{-# noinline decodeArrayAbsolute #-}
decodeArrayAbsolute !baseTimestamp !baseOffset
  = Parser.parseBytesMaybe (parserArrayOf (parser baseTimestamp baseOffset))

-- | Parse several records laid out one after the other.
--
-- The record parser is run repeatedly until the end of the input is
-- reached. Parsed records are consed onto an accumulator (so they are
-- held in reverse order) while a counter tracks how many there are; at
-- the end of the input the list is reversed into an array of exactly
-- that size.
parserArrayOf :: Parser () s Record -> Parser () s (SmallArray Record)
{-# inline parserArrayOf #-}
parserArrayOf p = go [] 0
  where
  -- acc: records parsed so far, most recent first.
  -- n:   length of acc.
  go !acc !n = Parser.isEndOfInput >>= \case
    True -> pure $! C.unsafeFromListReverseN n acc
    False -> do
      r <- p
      go (r : acc) (n + 1)

-- | Parse a single length-prefixed record.
--
-- The record body is delimited by its varint length prefix, so the
-- fields must account for exactly that many bytes. The base timestamp
-- and base offset are added to the decoded deltas before they are
-- stored in the resulting 'Record'.
--
-- Fails when:
--
-- * the record length is negative,
-- * the key length is less than @-1@ or at least @1000000@,
-- * the value length is negative or at least @2000000000@,
-- * any field or header fails to parse.
parser ::
     Int64 -- ^ base timestamp
  -> Int64 -- ^ base offset
  -> Parser () s Record
parser !baseTimestamp !baseOffset = do
  len <- Kafka.Parser.varIntNative ()
  -- A negative length can never describe a record, so reject it up
  -- front instead of handing it to 'Parser.delimit'.
  when (len < 0) (Parser.fail ())
  Parser.delimit () () len $ do
    attributes <- Parser.any ()
    timestampDelta <- Kafka.Parser.varInt64 ()
    offsetDelta <- Kafka.Parser.varInt32 ()
    keyLength <- Kafka.Parser.varIntNative ()
    -- A key length of -1 denotes a null key; anything lower is invalid.
    when (keyLength < (-1) || keyLength >= 1000000) (Parser.fail ())
    -- Null keys are mapped to empty bytes.
    key <- Parser.take () (max 0 keyLength)
    valueLength <- Kafka.Parser.varIntNative ()
    -- I don't think that null values are allowed.
    -- NOTE(review): Kafka appears to use a null value (length -1) for
    -- tombstone records on compacted topics; such records are rejected
    -- here. Confirm whether they need to be supported.
    when (valueLength < 0 || valueLength >= 2000000000) (Parser.fail ())
    value <- Parser.take () valueLength
    headers <- parserHeaders
    pure Record
      { attributes
      , timestampDelta = timestampDelta + baseTimestamp
        -- The delta is widened from 32 bits before the base is added.
      , offsetDelta = fromIntegral offsetDelta + baseOffset
      , key
      , value
      , headers
      }

-- | Parse the headers of a record: a varint header count followed by
-- that many headers.
--
-- Fails if the count is negative or at least @100_000@, or if any
-- individual header fails to parse.
parserHeaders :: Parser () s (SmallArray Header)
parserHeaders = do
  len <- Kafka.Parser.varIntNative ()
  case len of
    0 -> pure mempty
    _ -- A negative count is malformed input. It must be rejected here
      -- because the count is used as an array allocation size.
      | len < 0 -> Parser.fail ()
      -- Having more than 100K headers is unfathomable.
      | len >= 100_000 -> Parser.fail ()
      | otherwise -> replicateHeaderN len

-- | Parse exactly @len@ headers into a freshly allocated array.
--
-- The array is allocated up front, filled with a placeholder element,
-- and every slot is overwritten with a parsed header before the array
-- is frozen. Fails if @len@ is negative or if any header fails to
-- parse.
replicateHeaderN :: Int -> Parser () s (SmallArray Header)
replicateHeaderN !len
  -- Never ask for an array of negative size.
  | len < 0 = Parser.fail ()
  | otherwise = do
      dst <- Parser.effect (PM.newSmallArray len uninitializedHeader)
      let go !ix = if ix < len
            then do
              a <- parserHeader
              Parser.effect (PM.writeSmallArray dst ix a)
              go (ix + 1)
            -- Every index in [0, len) has been written, and dst is not
            -- mutated after this point.
            else Parser.effect (PM.unsafeFreezeSmallArray dst)
      go (0 :: Int)

-- | Placeholder used to fill a header array when it is first allocated.
-- Forcing it raises an error. 'replicateHeaderN' overwrites every slot
-- before freezing the array, so this value should never be observed.
uninitializedHeader :: Header
uninitializedHeader = errorWithoutStackTrace
  "Kafka.Record.Response.parserHeaders: implementation mistake"
  
-- | Parse a single header: a varint-length-prefixed key followed by a
-- varint-length-prefixed value.
--
-- Fails if either length is negative, if the input is too short, or if
-- 'TS.fromShortByteString' rejects the key bytes.
parserHeader :: Parser () s Header
parserHeader = do
  keyLength <- Kafka.Parser.varIntNative ()
  when (keyLength < 0) (Parser.fail ())
  keyBytes <- Parser.take () keyLength
  -- This performs a copy of the memory. Fix this once text-2.0.3 or
  -- newer is released.
  case TS.fromShortByteString (Bytes.toShortByteString keyBytes) of
    Nothing -> Parser.fail ()
    Just keyShort -> do
      valueLength <- Kafka.Parser.varIntNative ()
      when (valueLength < 0) (Parser.fail ())
      value <- Parser.take () valueLength
      pure Header
        { key = TS.toText keyShort
        , value
        }