{-# language CPP                   #-}
{-# language DataKinds             #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language KindSignatures        #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings     #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-# options_ghc -fno-warn-orphans  #-}
module Mu.GRpc.Avro (
  AvroRPC(..)
, ViaFromAvroTypeRef(..)
, ViaToAvroTypeRef(..)
) where

import           Data.Avro
import           Data.Binary.Builder         (fromByteString, putWord32be, singleton)
import           Data.Binary.Get             (Decoder (..), getByteString, getInt8, getWord32be,
                                              runGetIncremental)
import           Data.ByteString.Char8       (ByteString)
import qualified Data.ByteString.Char8       as ByteString
import           Data.ByteString.Lazy        (fromStrict, toStrict)
import           Data.Functor.Identity
import           Data.Kind
import           GHC.TypeLits
import           Network.GRPC.HTTP2.Encoding
import           Network.GRPC.HTTP2.Types

#if MIN_VERSION_base(4,11,0)
#else
import           Data.Monoid                 ((<>))
#endif

import           Mu.Adapter.Avro ()
import           Mu.Rpc
import           Mu.Schema

-- | A proxy type for giving static information about RPCs.
--
-- The 'IsRPC' instance in this module joins the three fields into a
-- request path: a slash, 'pkg', a dot, 'srv', a slash, and 'meth'.
data AvroRPC = AvroRPC
  { pkg  :: ByteString  -- ^ Path segment placed before the dot.
  , srv  :: ByteString  -- ^ Path segment placed between the dot and the second slash.
  , meth :: ByteString  -- ^ Final path segment, after the second slash.
  }

-- | The request path is built from the three 'AvroRPC' fields:
-- a slash, 'pkg', a dot, 'srv', a slash, and finally 'meth'.
instance IsRPC AvroRPC where
  path rpc = "/" <> pkg rpc <> "." <> srv rpc <> "/" <> meth rpc
  {-# INLINE path #-}

-- | Wrapper around a value of type @t@ tagged with a phantom 'TypeRef'.
--
-- The instances in this module only implement the /decoding/ direction
-- for this wrapper; its encoding methods are defined as calls to 'error'.
newtype ViaFromAvroTypeRef (ref :: TypeRef) t
  = ViaFromAvroTypeRef { unViaFromAvroTypeRef :: t }
-- | Wrapper around a value of type @t@ tagged with a phantom 'TypeRef'.
--
-- The instances in this module only implement the /encoding/ direction
-- for this wrapper; its decoding methods are defined as calls to 'error'.
newtype ViaToAvroTypeRef (ref :: TypeRef) t
  = ViaToAvroTypeRef { unViaToAvroTypeRef :: t }

-- | The unit type is sent as nothing at all and read back without
-- looking at the input.
instance GRPCInput AvroRPC () where
  -- '()' produces an empty 'Builder'.
  encodeInput _ _ () = mempty
  -- Yields @Right ()@ immediately; the 'Get' action consumes no bytes.
  decodeInput _ _ = runGetIncremental $ pure $ Right ()

-- | The unit type is sent as nothing at all and read back without
-- looking at the input.
instance GRPCOutput AvroRPC () where
  -- '()' produces an empty 'Builder'.
  encodeOutput _ _ () = mempty
  -- Yields @Right ()@ immediately; the 'Get' action consumes no bytes.
  decodeOutput _ _ = runGetIncremental $ pure $ Right ()

-- | Decode-only instance: a message is read with 'decoder' as a schema
-- 'Term', converted with 'fromSchema'' and wrapped in 'ViaFromAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (i :: Type).
         ( FromSchema Identity sch sty i
         , FromAvro (Term Identity sch (sch :/: sty)) )
         => GRPCInput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) i) where
  -- Encoding is not supported through this wrapper; any use fails at runtime.
  encodeInput = error "eif/you should not call this"
  -- Outer '<$>' maps over the 'Decoder', inner 'fmap' over the
  -- 'Either String' result, so a decoding failure is passed through as 'Left'.
  decodeInput _ compression
    = fmap (ViaFromAvroTypeRef . fromSchema' @_ @_ @sch @Identity)
        <$> decoder compression

-- | Decode-only instance: a message is read with 'decoder' as a schema
-- 'Term', converted with 'fromSchema'' and wrapped in 'ViaFromAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (i :: Type).
         ( FromSchema Identity sch sty i
         , FromAvro (Term Identity sch (sch :/: sty)) )
         => GRPCOutput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) i) where
  -- Encoding is not supported through this wrapper; any use fails at runtime.
  encodeOutput = error "eof/you should not call this"
  -- Outer '<$>' maps over the 'Decoder', inner 'fmap' over the
  -- 'Either String' result, so a decoding failure is passed through as 'Left'.
  decodeOutput _ compression
    = fmap (ViaFromAvroTypeRef . fromSchema' @_ @_ @sch @Identity)
        <$> decoder compression

-- | Encode-only instance: the wrapped value is unwrapped, converted to a
-- schema 'Term' with 'toSchema'' and serialised with 'encoder'.
instance forall (sch :: Schema') (sty :: Symbol) (o :: Type).
         ( ToSchema Identity sch sty o
         , ToAvro (Term Identity sch (sch :/: sty)) )
         => GRPCInput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) o) where
  -- Pipeline, right to left: unwrap, convert to 'Term', frame and encode.
  encodeInput _ compression
    = encoder compression . toSchema' @_ @_ @sch @Identity . unViaToAvroTypeRef
  -- Decoding is not supported through this wrapper; any use fails at runtime.
  decodeInput = error "dit/you should not call this"

-- | Encode-only instance: the wrapped value is unwrapped, converted to a
-- schema 'Term' with 'toSchema'' and serialised with 'encoder'.
instance forall (sch :: Schema') (sty :: Symbol) (o :: Type).
         ( ToSchema Identity sch sty o
         , ToAvro (Term Identity sch (sch :/: sty)) )
         => GRPCOutput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) o) where
  -- Pipeline, right to left: unwrap, convert to 'Term', frame and encode.
  encodeOutput _ compression
    = encoder compression . toSchema' @_ @_ @sch @Identity . unViaToAvroTypeRef
  -- Decoding is not supported through this wrapper; any use fails at runtime.
  decodeOutput = error "dot/you should not call this"

-- | Serialise a value as one length-prefixed message.
--
-- The output consists of, in order:
--
-- 1. one flag byte: @1@ when '_compressionByteSet' holds for the given
--    'Compression', @0@ otherwise;
-- 2. the payload length as a big-endian 32-bit word;
-- 3. the payload itself.
--
-- The payload is the Avro encoding of the value after it has been passed
-- through '_compressionFunction', so the length field counts the bytes that
-- are actually written, not the size of the uncompressed encoding.
encoder :: ToAvro m => Compression -> m -> Builder
encoder compression plain =
    mconcat [ singleton (if _compressionByteSet compression then 1 else 0)
            , putWord32be (fromIntegral $ ByteString.length bin)
            , fromByteString bin
            ]
  where
    -- Avro-encode, make strict, then apply the compression function.
    bin = _compressionFunction compression $ toStrict $ encode plain

-- | Incrementally parse one length-prefixed message and Avro-decode it.
--
-- Reads, in order: one flag byte, a big-endian 32-bit payload length, and
-- that many payload bytes. When the flag byte is @0@ the payload is used as
-- is; for any other value it is first passed through
-- '_decompressionFunction' of the given 'Compression'.
--
-- The final result is 'Right' with the decoded value when Avro decoding
-- reports 'Success', and 'Left' with the error message when it reports
-- 'Error'.
decoder :: FromAvro a => Compression -> Decoder (Either String a)
decoder compression = runGetIncremental $ do
    isCompressed <- getInt8      -- 1byte
    let decompress = if isCompressed == 0 then pure else _decompressionFunction compression
    n <- getWord32be             -- 4bytes
    decode' . fromStrict <$> (decompress =<< getByteString (fromIntegral n))
  where
    -- Translate the Avro 'Result' into an 'Either' carrying the error text.
    decode' x = case decode x of
                  Success y -> Right y
                  Error   e -> Left e

-- Based on https://hackage.haskell.org/package/binary/docs/Data-Binary-Get-Internal.html
--
-- Orphan instance: 'Decoder' is imported from "Data.Binary.Get", so neither
-- the class nor the type is defined in this module (orphan warnings are
-- switched off at the top of the file). The function is applied to the
-- final value of a finished decoder, pushed under the continuation of a
-- suspended one, and ignored for a failed one, whose leftover input, offset
-- and message are kept unchanged.
instance Functor Decoder where
  fmap f (Done b s a)   = Done b s (f a)
  fmap f (Partial k)    = Partial (fmap f . k)
  fmap _ (Fail b s msg) = Fail b s msg