{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}
module Kafka.Avro.Decode
(
  DecodeError(..)
, decode
, decodeWithSchema
, extractSchemaId
) where

import           Control.Arrow              (left)
import           Control.Monad.IO.Class     (MonadIO)
import           Control.Monad.Trans.Except
import           Data.Avro                  (FromAvro, HasAvroSchema (..), Schema, decodeValueWithSchema, deconflict)
import           Data.Bits                  (shiftL)
import           Data.ByteString.Lazy       (ByteString)
import qualified Data.ByteString.Lazy       as BL hiding (zipWith)
import           Data.Int
import           Data.Tagged                (untag)
import           Kafka.Avro.SchemaRegistry

-- | Failures that 'decodeWithSchema' (and therefore 'decode') can report.
data DecodeError = DecodeRegistryError SchemaRegistryError
                   -- ^ Loading the writer schema from the schema registry failed.
                 | BadPayloadNoSchemaId
                   -- ^ The payload is too short to carry the magic byte and
                   -- the 4-byte schema ID.
                 | DecodeError Schema String
                   -- ^ Decoding the value failed; carries the writer schema
                   -- and the decoder's error message.
                 | IncompatibleSchema Schema String
                   -- ^ The writer schema could not be reconciled with the
                   -- reader schema; carries the writer schema and the
                   -- error message.
                 deriving (Show, Eq)

-- | Decodes a provided Avro-encoded value.
--
-- The serialised value is expected to be in a "confluent-compatible" format,
-- where the "real" value bytestring is prefixed with 5 extra bytes:
-- a "magic" byte followed by 4 bytes representing the schema ID.
--
-- The reader schema is the one associated with the result type @a@ through
-- its 'HasAvroSchema' instance; see 'decodeWithSchema' to supply a reader
-- schema explicitly.
decode :: forall a m. (MonadIO m, HasAvroSchema a, FromAvro a)
  => SchemaRegistry
  -> ByteString
  -> m (Either DecodeError a)
decode sr = decodeWithSchema sr (untag @a schema)
{-# INLINE decode #-}

-- | Decodes a provided Avro-encoded value using an explicit reader schema.
--
-- Steps, each of which short-circuits with the corresponding 'DecodeError':
--
-- 1. Split the schema ID off the payload with 'extractSchemaId'
--    ('BadPayloadNoSchemaId' when that yields 'Nothing').
-- 2. Load the writer schema for that ID from the registry
--    ('DecodeRegistryError' on failure).
-- 3. Reconcile the writer schema with the reader schema
--    ('IncompatibleSchema' on failure).
-- 4. Decode the remaining bytes with the reconciled schema
--    ('DecodeError' on failure).
decodeWithSchema :: forall a m. (MonadIO m, FromAvro a)
  => SchemaRegistry
  -> Schema
  -> ByteString
  -> m (Either DecodeError a)
decodeWithSchema sr readerSchema bs = runExceptT $ do
  (sid, payload) <- maybe (throwE BadPayloadNoSchemaId) pure (extractSchemaId bs)
  writerSchema   <- withExceptT DecodeRegistryError (ExceptT (loadSchema sr sid))
  readSchema     <- liftPure (IncompatibleSchema writerSchema)
                             (deconflict writerSchema readerSchema)
  liftPure (DecodeError writerSchema) (decodeValueWithSchema readSchema payload)
  where
    -- Lift a pure 'Either' into the 'ExceptT' stack, translating its error
    -- into a 'DecodeError' on the way.
    liftPure :: (e -> DecodeError) -> Either e b -> ExceptT DecodeError m b
    liftPure f = withExceptT f . ExceptT . pure

-- | Splits the 5-byte header off a payload.
--
-- The first byte (the "magic" byte) is skipped without being validated.
-- The next four bytes are combined, most significant byte first, into an
-- 'Int32' that is wrapped in 'SchemaId'. The result pairs that ID with the
-- bytes that follow the header.
--
-- Returns 'Nothing' when the input holds fewer than 5 bytes.
extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
  (_ , b0) <- BL.uncons bs
  (w1, b1) <- BL.uncons b0
  (w2, b2) <- BL.uncons b1
  (w3, b3) <- BL.uncons b2
  (w4, b4) <- BL.uncons b3
  -- Least significant byte first, so each byte lines up with its shift below.
  let ints = fromIntegral <$> [w4, w3, w2, w1] :: [Int32]
  -- The shifted bytes occupy disjoint bit ranges, so summing them is
  -- equivalent to OR-ing them together.
  let int  = sum $ zipWith shiftL ints [0, 8, 16, 24]
  return (SchemaId int, b4)