{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Avro
(
Schema
, Result(..), badValue
, encode
, decode
, (.:)
, (.=), record, fixed
, decodeWithSchema
, decodeContainer
, decodeContainerWithSchema
, decodeContainerBytes
, encodeContainer
, encodeContainer'
, encodeContainerWithSync
, encodeContainerWithSync'
, FromAvro(..)
, ToAvro(..)
, HasAvroSchema(..)
, schemaOf
, Avro
) where
import Control.Arrow (first)
import qualified Data.Avro.Decode as D
import qualified Data.Avro.Decode.Lazy as DL
import Data.Avro.Deconflict as C
import qualified Data.Avro.Encode as E
import Data.Avro.Schema as S
import Data.Avro.Types as T
import qualified Data.Binary.Get as G
import qualified Data.Binary.Put as P
import qualified Data.ByteString as B
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL
import Data.Foldable (toList)
import qualified Data.HashMap.Strict as HashMap
import Data.Int
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.Map as Map
import Data.Monoid ((<>))
import Data.Tagged
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Lazy as TL
import qualified Data.Vector as V
import Data.Word
import Prelude as P
import Data.Avro.Codec (Codec, deflateCodec, nullCodec)
import Data.Avro.FromAvro
import Data.Avro.HasAvroSchema
import Data.Avro.ToAvro
-- | Shorthand constraint for types that can be both encoded to and decoded
-- from Avro, i.e. that have both a 'ToAvro' and a 'FromAvro' instance.
type Avro a = (ToAvro a, FromAvro a)
-- | Decode a single Avro-encoded value.
--
-- The bytes are decoded according to the schema that @a@ itself declares
-- through 'HasAvroSchema'. The resulting generic value is then converted with
-- 'fromAvro'. A decoding failure is reported as 'Error'.
decode :: forall a. FromAvro a => ByteString -> Result a
decode = decodeWithSchema (untag (schema :: Tagged a Type))
-- | Decode a single Avro-encoded value using an explicitly supplied schema
-- instead of the one declared by the target type.
--
-- A failure from 'D.decodeAvro' becomes 'Error' carrying its message.
-- Otherwise the decoded generic value is converted with 'fromAvro'.
decodeWithSchema :: FromAvro a => Schema -> ByteString -> Result a
decodeWithSchema sch = either Error fromAvro . D.decodeAvro sch
-- | Decode an Avro object container.
--
-- The reader schema is the one @a@ declares through 'HasAvroSchema'.
-- This is 'decodeContainerWithSchema' specialised to that schema, so it is
-- partial in the same way: failures are raised with 'error'.
decodeContainer :: forall a. FromAvro a => ByteString -> [[a]]
decodeContainer = decodeContainerWithSchema readerSchema
  where
    readerSchema = untag (schema :: Tagged a Schema)
-- | Decode every value in an Avro object container.
--
-- Each value is converted from the writer schema embedded in the container to
-- the given reader schema:
--
-- 1. Named types in both schemas are expanded with 'S.expandNamedTypes'.
-- 2. 'C.deconflictNoResolve' is applied to each decoded value.
-- 3. The resolved value is converted with 'fromAvro'.
--
-- The nesting of the result follows the nesting returned by
-- 'D.decodeContainer'. NOTE(review): presumably one inner list per container
-- block; TODO confirm.
--
-- This function is partial and reports failures with 'error':
--
-- * if the container itself cannot be decoded, as soon as the result is
--   forced;
-- * if a value cannot be deconflicted, or 'fromAvro' rejects it. Because the
--   lists are built lazily with 'map', this happens only when that particular
--   element is forced.
decodeContainerWithSchema :: FromAvro a => Schema -> ByteString -> [[a]]
decodeContainerWithSchema readerSchema bs =
  case D.decodeContainer bs of
    Right (writerSchema, val) ->
      let
        writerSchema' = S.expandNamedTypes writerSchema
        readerSchema' = S.expandNamedTypes readerSchema
        -- ": " separates the fixed prefix from the underlying deconflict
        -- message (same style as the container-decoding error elsewhere in
        -- this module). Without it the two ran together, e.g. "schema.Foo".
        err e = error $ "Could not deconflict reader and writer schema: " <> e
        dec raw =
          case C.deconflictNoResolve writerSchema' readerSchema' raw of
            Left e -> err e
            Right v -> case fromAvro v of
              Success decoded -> decoded
              Error e -> error e
      in P.map (P.map dec) val
    Left err -> error err
-- | Encode a single value as Avro binary.
--
-- The value is first converted to its generic Avro representation with
-- 'toAvro', then serialised with 'E.encodeAvro'.
encode :: ToAvro a => a -> BL.ByteString
encode value = E.encodeAvro (toAvro value)
-- | Encode values as an Avro object container without compression.
--
-- Equivalent to 'encodeContainer'' applied to 'nullCodec'.
encodeContainer :: forall a. ToAvro a => [[a]] -> IO BL.ByteString
encodeContainer blocks = encodeContainer' nullCodec blocks
-- | Encode values as an Avro object container using the given 'Codec'
-- (for example 'nullCodec' or 'deflateCodec').
--
-- The writer schema is the one @a@ declares through 'HasAvroSchema'. Every
-- value is converted with 'toAvro' and the nested lists are handed to
-- 'E.encodeContainer'. NOTE(review): presumably each inner list becomes one
-- container block; TODO confirm.
encodeContainer' :: forall a. ToAvro a => Codec -> [[a]] -> IO BL.ByteString
encodeContainer' codec blocks =
  E.encodeContainer codec writerSchema (map (map toAvro) blocks)
  where
    writerSchema = untag (schema :: Tagged a Schema)
-- | Pure, uncompressed container encoding with a caller-supplied sync marker.
--
-- Equivalent to 'encodeContainerWithSync'' applied to 'nullCodec'.
encodeContainerWithSync :: forall a. ToAvro a => (Word64,Word64,Word64,Word64) -> [[a]] -> BL.ByteString
encodeContainerWithSync marker blocks = encodeContainerWithSync' nullCodec marker blocks
-- | Encode values as an Avro object container using the given 'Codec' and a
-- caller-supplied sync marker.
--
-- Unlike 'encodeContainer'', which runs in 'IO', this is pure: the output
-- depends only on the arguments. The writer schema is the one @a@ declares
-- through 'HasAvroSchema'.
--
-- The four 'Word64's are written little-endian, in tuple order, and the
-- resulting bytes are passed to 'E.encodeContainerWithSync' as the sync
-- marker.
--
-- NOTE(review): 4 x 8 bytes = 32 bytes, whereas the Avro specification
-- defines the container sync marker as 16 bytes. Confirm how
-- 'E.encodeContainerWithSync' and the container decoder treat a 32-byte
-- marker before relying on this for interoperable output.
encodeContainerWithSync' :: forall a. ToAvro a => Codec -> (Word64,Word64,Word64,Word64) -> [[a]] -> BL.ByteString
encodeContainerWithSync' codec (w0, w1, w2, w3) blocks =
  E.encodeContainerWithSync codec writerSchema marker (map (map toAvro) blocks)
  where
    writerSchema = untag (schema :: Tagged a Schema)
    marker = P.runPut $ do
      P.putWord64le w0
      P.putWord64le w1
      P.putWord64le w2
      P.putWord64le w3
-- | Split an Avro object container into the raw bytes of each value, without
-- converting them to Haskell values.
--
-- Each value is located by decoding it against the container's writer schema
-- and slicing out exactly the bytes that decoding consumed. The embedded
-- writer schema itself is discarded. Calls 'error' if the container cannot
-- be decoded.
decodeContainerBytes :: ByteString -> [[ByteString]]
decodeContainerBytes bs =
  either (\e -> error ("Could not decode container: " <> e)) snd
         (D.decodeContainerWith rawValue bs)
  where
    -- Decode one value of schema @sch@ inside 'G.lookAhead' (which rewinds the
    -- input afterwards) only to learn how many bytes it spans. Then read
    -- exactly that many bytes as an undecoded lazy 'ByteString'.
    rawValue sch = do
      before <- G.bytesRead
      after <- G.lookAhead (D.getAvroOf sch >> G.bytesRead)
      G.getLazyByteString (after - before)
-- | Build a generic Avro record value of the given type from any 'Foldable'
-- collection of @(fieldName, value)@ pairs.
--
-- The pairs are collected with 'HashMap.fromList', so if a field name occurs
-- more than once, the last occurrence wins.
record :: Foldable f => Type -> f (Text,T.Value Type) -> T.Value Type
record ty fields = T.Record ty (HashMap.fromList (toList fields))
-- | Build a generic Avro @fixed@ value of the given type from raw bytes.
--
-- This is a thin wrapper around the 'T.Fixed' constructor. It does not check
-- that the byte length matches the size declared by the type.
fixed :: Type -> B.ByteString -> T.Value Type
fixed ty bytes = T.Fixed ty bytes