{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Data.Avro.Encode
(
getSchema
, encodeAvro
, encodeContainer
, newSyncBytes
, encodeContainerWithSync
, containerHeaderWithSync
, packContainerBlocks
, packContainerBlocksWithSync
, packContainerValues
, packContainerValuesWithSync
, EncodeAvro(..)
, Zag(..)
, putAvro
) where
import qualified Data.Aeson as A
import qualified Data.Array as Ar
import qualified Data.Binary.IEEE754 as IEEE
import Data.Bits
import qualified Data.ByteString as B
import Data.ByteString.Builder
import Data.ByteString.Lazy as BL
import Data.ByteString.Lazy.Char8 ()
import qualified Data.Foldable as F
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Int
import Data.Ix (Ix)
import Data.List as DL
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE
import Data.Maybe (catMaybes, mapMaybe, fromJust)
import Data.Monoid
import Data.Proxy
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Encoding as TL
import qualified Data.Time as Time
import qualified Data.UUID as UUID
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U
import Data.Word
import GHC.TypeLits
import Prelude as P
import System.Random.TF.Init (initTFGen)
import System.Random.TF.Instances (randoms)
import Data.Avro.Codec
import Data.Avro.EncodeRaw
import Data.Avro.HasAvroSchema
import Data.Avro.Schema as S
import Data.Avro.Types as T
import Data.Avro.Types.Decimal as D
import Data.Avro.Types.Time
import Data.Avro.Zag
import Data.Avro.Zig
-- | Serialise a value to its Avro binary form as a lazy 'BL.ByteString'.
--
-- Only the encoded bytes are produced; the schema computed alongside them
-- by 'avro' is discarded (see 'putAvro').
encodeAvro :: EncodeAvro a => a -> BL.ByteString
encodeAvro value = toLazyByteString (putAvro value)
-- | Generate 16 random bytes using a freshly initialised TF generator.
--
-- The container encoders in this module write these bytes after the header
-- and after every block.
newSyncBytes :: IO BL.ByteString
newSyncBytes = do
  gen <- initTFGen
  pure (BL.pack (DL.take 16 (randoms gen)))
-- | Encode values into a container, one block per inner list, using
-- freshly generated sync bytes (see 'newSyncBytes').
--
-- This is 'encodeContainerWithSync' with a random sync marker, hence 'IO'.
encodeContainer :: EncodeAvro a => Codec -> Schema -> [[a]] -> IO BL.ByteString
encodeContainer codec sch xss =
  fmap (\sync -> encodeContainerWithSync codec sch sync xss) newSyncBytes
-- | Build the container header.
--
-- The header is, in order: the magic bytes (@"Obj"@ followed by the byte
-- @1@), a metadata map holding the JSON-encoded schema under
-- @avro.schema@ and the codec name under @avro.codec@, and finally the
-- supplied sync bytes.
containerHeaderWithSync :: Codec -> Schema -> BL.ByteString -> Builder
containerHeaderWithSync codec sch syncBytes =
  mconcat
    [ lazyByteString magic
    , putAvro metadata
    , lazyByteString syncBytes
    ]
  where
    magic :: BL.ByteString
    magic = "Obj" <> BL.pack [1]

    metadata :: HashMap Text BL.ByteString
    metadata =
      HashMap.fromList
        [ ("avro.schema", A.encode sch)
        , ("avro.codec", BL.fromStrict (codecName codec))
        ]
-- | Encode values into a container using the given sync bytes.
--
-- Each inner list is serialised with 'putAvro' and becomes one block.
-- Framing (object count, byte count of the compressed payload, payload,
-- sync bytes) is delegated to 'packContainerBlocksWithSync', which emits
-- exactly that sequence for every @(count, bytes)@ pair.
encodeContainerWithSync :: EncodeAvro a => Codec -> Schema -> BL.ByteString -> [[a]] -> BL.ByteString
encodeContainerWithSync codec sch syncBytes xss =
  packContainerBlocksWithSync codec sch syncBytes (serialiseBlock <$> xss)
  where
    -- Pair the number of objects with their concatenated, uncompressed
    -- encodings; compression is applied by the callee.
    serialiseBlock ys = (P.length ys, toLazyByteString (foldMap putAvro ys))
-- | Pack pre-encoded blocks into a container using freshly generated sync
-- bytes.
--
-- Each pair is @(number of objects, uncompressed encoded bytes)@; see
-- 'packContainerBlocksWithSync'.
packContainerBlocks :: Codec -> Schema -> [(Int, BL.ByteString)] -> IO BL.ByteString
packContainerBlocks codec sch blocks =
  fmap (\sync -> packContainerBlocksWithSync codec sch sync blocks) newSyncBytes
-- | Pack pre-encoded blocks into a container using the given sync bytes.
--
-- The output is the container header followed by one frame per input pair.
-- For a pair @(n, bytes)@ the frame is: @n@, the length of @bytes@ after
-- compression with the codec, the compressed bytes, and the sync bytes.
packContainerBlocksWithSync :: Codec -> Schema -> BL.ByteString -> [(Int, BL.ByteString)] -> BL.ByteString
packContainerBlocksWithSync codec sch syncBytes blocks =
  toLazyByteString (header <> foldMap renderBlock blocks)
  where
    header = containerHeaderWithSync codec sch syncBytes

    syncMarker = lazyByteString syncBytes

    renderBlock (objectCount, raw) =
      mconcat
        [ putAvro objectCount
        , putAvro (BL.length payload) -- byte count is taken after compression
        , lazyByteString payload
        , syncMarker
        ]
      where
        payload = codecCompress codec raw
-- | Pack individually pre-encoded values into a container using freshly
-- generated sync bytes.
--
-- Each inner list becomes one block; see 'packContainerValuesWithSync'.
packContainerValues :: Codec -> Schema -> [[BL.ByteString]] -> IO BL.ByteString
packContainerValues codec sch values =
  fmap (\sync -> packContainerValuesWithSync codec sch sync values) newSyncBytes
-- | Pack individually pre-encoded values into a container using the given
-- sync bytes.
--
-- Every inner list becomes one block: its length is the object count and
-- its elements are concatenated to form the uncompressed payload. Framing
-- and compression are delegated to 'packContainerBlocksWithSync', which
-- emits the same count / byte-length / payload / sync sequence per block.
packContainerValuesWithSync :: Codec -> Schema -> BL.ByteString -> [[BL.ByteString]] -> BL.ByteString
packContainerValuesWithSync codec sch syncBytes values =
  packContainerBlocksWithSync
    codec
    sch
    syncBytes
    [ (P.length ys, toLazyByteString (foldMap lazyByteString ys))
    | ys <- values
    ]
-- | The 'Builder' half of 'avro': the binary encoding of a value, without
-- its schema.
putAvro :: EncodeAvro a => a -> Builder
putAvro value = encoded
  where
    (encoded, _) = runAvro (avro value)
-- | The 'Schema' half of 'avro': the schema an instance reports for a
-- value, without its encoding.
getSchema :: forall a. EncodeAvro a => a -> Schema
getSchema value = schema
  where
    (_, schema) = runAvro (avro value)
-- | Schema for a type, given only a 'Proxy'.
--
-- This calls 'getSchema' on 'undefined' at the proxied type, so it only
-- yields a usable schema for instances whose schema component does not
-- inspect the value being encoded.
getType :: EncodeAvro a => Proxy a -> Schema
getType proxy = getSchema (undefined `asProxyTypeOf` proxy)
-- | Result of encoding a value: the 'Builder' producing its bytes paired
-- with the 'Schema' reported for it. 'putAvro' and 'getSchema' project the
-- first and second component respectively.
newtype AvroM = AvroM { runAvro :: (Builder, Schema) }
-- | Types that can be encoded by this module.
class EncodeAvro a where
  -- | Produce the encoding of a value together with its schema.
  --
  -- 'getType' evaluates the schema component on 'undefined', so the schema
  -- should be computable without forcing the argument where possible.
  avro :: a -> AvroM
-- | Encode an integral value with 'encodeRaw' and tag it with the
-- @int@ schema (no logical type).
avroInt :: forall a. (FiniteBits a, Integral a, EncodeRaw a) => a -> AvroM
avroInt value = AvroM (encoded, schema)
  where
    encoded = encodeRaw value
    schema = S.Int Nothing
-- | Encode an integral value with 'encodeRaw' and tag it with the
-- @long@ schema (no logical type).
avroLong :: forall a. (FiniteBits a, Integral a, EncodeRaw a) => a -> AvroM
avroLong value = AvroM (encoded, schema)
  where
    encoded = encodeRaw value
    schema = S.Long Nothing
-- | 'encodeRaw' specialised to 'Int'; used for counts and union/enum
-- indices in this module.
putI :: Int -> Builder
putI n = encodeRaw n
-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Int where
  avro n = avroInt n

-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Int8 where
  avro n = avroInt n

-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Int16 where
  avro n = avroInt n

-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Int32 where
  avro n = avroInt n
-- | Encoded via 'avroLong' (schema @long@).
--
-- A 64-bit value does not fit Avro's 32-bit @int@, so the reported schema
-- must be @long@. The bytes are unchanged: both 'avroInt' and 'avroLong'
-- encode with 'encodeRaw' and differ only in the schema they report.
instance EncodeAvro Int64 where
  avro = avroLong
-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Word8 where
  avro n = avroInt n

-- | Encoded via 'avroInt' (schema @int@).
instance EncodeAvro Word16 where
  avro n = avroInt n

-- | Encoded via 'avroLong' (schema @long@).
instance EncodeAvro Word32 where
  avro n = avroLong n

-- | Encoded via 'avroLong' (schema @long@).
instance EncodeAvro Word64 where
  avro n = avroLong n
-- | Strict text: the UTF-8 byte length followed by the UTF-8 bytes, with
-- schema @string@.
instance EncodeAvro Text where
  avro txt = AvroM (encodeRaw (B.length utf8) <> byteString utf8, S.String')
    where
      utf8 = T.encodeUtf8 txt
-- | Lazy text: the UTF-8 byte length followed by the UTF-8 bytes, with
-- schema @string@.
instance EncodeAvro TL.Text where
  avro txt = AvroM (encodeRaw (BL.length utf8) <> lazyByteString utf8, S.String')
    where
      utf8 = TL.encodeUtf8 txt
-- | Lazy bytes: the byte length followed by the bytes, with schema
-- @bytes@ (no logical type).
instance EncodeAvro ByteString where
  avro bytes = AvroM (lengthPrefix <> lazyByteString bytes, S.Bytes Nothing)
    where
      lengthPrefix = encodeRaw (BL.length bytes)
-- | Strict bytes: the byte length followed by the bytes, with schema
-- @bytes@ (no logical type).
instance EncodeAvro B.ByteString where
  avro bytes = AvroM (lengthPrefix <> byteString bytes, S.Bytes Nothing)
    where
      lengthPrefix = encodeRaw (B.length bytes)
-- | A 'String' is packed into strict 'Text' and encoded with that
-- instance.
instance EncodeAvro String where
  avro = avro . T.pack
-- | The IEEE-754 bit pattern of the value, written as a little-endian
-- 64-bit word, with schema @double@.
instance EncodeAvro Double where
  avro x = AvroM (word64LE bits, S.Double)
    where
      bits = IEEE.doubleToWord x
-- | The IEEE-754 bit pattern of the value, written as a little-endian
-- 32-bit word, with schema @float@.
instance EncodeAvro Float where
  avro x = AvroM (word32LE bits, S.Float)
    where
      bits = IEEE.floatToWord x
-- | A decimal is encoded as its underlying 'Int' value and reports a
-- @long@ schema carrying the decimal logical type, with precision @p@ and
-- scale @s@ taken from the type-level naturals.
--
-- Calls 'error' with a descriptive message when 'D.underlyingValue' yields
-- 'Nothing'. The schema component does not depend on the value.
instance (KnownNat p, KnownNat s) => EncodeAvro (D.Decimal p s) where
  avro d = AvroM (encodeRaw val, S.Long (Just (DecimalL (S.Decimal pp ss))))
    where
      ss = natVal (Proxy :: Proxy s)
      pp = natVal (Proxy :: Proxy p)

      val :: Int
      val =
        case D.underlyingValue d of
          Just n -> n
          Nothing ->
            error "Data.Avro.Encode: Decimal has no underlying Int value (underlyingValue returned Nothing)"
-- | A UUID is rendered with 'UUID.toText' and written like a string
-- (UTF-8 byte length, then bytes), with schema @string@ carrying the UUID
-- logical type.
instance EncodeAvro UUID.UUID where
  avro uuid =
    AvroM (encodeRaw (B.length rendered) <> byteString rendered, S.String (Just UUID))
    where
      rendered = T.encodeUtf8 (UUID.toText uuid)
-- | A day is encoded as the result of 'daysSinceEpoch' converted to 'Int',
-- with schema @int@ carrying the date logical type.
instance EncodeAvro Time.Day where
  avro day = AvroM (encodeRaw days, S.Int (Just Date))
    where
      days :: Int
      days = fromIntegral (daysSinceEpoch day)
-- | A time difference is encoded as the result of 'diffTimeToMicros'
-- converted to 'Int', with schema @long@ carrying the time-micros logical
-- type.
instance EncodeAvro Time.DiffTime where
  avro dt = AvroM (encodeRaw micros, S.Long (Just TimeMicros))
    where
      micros :: Int
      micros = fromIntegral (diffTimeToMicros dt)
-- | The encoding of zero. The array and map instances write it as the
-- terminator, and as the whole encoding of an empty collection.
long0 :: Builder
long0 = encodeRaw zero
  where
    zero :: Word64
    zero = 0
-- | Lists are arrays: an empty list is just 'long0'; otherwise the element
-- count, every element in order, then 'long0'. The item schema comes from
-- 'getType' and does not inspect the list.
instance EncodeAvro a => EncodeAvro [a] where
  avro xs = AvroM (payload, S.Array itemSchema)
    where
      itemSchema = getType (Proxy :: Proxy a)

      payload
        | DL.null xs = long0
        | otherwise = encodeRaw (F.length xs) <> foldMap putAvro xs <> long0
-- | Boxed arrays are Avro arrays: an empty array is just 'long0';
-- otherwise the element count, every element in fold order, then 'long0'.
instance (Ix i, EncodeAvro a) => EncodeAvro (Ar.Array i a) where
  avro arr = AvroM (payload, S.Array itemSchema)
    where
      itemSchema = getType (Proxy :: Proxy a)

      count = F.length arr

      payload
        | count == 0 = long0
        | otherwise = encodeRaw count <> foldMap putAvro arr <> long0
-- | Boxed vectors are Avro arrays: an empty vector is just 'long0';
-- otherwise the element count, every element in order, then 'long0'.
instance EncodeAvro a => EncodeAvro (V.Vector a) where
  avro vec = AvroM (payload, S.Array itemSchema)
    where
      itemSchema = getType (Proxy :: Proxy a)

      payload
        | V.null vec = long0
        | otherwise = encodeRaw (F.length vec) <> foldMap putAvro vec <> long0
-- | Unboxed vectors are Avro arrays: an empty vector is just 'long0';
-- otherwise the element count, every element in order, then 'long0'.
instance (U.Unbox a, EncodeAvro a) => EncodeAvro (U.Vector a) where
  avro vec = AvroM (payload, S.Array itemSchema)
    where
      itemSchema = getType (Proxy :: Proxy a)

      -- Right fold appending each element's encoding in index order.
      items = U.foldr (\x rest -> putAvro x <> rest) mempty vec

      payload
        | U.null vec = long0
        | otherwise = encodeRaw (U.length vec) <> items <> long0
-- | Sets are Avro arrays: an empty set is just 'long0'; otherwise the
-- element count, every element in the set's fold order, then 'long0'.
instance EncodeAvro a => EncodeAvro (S.Set a) where
  avro set = AvroM (payload, S.Array itemSchema)
    where
      itemSchema = getType (Proxy :: Proxy a)

      payload
        | S.null set = long0
        | otherwise = encodeRaw (F.length set) <> foldMap putAvro set <> long0
-- | Text-keyed hash maps are Avro maps: an empty map is just 'long0';
-- otherwise the entry count, each key followed by its value (in
-- 'HashMap.toList' order), then 'long0'.
instance EncodeAvro a => EncodeAvro (HashMap Text a) where
  avro entries = AvroM (payload, S.Map valueSchema)
    where
      valueSchema = getType (Proxy :: Proxy a)

      pairs = mconcat [putAvro k <> putAvro v | (k, v) <- HashMap.toList entries]

      payload
        | HashMap.null entries = long0
        | otherwise = putI (F.length entries) <> pairs <> long0
-- | Optional values are a two-branch union of @null@ and the payload type.
--
-- 'Nothing' is written as branch index 0 with no payload; @'Just' x@ as
-- branch index 1 followed by the encoding of @x@.
--
-- The reported schema is @[null, <schema of a>]@, obtained via 'getType',
-- rather than a fixed @[null, int]@. The argument is only inspected inside
-- the encoding component, so the schema can be computed by 'getType'
-- (which passes 'undefined') without diverging.
instance EncodeAvro a => EncodeAvro (Maybe a) where
  avro mx = AvroM (encoded, schema)
    where
      schema = S.mkUnion (S.Null :| [getType (Proxy :: Proxy a)])

      encoded =
        case mx of
          Nothing -> putI 0
          Just x -> putI 1 <> putAvro x
-- | Unit is Avro @null@: it encodes to no bytes.
--
-- The pattern is irrefutable so the argument is never forced; this keeps
-- the schema computable through 'getType', which passes 'undefined'.
instance EncodeAvro () where
  avro ~() = AvroM (mempty, S.Null)
-- | Booleans are a single byte: 0 for 'False', 1 for 'True', with schema
-- @boolean@.
instance EncodeAvro Bool where
  avro flag = AvroM (word8 byte, S.Boolean)
    where
      byte = if flag then 1 else 0
-- | Encode a dynamically-typed Avro value.
--
-- Primitive, array and map values defer to the instance of the type they
-- wrap. The remaining constructors are handled as follows:
--
-- * Records write the values of the schema's fields, in schema order, and
--   report the record schema.
-- * Unions write the index of the selected branch among the options,
--   then the value.
-- * Enums write the symbol index and report the enum schema.
-- * Fixed values write their bytes verbatim after a length check.
--
-- Calls 'error' when a union has no options, when the selected union
-- branch is not among the options, when an enum value carries a non-enum
-- schema, or when a fixed value's length differs from the declared size.
--
-- The schema depends on the value here, so 'getType' cannot be used with
-- this instance.
instance EncodeAvro (T.Value Schema) where
  avro v =
    case v of
      T.Null -> avro ()
      T.Boolean b -> avro b
      T.Int i -> avro i
      T.Long i -> avro i
      T.Float f -> avro f
      T.Double d -> avro d
      T.Bytes bs -> avro bs
      T.String t -> avro t
      T.Array vec -> avro vec
      T.Map hm -> avro hm
      T.Record ty hm ->
        -- NOTE(review): fields absent from the map are silently skipped by
        -- 'mapMaybe'; confirm callers always supply every schema field.
        let fs = P.map fldName (fields ty)
            bs = foldMap putAvro (mapMaybe (`HashMap.lookup` hm) fs)
        in AvroM (bs, ty)
      T.Union opts sel val
        -- An empty option set used to fall through every alternative and
        -- fail with a non-exhaustive-pattern error; report it explicitly.
        | F.null opts ->
            error "Union encoding requires a schema with at least one option"
        | otherwise ->
            case V.elemIndex sel opts of
              Just idx -> AvroM (putI idx <> putAvro val, S.Union opts)
              Nothing -> error "Union encoding specifies type not found in schema"
      T.Enum sch@S.Enum{} ix _ -> AvroM (putI ix, sch)
      T.Enum _ _ _ ->
        error "Enum encoding requires an enum schema"
      T.Fixed ty bs
        -- NOTE(review): the reported schema is plain bytes rather than
        -- the fixed schema 'ty'; kept as-is, confirm whether intended.
        | B.length bs == size ty -> AvroM (byteString bs, S.Bytes Nothing)
        | otherwise ->
            error $ "Fixed type " <> show (name ty)
              <> " has size " <> show (size ty)
              <> " but the value has length " <> show (B.length bs)