{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DeriveFunctor       #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData          #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}

module Data.Avro.Internal.Container
where

import           Control.Monad                (when)
import qualified Data.Aeson                   as Aeson
import           Data.Avro.Codec              (Codec (..), Decompress)
import qualified Data.Avro.Codec              as Codec
import           Data.Avro.Encoding.ToAvro    (toAvro)
import           Data.Avro.Internal.EncodeRaw (encodeRaw)
import           Data.Avro.Schema.Schema      (Schema)
import qualified Data.Avro.Schema.Schema      as Schema
import           Data.Binary.Get              (Get)
import qualified Data.Binary.Get              as Get
import           Data.ByteString              (ByteString)
import qualified Data.ByteString              as B
import           Data.ByteString.Builder      (Builder, lazyByteString, toLazyByteString)
import qualified Data.ByteString.Lazy         as BL
import qualified Data.ByteString.Lazy.Char8   as BLC
import           Data.Either                  (isRight)
import           Data.HashMap.Strict          (HashMap)
import qualified Data.HashMap.Strict          as HashMap
import           Data.Int                     (Int32, Int64)
import           Data.List                    (foldl', unfoldr)
import qualified Data.Map.Strict              as Map
import           Data.Text                    (Text)
import           System.Random.TF.Init        (initTFGen)
import           System.Random.TF.Instances   (randoms)

import qualified Data.Avro.Internal.Get as AGet

-- | Information extracted from the header of an Avro object container.
data ContainerHeader = ContainerHeader
  { -- | Sync marker read from the header; every block trailer is compared
    -- against it.
    syncBytes :: BL.ByteString
    -- | Decompression routine of the codec named in the header metadata.
  , decompress :: forall a. Decompress a
    -- | Schema decoded from the @avro.schema@ metadata entry.
  , containedSchema :: Schema
  }

-- | Length, in bytes, of a container sync marker.
--
-- Polymorphic so it can be used directly both as an 'Int' (for 'take' and
-- strict reads) and as an 'Int64' (for lazy reads).
nrSyncBytes :: Integral sb => sb
nrSyncBytes = 16
{-# INLINE nrSyncBytes #-}

-- | Produces a fresh random synchronization marker ('nrSyncBytes' bytes long)
-- to be used when encoding an Avro container.
newSyncBytes :: IO BL.ByteString
newSyncBytes = do
  gen <- initTFGen
  pure (BL.pack (take nrSyncBytes (randoms gen)))

-- | Parses the header of an Avro object container: the 4-byte magic, the
-- metadata map and the sync marker ('nrSyncBytes' bytes).
--
-- The parser fails (through 'fail' in 'Get', so callers of
-- 'Get.runGetOrFail' receive a 'Left') when:
--
-- * the magic bytes are not @Obj@ followed by the byte @1@;
-- * the @avro.schema@ metadata entry is missing or is not valid schema JSON;
-- * the @avro.codec@ metadata entry names a codec other than @null@ or
--   @deflate@.
--
-- A missing @avro.codec@ entry selects the null codec.
getContainerHeader :: Get ContainerHeader
getContainerHeader = do
  magic <- getFixed avroMagicSize
  when (BL.fromStrict magic /= avroMagicBytes) $
    fail "Invalid magic number at start of container."
  metadata <- getMeta
  sync     <- BL.fromStrict <$> getFixed nrSyncBytes
  codec    <- getCodec (Map.lookup "avro.codec" metadata)
  schema   <- getSchema (Map.lookup "avro.schema" metadata)
  pure ContainerHeader
    { syncBytes       = sync
    , decompress      = Codec.codecDecompress codec
    , containedSchema = schema
    }
  where
    avroMagicSize :: Integral a => a
    avroMagicSize = 4

    avroMagicBytes :: BL.ByteString
    avroMagicBytes = BLC.pack "Obj" <> BL.pack [1]

    getFixed :: Int -> Get ByteString
    getFixed = Get.getByteString

    -- The metadata section is an Avro map from string keys to byte values.
    getMeta :: Get (Map.Map Text BL.ByteString)
    getMeta =
      let keyValue = (,) <$> AGet.getString <*> AGet.getBytesLazy
      in Map.fromList <$> AGet.decodeBlocks keyValue

    -- Resolved here rather than through 'parseCodec': that function calls
    -- 'error' on an unknown name, which would escape the parser's failure
    -- channel and surface as an exception for a merely malformed file.
    getCodec :: Maybe BL.ByteString -> Get Codec
    getCodec = \case
      Nothing        -> pure Codec.nullCodec
      Just "null"    -> pure Codec.nullCodec
      Just "deflate" -> pure Codec.deflateCodec
      Just other     -> fail ("Unrecognized codec: " <> BLC.unpack other)

    getSchema :: Maybe BL.ByteString -> Get Schema
    getSchema = \case
      Nothing  -> fail "Invalid container object: no schema."
      Just raw ->
        either (fail . ("Can not decode container schema: " <>)) pure
               (Aeson.eitherDecode' raw)

-- | Reads the container as a list of blocks without decoding them into actual values.
--
-- Useful for streaming, splitting or merging Avro containers without paying
-- for Avro decoding and re-encoding of the values.
--
-- Every block is returned as a raw 'BL.ByteString' paired with the number of
-- Avro values it contains.
--
-- The outer 'Left' reports a failure to open the container itself (for
-- example, a problem reading the embedded schema); an inner 'Left' reports a
-- failure while reading blocks and is always the last element of the list.
decodeRawBlocks :: BL.ByteString -> Either String (Schema, [Either String (Int, BL.ByteString)])
decodeRawBlocks input =
  case Get.runGetOrFail getContainerHeader input of
    Left (_, _, problem)    -> Left problem
    Right (body, _, header) -> Right (containedSchema header, rawBlocks header body)
  where
    rawBlocks :: ContainerHeader -> BL.ByteString -> [Either String (Int, BL.ByteString)]
    rawBlocks header body =
      foldrBlocks
        (\blk acc -> Right blk : acc)
        (\problem -> [Left problem])
        []
        body
        (decodeRawBlocksIncremental header)

-- | State of an incremental, block-by-block decoder.
data Blocks a
  -- | A decoded block together with the decoder state that follows it.
  = Block a (Blocks a)
  -- | The decoder needs input: feed more bytes. Pass the empty 'ByteString'
  -- to signal end of input.
  | More (ByteString -> Blocks a)
  -- | Decoding failed: error message, then leftover bytes.
  | Error String ByteString
  -- | Decoding finished: leftover bytes.
  | Done ByteString
  deriving (Functor)

-- | Feeds a 'BL.ByteString' to the 'Blocks' until exhausted.
-- Consumes the 'BL.ByteString' lazily, one chunk at a time.
--
-- * The first function combines each decoded block with the (lazy) fold of
--   everything that follows it.
-- * The second function produces the result when the decoder reports an
--   'Error'; it receives the error message.
-- * The third argument is the result when the decoder reports 'Done'.
--
-- Once the input runs out, the empty 'ByteString' is fed to any decoder
-- still asking for 'More', signalling end of input.
foldrBlocks :: (a -> b -> b) -> (String -> b) -> b -> BL.ByteString -> Blocks a -> b
foldrBlocks block err done input = go (BL.toChunks input)
  where
    -- The decoder state is inspected before the pending chunks, so the next
    -- chunk of a lazily produced input is only demanded when the decoder
    -- actually asks for 'More' -- never after 'Error' or 'Done'.
    go chunks = \case
      Block a rest    -> block a (go chunks rest)
      Error message _ -> err message
      Done _          -> done
      More cont       -> case chunks of
        []     -> go [] (cont B.empty)
        c : cx -> go cx (cont c)

-- | Builds an incremental decoder that yields the raw blocks of a container
-- body, given the header that was read in front of it.
--
-- Each yielded block is the number of objects it holds paired with its
-- decompressed bytes. Feeding the empty 'ByteString' between two blocks ends
-- the stream with 'Done'; feeding it in the middle of a block signals end of
-- input to the underlying parser.
decodeRawBlocksIncremental :: ContainerHeader -> Blocks (Int, BL.ByteString)
decodeRawBlocksIncremental ContainerHeader{ syncBytes = expectedSync, decompress = inflate } =
  awaitBlock
  where
    -- A decoder positioned at the start of a block, with nothing consumed.
    freshDecoder = Get.runGetIncremental rawBlock

    -- State between two blocks: nothing is buffered.
    awaitBlock = More $ \chunk ->
      if B.null chunk
        then Done B.empty
        else step (Get.pushChunk freshDecoder chunk)

    step decoder = case decoder of
      Get.Done leftover _ !blk
        | B.null leftover -> Block blk awaitBlock
          -- Unconsumed bytes belong to the next block: restart on them.
        | otherwise       -> Block blk (step (Get.pushChunk freshDecoder leftover))
      Get.Fail leftover _ message ->
        Error message leftover
      Get.Partial{} -> More $ \chunk ->
        step $
          if B.null chunk
            then Get.pushEndOfInput decoder
            else Get.pushChunk decoder chunk

    -- One block: object count, byte count, payload, then the sync marker.
    rawBlock :: Get (Int, BL.ByteString)
    rawBlock = do
      count      <- AGet.getLong >>= AGet.sFromIntegral
      size       <- AGet.getLong
      compressed <- Get.getLazyByteString size
      payload    <- either fail pure (inflate compressed Get.getRemainingLazyByteString)
      marker     <- Get.getLazyByteString nrSyncBytes
      when (marker /= expectedSync) $
        fail "Invalid marker, does not match sync bytes."
      pure (count, payload)

-- | Splits a container into its individual values, returning each decoded
-- value together with the exact bytes it was decoded from.
--
-- This is particularly useful when slicing a container into one or more
-- smaller files: keeping the original bytes makes it possible to avoid
-- re-encoding the data.
extractContainerValuesBytes :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
extractContainerValuesBytes deconflict decoder =
  extractContainerValues deconflict withBytes
  where
    withBytes :: schema -> Get (a, BL.ByteString)
    withBytes sch = do
      start <- Get.bytesRead
      -- Decode under 'lookAhead' to learn where the value ends without
      -- consuming it, then take that span as raw bytes.
      (val, end) <- Get.lookAhead $ do
        v   <- decoder sch
        pos <- Get.bytesRead
        pure (v, pos)
      raw <- Get.getLazyByteString (end - start)
      pure (val, raw)

-- | Decodes every value stored in a container.
--
-- The outer 'Left' reports a failure to read the header or to deconflict the
-- embedded schema. Inside the list, a 'Left' is either a value that failed to
-- decode or, as the last element, a failure to read a block.
extractContainerValues :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String a])
extractContainerValues deconflict decoder input =
  case Get.runGetOrFail getContainerHeader input of
    Left (_, _, problem) -> Left problem
    Right (body, _, header) ->
      flatten body <$> extractContainerValuesIncremental deconflict decoder header
  where
    -- Concatenate the per-block value lists lazily, in order.
    flatten :: BL.ByteString -> (Schema, Blocks [Either String a]) -> (Schema, [Either String a])
    flatten body (schema, blocks) =
      (schema, foldrBlocks (++) (\problem -> [Left problem]) [] body blocks)

-- | Incremental variant of value extraction: given an already parsed header,
-- returns the schema embedded in the container and a 'Blocks' decoder whose
-- every block is the list of values decoded from it.
--
-- Fails only when the embedded schema cannot be deconflicted.
extractContainerValuesIncremental
  :: (Schema -> Either String schema)
  -> (schema -> Get a)
  -> ContainerHeader
  -> Either String (Schema, Blocks [Either String a])
extractContainerValuesIncremental deconflict getValue header =
  withReadSchema <$> deconflict writerSchema
  where
    writerSchema = containedSchema header

    withReadSchema readSchema =
      (writerSchema, decodeBlock readSchema <$> decodeRawBlocksIncremental header)

    -- Decode as many values as the block header announced.
    decodeBlock readSchema (count, payload) =
      snd (consumeN (fromIntegral count) (decodeOne readSchema) payload)

    -- Decode one value, handing back the unconsumed bytes either way.
    decodeOne readSchema payload =
      case Get.runGetOrFail (getValue readSchema) payload of
        Left  (leftover, _, problem) -> (leftover, Left problem)
        Right (leftover, _, value)   -> (leftover, Right value)

-- | Packs a container from already encoded Avro values, using a freshly
-- generated sync marker.
--
-- Every inner list is written as one block; each bytestring should represent
-- exactly one value serialised to Avro.
packContainerValues :: Codec -> Schema -> [[BL.ByteString]] -> IO BL.ByteString
packContainerValues codec sch values =
  fmap (\sync -> packContainerValuesWithSync codec sch sync values) newSyncBytes

-- | Packs a container from already encoded Avro values, using the given sync
-- marker.
--
-- Every inner list is written as one block; each bytestring should represent
-- exactly one value serialised to Avro.
packContainerValuesWithSync :: Codec -> Schema -> BL.ByteString -> [[BL.ByteString]] -> BL.ByteString
packContainerValuesWithSync = packContainerValuesWithSync' (const lazyByteString)
{-# INLINABLE packContainerValuesWithSync #-}
-- | Packs a container from values that are encoded on the fly with the
-- supplied encoder.
--
-- Arguments, in order: the encoder (it receives the container schema and one
-- value), the codec used to compress each block, the schema written into the
-- header, the sync marker, and the values grouped into blocks (every inner
-- list becomes one block).
packContainerValuesWithSync' ::
     (Schema -> a -> Builder)
  -> Codec
  -> Schema
  -> BL.ByteString
  -> [[a]]
  -> BL.ByteString
packContainerValuesWithSync' encode codec sch syncBytes values =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock values
  where
    putBlock ys =
      let payload = codecCompress codec . toLazyByteString $ foldMap (encode sch) ys
      in  -- The object count is a long in the container format (the reader
          -- uses 'AGet.getLong'); encoding it as 'Int64' avoids the silent
          -- wrap-around that narrowing to 'Int32' would cause.
          encodeRaw @Int64 (fromIntegral (length ys))
            <> encodeRaw (BL.length payload)
            <> lazyByteString payload
            <> lazyByteString syncBytes

-- | Packs a new container from already encoded Avro blocks, using a freshly
-- generated sync marker.
--
-- Each block is given as a pair: the number of objects in the block and the
-- block content.
packContainerBlocks :: Codec -> Schema -> [(Int, BL.ByteString)] -> IO BL.ByteString
packContainerBlocks codec sch blocks =
  fmap (\sync -> packContainerBlocksWithSync codec sch sync blocks) newSyncBytes

-- | Packs a new container from already encoded Avro blocks, using the given
-- sync marker.
--
-- Each block is given as a pair: the number of objects in the block and the
-- (uncompressed) block content, which is compressed here with the codec.
packContainerBlocksWithSync :: Codec -> Schema -> BL.ByteString -> [(Int, BL.ByteString)] -> BL.ByteString
packContainerBlocksWithSync codec sch syncBytes blocks =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock blocks
  where
    putBlock :: (Int, BL.ByteString) -> Builder
    putBlock (nrObj, bytes) =
      let compressed = codecCompress codec bytes
      in  -- The object count is a long in the container format (the reader
          -- uses 'AGet.getLong'); encoding it as 'Int64' avoids the silent
          -- wrap-around that narrowing to 'Int32' would cause.
          encodeRaw @Int64 (fromIntegral nrObj)
            <> encodeRaw (BL.length compressed)
            <> lazyByteString compressed
            <> lazyByteString syncBytes


-- | Builds the header of an Avro container: the magic bytes, the metadata map
-- (schema as JSON and codec name), and the sync marker.
containerHeaderWithSync :: Codec -> Schema -> BL.ByteString -> Builder
containerHeaderWithSync codec sch syncBytes =
  mconcat
    [ lazyByteString magic
    , toAvro (Schema.Map Schema.Bytes') metadata
    , lazyByteString syncBytes
    ]
  where
    magic :: BL.ByteString
    magic = "Obj" <> BL.singleton 1

    metadata :: HashMap Text BL.ByteString
    metadata = HashMap.fromList
      [ ("avro.schema", Aeson.encode sch)
      , ("avro.codec", BL.fromStrict (codecName codec))
      ]

-----------------------------------------------------------------

-- | Runs a state-threading step function the given number of times,
-- collecting the produced values in order.
--
-- Returns the final state together with the list of results. The list is
-- produced lazily: later steps only run when later elements (or the final
-- state) are demanded.
--
-- A count of zero or less performs no steps. Negative counts are treated as
-- zero so that a corrupt count cannot make the recursion run away.
consumeN :: Int64 -> (a -> (a, b)) -> a -> (a, [b])
consumeN n step seed
  | n <= 0    = (seed, [])
  | otherwise =
      -- Lazy pattern bindings keep the result list incremental.
      let (seed', out)  = step seed
          (final, outs) = consumeN (n - 1) step seed'
      in (final, out : outs)
{-# INLINE consumeN #-}

----------------------------------------------------------------
-- | Maps the value of the @avro.codec@ metadata entry to a 'Codec'.
--
-- A missing entry and @null@ both select the null codec; @deflate@ selects
-- the deflate codec. Any other name is not reported through the monad:
-- it calls 'error'.
parseCodec :: Monad m => Maybe BL.ByteString -> m Codec
parseCodec = \case
  Nothing        -> pure Codec.nullCodec
  Just "null"    -> pure Codec.nullCodec
  Just "deflate" -> pure Codec.deflateCodec
  Just other     -> error ("Unrecognized codec: " <> BLC.unpack other)

-- | Like 'takeWhile', but the first element that fails the predicate is kept
-- as the last element of the result.
takeWhileInclusive :: (a -> Bool) -> [a] -> [a]
takeWhileInclusive p xs = foldr keep [] xs
  where
    -- Emit the element first; the predicate only decides whether to go on.
    keep x rest = x : (if p x then rest else [])
{-# INLINE takeWhileInclusive #-}