{-# language BangPatterns #-}
{-# language DerivingStrategies #-}
{-# language LambdaCase #-}
{-# language NumericUnderscores #-}
{-# language DataKinds #-}
{-# language DuplicateRecordFields #-}
{-# language NamedFieldPuns #-}
{-# language TypeApplications #-}
module Kafka.Record.Response
( Record(..)
, Header(..)
, decodeArray
, decodeArrayAbsolute
) where
import Control.Monad (when)
import Data.Bytes (Bytes)
import Data.Bytes.Chunks (Chunks(ChunksCons,ChunksNil))
import Data.Int (Int32,Int64)
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Data.Bytes.Parser (Parser)
import Data.Word (Word8)
import qualified Data.Primitive.Contiguous as C
import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes as Bytes
import qualified Data.Bytes.Chunks as Chunks
import qualified Data.Bytes.Text.Utf8 as Utf8
import qualified Data.Text.Short as TS
import qualified Data.Bytes.Parser as Parser
import qualified Data.Primitive as PM
import qualified Kafka.Parser
-- | A single record as decoded by this module.
--
-- As populated by this module's parser, 'timestampDelta' and 'offsetDelta'
-- hold the decoded delta plus the base handed to the decoder: zero for
-- 'decodeArray', caller-supplied for 'decodeArrayAbsolute'.
data Record = Record
  { attributes :: !Word8
    -- ^ Attributes byte, stored exactly as read.
  , timestampDelta :: !Int64
    -- ^ Decoded timestamp delta plus the base timestamp.
  , offsetDelta :: !Int64
    -- ^ Decoded offset delta (widened from 32 bits) plus the base offset.
  , key :: {-# UNPACK #-} !Bytes
    -- ^ Record key. An encoded key length of -1 is decoded as an empty key.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Record value.
  , headers :: {-# UNPACK #-} !(SmallArray Header)
    -- ^ Headers in the order they appear in the encoding.
  } deriving stock (Show)
-- | A record header: a textual key paired with an uninterpreted value.
--
-- The field names deliberately coincide with those of 'Record'; the module
-- enables @DuplicateRecordFields@ to permit this.
data Header = Header
  { key :: {-# UNPACK #-} !Text
    -- ^ Header key, validated as text while parsing.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Header value, kept as raw bytes.
  } deriving stock (Show)
-- | Decode back-to-back records until the input is exhausted.
--
-- Both bases are zero, so 'timestampDelta' and 'offsetDelta' in the results
-- are the deltas exactly as encoded. Returns 'Nothing' if any record fails
-- to parse.
decodeArray :: Bytes -> Maybe (SmallArray Record)
{-# noinline decodeArray #-}
decodeArray = Parser.parseBytesMaybe (parserArrayOf (parser 0 0))
-- | Variant of 'decodeArray' that adds a base timestamp and a base offset
-- to every record's decoded deltas, so the resulting 'timestampDelta' and
-- 'offsetDelta' fields hold absolute values.
--
-- Returns 'Nothing' if any record fails to parse.
decodeArrayAbsolute ::
     Int64 -- ^ Base timestamp, added to each timestamp delta
  -> Int64 -- ^ Base offset, added to each offset delta
  -> Bytes -- ^ Encoded records
  -> Maybe (SmallArray Record)
{-# noinline decodeArrayAbsolute #-}
decodeArrayAbsolute !baseTimestamp !baseOffset =
  Parser.parseBytesMaybe (parserArrayOf (parser baseTimestamp baseOffset))
-- | Run the element parser repeatedly until end of input and collect the
-- results, in input order, into an array.
--
-- The element parser must consume input whenever it succeeds; otherwise
-- the end-of-input test never becomes true and this loops forever.
parserArrayOf :: Parser () s Record -> Parser () s (SmallArray Record)
{-# inline parserArrayOf #-}
parserArrayOf p = go [] 0
  where
  -- @acc@ holds the records parsed so far, newest first; @n@ is its length.
  -- The list is reversed while it is copied into the final array.
  go !acc !n = Parser.isEndOfInput >>= \case
    True -> pure $! C.unsafeFromListReverseN n acc
    False -> do
      r <- p
      go (r : acc) (n + 1)
-- | Parse one length-prefixed record.
--
-- The record starts with a variable-length integer giving the size of its
-- body; the body is parsed inside 'Parser.delimit' with that size. The
-- resulting 'timestampDelta' and 'offsetDelta' fields have the supplied
-- bases added to the decoded deltas.
--
-- Fails when the key length is below -1 or at least 1000000, when the
-- value length is negative or at least 2000000000, or when any component
-- parser fails.
parser ::
     Int64 -- ^ Base timestamp
  -> Int64 -- ^ Base offset
  -> Parser () s Record
parser !baseTimestamp !baseOffset = do
  len <- Kafka.Parser.varIntNative ()
  Parser.delimit () () len $ do
    attributes <- Parser.any ()
    timestampDelta <- Kafka.Parser.varInt64 ()
    offsetDelta <- Kafka.Parser.varInt32 ()
    keyLength <- Kafka.Parser.varIntNative ()
    when (keyLength < (-1) || keyLength >= 1000000) (Parser.fail ())
    -- A key length of -1 is accepted and treated as an empty key.
    key <- Parser.take () (max 0 keyLength)
    valueLength <- Kafka.Parser.varIntNative ()
    -- Unlike the key, a negative value length is rejected outright.
    when (valueLength < 0 || valueLength >= 2000000000) (Parser.fail ())
    value <- Parser.take () valueLength
    headers <- parserHeaders
    pure Record
      { attributes
      , timestampDelta = timestampDelta + baseTimestamp
        -- The offset delta is decoded as 32 bits; widen before adding.
      , offsetDelta = fromIntegral offsetDelta + baseOffset
      , key
      , value
      , headers
      }
-- | Parse a count-prefixed sequence of headers.
--
-- The count is a variable-length integer. Zero yields an empty array.
-- Negative counts and counts of 100000 or more are rejected.
parserHeaders :: Parser () s (SmallArray Header)
parserHeaders = do
  len <- Kafka.Parser.varIntNative ()
  case len of
    0 -> pure mempty
    _ -- A negative count must never reach the array allocation in
      -- 'replicateHeaderN'; the input is untrusted, so fail here.
      | len < 0 -> Parser.fail ()
      | len >= 100_000 -> Parser.fail ()
      | otherwise -> replicateHeaderN len
-- | Parse exactly @len@ headers into a freshly allocated array.
--
-- Precondition: @len@ is non-negative, since it is used directly as the
-- allocation size.
--
-- Every slot initially holds 'uninitializedHeader'. All slots are
-- overwritten before the array is frozen, and a parse failure aborts
-- before the array is returned.
replicateHeaderN :: Int -> Parser () s (SmallArray Header)
replicateHeaderN !len = do
  dst <- Parser.effect (PM.newSmallArray len uninitializedHeader)
  let go !ix = if ix < len
        then do
          a <- parserHeader
          Parser.effect (PM.writeSmallArray dst ix a)
          go (ix + 1)
        -- No further writes happen after this point, so freezing in
        -- place without copying is safe.
        else Parser.effect (PM.unsafeFreezeSmallArray dst)
  go (0 :: Int)
-- | Placeholder used to fill a new header array before its slots are
-- written. Forcing it raises an error, which would indicate that a slot
-- was read without ever having been filled.
uninitializedHeader :: Header
uninitializedHeader =
  errorWithoutStackTrace
    "Kafka.Record.Response.parserHeaders: implementation mistake"
-- | Parse a single header: a length-prefixed key followed by a
-- length-prefixed value.
--
-- Fails when either length is negative, when the input is too short, or
-- when 'TS.fromShortByteString' rejects the key bytes (presumably because
-- they are not valid UTF-8).
parserHeader :: Parser () s Header
parserHeader = do
  keyLength <- Kafka.Parser.varIntNative ()
  when (keyLength < 0) (Parser.fail ())
  keyBytes <- Parser.take () keyLength
  case TS.fromShortByteString (Bytes.toShortByteString keyBytes) of
    Nothing -> Parser.fail ()
    Just keyShort -> do
      valueLength <- Kafka.Parser.varIntNative ()
      when (valueLength < 0) (Parser.fail ())
      value <- Parser.take () valueLength
      pure Header{key = TS.toText keyShort, value}