{-# language BangPatterns #-}
{-# language DuplicateRecordFields #-}
{-# language NamedFieldPuns #-}

module Kafka.Subscription.Response.V1
  ( Subscription(..)
  , Ownership(..)
  , parser
  , parserOwnership
  , decode
  ) where

import Control.Monad (when)
import Data.Bytes (Bytes)
import Data.Bytes.Parser (Parser)
import Data.Int (Int16,Int32)
import Data.Primitive (SmallArray)
import Data.Text (Text)
import Kafka.Parser.Context (Context)
import Kafka.Subscription.Request.V1 (Subscription(..),Ownership(..))

import qualified Data.Bytes.Parser as Parser
import qualified Kafka.Parser.Context as Ctx
import qualified Kafka.Parser

-- | Parse a single 'Ownership' entry: a topic name followed by an
-- array of 32-bit partition identifiers.
--
-- The supplied 'Context' is extended with the field currently being
-- parsed ('Ctx.Topic', then 'Ctx.Partitions') before it is handed to
-- the field parser. The parser's error type is 'Context', so this is
-- presumably the value reported when that field fails to parse.
parserOwnership :: Context -> Parser Context s Ownership
parserOwnership ctx = do
  topic <- Kafka.Parser.string (Ctx.Field Ctx.Topic ctx)
  partitions <- Kafka.Parser.int32Array (Ctx.Field Ctx.Partitions ctx)
  pure Ownership{topic,partitions}

-- | Parse a V1 'Subscription'.
--
-- Fields are read in this order: a 16-bit version, an array of topic
-- names, the user data bytes, and an array of 'Ownership' entries
-- (each parsed by 'parserOwnership'). The version is validated but is
-- not stored in the result.
--
-- The supplied 'Context' is extended with the field being parsed
-- before it is passed to each field parser. A version of zero or less
-- fails with the 'Ctx.Version' field context.
parser :: Context -> Parser Context s Subscription
parser ctx = do
  let versionCtx = Ctx.Field Ctx.Version ctx
  version <- Kafka.Parser.int16 versionCtx
  -- Since this is the V1 subscription, we require that the serialized
  -- message indicates V1 or greater.
  when (version <= 0) (Parser.fail versionCtx)
  topics <- Kafka.Parser.array Kafka.Parser.string
    (Ctx.Field Ctx.Topics ctx)
  userData <- Kafka.Parser.nonCompactBytes
    (Ctx.Field Ctx.UserData ctx)
  ownedPartitions <- Kafka.Parser.array parserOwnership
    (Ctx.Field Ctx.OwnedPartitions ctx)
  pure Subscription{topics,userData,ownedPartitions}

-- | Decode a 'Subscription' from a byte sequence by running 'parser'
-- with the top-level context ('Ctx.Top').
--
-- Decode allows trailing content since we should be able to parse
-- newer versions of subscription and discard the parts that we do
-- not understand.
--
-- Returns 'Left' with the parser's 'Context' error value on failure.
decode :: Bytes -> Either Context Subscription
decode !b = Parser.parseBytesEither (parser Ctx.Top) b