{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Kafka.Avro.Decode
(
DecodeError(..)
, decode
, decodeWithSchema
, extractSchemaId
) where
import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Except
import Data.Avro (FromAvro, HasAvroSchema (..), Schema, decodeValueWithSchema, deconflict)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Data.Tagged (untag)
import Kafka.Avro.SchemaRegistry
-- | Reasons why decoding a registry-framed Avro payload can fail.
data DecodeError
  = DecodeRegistryError SchemaRegistryError
    -- ^ Loading the writer schema from the schema registry failed;
    -- wraps the registry's own error.
  | BadPayloadNoSchemaId
    -- ^ 'extractSchemaId' could not read a schema id from the payload.
  | DecodeError Schema String
    -- ^ The payload could not be decoded; carries the writer schema and
    -- the message reported by the Avro decoder.
  | IncompatibleSchema Schema String
    -- ^ The writer schema could not be reconciled with the reader schema;
    -- carries the writer schema and the message reported by 'deconflict'.
  deriving (Show, Eq)
-- | Decode a registry-framed Avro payload into a value of type @a@.
--
-- The reader schema is taken from the 'HasAvroSchema' instance of the
-- target type (its 'schema' method); everything else is delegated to
-- 'decodeWithSchema'.
decode :: forall a m. (MonadIO m, HasAvroSchema a, FromAvro a)
  => SchemaRegistry  -- ^ registry used to resolve the writer schema
  -> ByteString      -- ^ payload carrying the schema id prefix
  -> m (Either DecodeError a)
decode sr = decodeWithSchema sr (untag @a schema)
{-# INLINE decode #-}
-- | Decode a registry-framed Avro payload against an explicit reader schema.
--
-- Steps, each of which short-circuits with the corresponding 'DecodeError':
--
-- 1. split the schema id off the payload with 'extractSchemaId'
--    ('BadPayloadNoSchemaId' when that fails);
-- 2. load the writer schema for that id from the registry
--    ('DecodeRegistryError');
-- 3. reconcile writer and reader schema with 'deconflict'
--    ('IncompatibleSchema', tagged with the writer schema);
-- 4. decode the remaining bytes with the reconciled schema
--    ('DecodeError', tagged with the writer schema).
decodeWithSchema :: forall a m. (MonadIO m, FromAvro a)
  => SchemaRegistry  -- ^ registry used to resolve the writer schema
  -> Schema          -- ^ reader schema
  -> ByteString      -- ^ payload carrying the schema id prefix
  -> m (Either DecodeError a)
decodeWithSchema sr readerSchema bs = runExceptT $ do
  (sid, payload) <- maybe (throwE BadPayloadNoSchemaId) pure (extractSchemaId bs)
  writerSchema   <- withError DecodeRegistryError (loadSchema sr sid)
  readSchema     <- withPureError (IncompatibleSchema writerSchema)
                      (deconflict writerSchema readerSchema)
  withPureError (DecodeError writerSchema) (decodeValueWithSchema readSchema payload)
  where
    -- The helpers use their own type variables on purpose: with
    -- ScopedTypeVariables, reusing @a@/@m@ here would tie them to the
    -- outer signature, yet they are applied at several result types.

    -- Lift an effectful 'Either' into 'ExceptT', translating its error.
    withError :: Functor n => (e -> e') -> n (Either e x) -> ExceptT e' n x
    withError f = withExceptT f . ExceptT

    -- Same as 'withError', for an 'Either' that is already pure.
    withPureError :: Applicative n => (e -> e') -> Either e x -> ExceptT e' n x
    withPureError f = withError f . pure
-- | Split a payload into its schema id and the remaining bytes.
--
-- The first byte is skipped without being inspected (presumably a
-- format/magic marker -- confirm against the producer's wire format).
-- The next four bytes are combined big-endian (first byte most
-- significant) into an 'Int32' that is wrapped in 'SchemaId'. Whatever
-- follows those five bytes is returned unchanged.
--
-- Returns 'Nothing' when the input holds fewer than five bytes.
extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
  (_marker, rest) <- BL.uncons bs
  let (idBytes, payload) = BL.splitAt 4 rest
  -- 'BL.splitAt' silently returns a shorter prefix on short input, so the
  -- length has to be checked explicitly to stay total.
  if BL.length idBytes == 4
    then Just (SchemaId (BL.foldl' appendByte 0 idBytes), payload)
    else Nothing
  where
    -- Shift the accumulated value one byte to the left and append the next
    -- byte; over four bytes this equals w1<<24 + w2<<16 + w3<<8 + w4.
    appendByte acc w = (acc `shiftL` 8) + fromIntegral w