{-# language DataKinds #-}
{-# language DuplicateRecordFields #-}
{-# language NamedFieldPuns #-}
{-# language TypeApplications #-}
module Kafka.Record.Request
( Record(..)
, Header(..)
, toChunks
, toChunksOnto
) where
import Data.Bytes (Bytes)
import Data.Bytes.Chunks (Chunks(ChunksCons,ChunksNil))
import Data.Int (Int32,Int64)
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Kafka.Builder (Builder)
import qualified Arithmetic.Nat as Nat
import qualified Data.Bytes as Bytes
import qualified Data.Bytes.Chunks as Chunks
import qualified Data.Bytes.Text.Utf8 as Utf8
import qualified Data.Primitive as PM
import qualified Kafka.Builder as Builder
import qualified Kafka.Builder.Bounded as Bounded
-- | A single record as it is serialized by this module.
--
-- The fields are listed in the order in which the encoder writes them.
-- All fields are strict.
data Record = Record
  { timestampDelta :: !Int64
    -- ^ Written as a variable-length 64-bit integer.
  , offsetDelta :: !Int32
    -- ^ Written as a variable-length 32-bit integer.
  , key :: {-# UNPACK #-} !Bytes
    -- ^ Record key. A zero-length key is written with a length of @-1@
    -- rather than @0@.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Record payload, written with its byte length in front.
  , headers :: {-# UNPACK #-} !(SmallArray Header)
    -- ^ Headers, written as a count followed by each encoded 'Header'.
  }
-- | Serialize a 'Record', including its leading length prefix, into a
-- standalone sequence of chunks.
--
-- This is 'toChunksOnto' with an empty tail ('ChunksNil').
toChunks :: Record -> Chunks
toChunks r = toChunksOnto r ChunksNil
-- | Serialize a 'Record' and place the result in front of existing chunks.
--
-- The output begins with one chunk holding the variable-length encoding of
-- the number reported by 'Builder.runOntoLength', followed by the chunks of
-- the record body, followed by the supplied tail.
toChunksOnto ::
     Record -- ^ Record to serialize
  -> Chunks -- ^ Chunks that follow the serialized record
  -> Chunks
toChunksOnto r c = ChunksCons lengthPrefix recordChunks
  where
    -- The body is run first because the prefix depends on the length it
    -- reports. NOTE(review): 128 is presumably the initial buffer size and
    -- @n@ the number of bytes produced by the body (not counting @c@) --
    -- confirm against Kafka.Builder.runOntoLength.
    (n, recordChunks) = Builder.runOntoLength 128 (encodeWithoutLength r) c

    -- @n@ is already an 'Int', so it is passed to 'Bounded.varIntNative'
    -- directly; no numeric conversion is required.
    lengthPrefix :: Bytes
    lengthPrefix =
      Bytes.fromByteArray (Bounded.run Nat.constant (Bounded.varIntNative n))
-- | Encode everything in a 'Record' except the leading length prefix.
--
-- The pieces are emitted in this order: a single zero byte, the timestamp
-- delta, the offset delta, the key length and key, the value length and
-- value, the header count, and finally every header.
encodeWithoutLength :: Record -> Builder
encodeWithoutLength Record{timestampDelta,offsetDelta,key,value,headers} =
     -- NOTE(review): presumably the record attributes byte, always zero
     -- here -- confirm against the Kafka record format.
     Builder.word8 0x00
  <> Builder.varInt64 timestampDelta
  <> Builder.varInt32 offsetDelta
  <> Builder.varIntNative keyLength
     -- For an empty key this contributes no bytes.
  <> Builder.copy key
  <> Builder.varIntNative (Bytes.length value)
  <> Builder.bytes value
  <> Builder.varIntNative (PM.sizeofSmallArray headers)
  <> foldMap encodeHeader headers
  where
    -- An empty key is announced with length -1 instead of 0.
    -- NOTE(review): -1 looks like the protocol's marker for a null key;
    -- an empty value, by contrast, is written with length 0.
    keyLength :: Int
    keyLength = case Bytes.length key of
      0 -> (-1)
      klen -> klen
-- | A single record header: a textual key paired with a binary value.
data Header = Header
  { key :: {-# UNPACK #-} !Text
    -- ^ Header name. It is converted to UTF-8 bytes when encoded.
  , value :: {-# UNPACK #-} !Bytes
    -- ^ Header payload, written with its byte length in front.
  }
-- | Encode one 'Header' as: key length, key bytes, value length, value bytes.
--
-- The key length is the length of the key's UTF-8 encoding in bytes, not
-- its number of characters.
encodeHeader :: Header -> Builder
encodeHeader Header{key,value} =
     Builder.varIntNative (Bytes.length keyBytes)
  <> Builder.copy keyBytes
  <> Builder.varIntNative (Bytes.length value)
  <> Builder.copy value
  where
    -- Convert once so the length and the payload come from the same bytes.
    keyBytes :: Bytes
    keyBytes = Utf8.fromText key