{-# language AllowAmbiguousTypes   #-}
{-# language DataKinds             #-}
{-# language DeriveFunctor         #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language GADTs                 #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds             #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeFamilies          #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-# OPTIONS_GHC -fprint-explicit-kinds #-}
-- | Client for gRPC services defined using Mu 'Service'
module Mu.GRpc.Client.Internal where

import           Control.Concurrent.Async
import           Control.Concurrent.STM        (atomically)
import           Control.Concurrent.STM.TMChan
import           Control.Concurrent.STM.TMVar
import           Control.Monad.IO.Class
import           Data.Avro
import qualified Data.ByteString.Char8         as BS
import           Data.Conduit
import qualified Data.Conduit.Combinators      as C
import           Data.Conduit.TMChan
import           Data.Kind
import           GHC.TypeLits
import           Network.GRPC.Client           (CompressMode (..), IncomingEvent (..),
                                                OutgoingEvent (..), RawReply, StreamDone (..))
import           Network.GRPC.Client.Helpers
import           Network.GRPC.HTTP2.Encoding   (GRPCInput, GRPCOutput)
import           Network.HTTP2                 (ErrorCode)
import           Network.HTTP2.Client          (ClientError, ClientIO, TooMuchConcurrency,
                                                runExceptT)

import           Mu.Adapter.ProtoBuf.Via
import           Mu.GRpc.Avro
import           Mu.GRpc.Bridge
import           Mu.Rpc
import           Mu.Schema

-- | Initialize a connection to a gRPC server.
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' = ExceptT ClientError IO GrpcClient
-> IO (Either ClientError GrpcClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT ClientError IO GrpcClient
 -> IO (Either ClientError GrpcClient))
-> (GrpcClientConfig -> ExceptT ClientError IO GrpcClient)
-> GrpcClientConfig
-> IO (Either ClientError GrpcClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GrpcClientConfig -> ExceptT ClientError IO GrpcClient
setupGrpcClient

class GRpcServiceMethodCall (p :: GRpcMessageProtocol)
                            (pkg :: snm) (s :: snm) (m :: Method snm mnm anm) h where
  gRpcServiceMethodCall :: Proxy p -> Proxy pkg -> Proxy s -> Proxy m -> GrpcClient -> h
instance ( KnownName serviceName, KnownName pkg, KnownName mname
         , GRpcMethodCall p ('Method mname manns margs mret) h, MkRPC p )
         => GRpcServiceMethodCall p pkg serviceName ('Method mname manns margs mret) h where
  gRpcServiceMethodCall :: Proxy @GRpcMessageProtocol p
-> Proxy @Symbol pkg
-> Proxy @Symbol serviceName
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method @Symbol @Symbol @Symbol mname manns margs mret)
-> GrpcClient
-> h
gRpcServiceMethodCall pro :: Proxy @GRpcMessageProtocol p
pro _ _ = RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method @Symbol @Symbol @Symbol mname manns margs mret)
-> GrpcClient
-> h
forall (p :: GRpcMessageProtocol)
       (method :: Method Symbol Symbol Symbol) h.
GRpcMethodCall p method h =>
RPCTy p
-> Proxy @(Method Symbol Symbol Symbol) method -> GrpcClient -> h
gRpcMethodCall @p RPCTy p
rpc
    where pkgName :: ByteString
pkgName = String -> ByteString
BS.pack (Proxy @Symbol pkg -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol pkg
forall k (t :: k). Proxy @k t
Proxy @pkg))
          svrName :: ByteString
svrName = String -> ByteString
BS.pack (Proxy @Symbol serviceName -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol serviceName
forall k (t :: k). Proxy @k t
Proxy @serviceName))
          metName :: ByteString
metName = String -> ByteString
BS.pack (Proxy @Symbol mname -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol mname
forall k (t :: k). Proxy @k t
Proxy @mname))
          rpc :: RPCTy p
rpc = Proxy @GRpcMessageProtocol p
-> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy @GRpcMessageProtocol p
-> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy @GRpcMessageProtocol p
pro ByteString
pkgName ByteString
svrName ByteString
metName

data GRpcReply a
  = GRpcTooMuchConcurrency TooMuchConcurrency
  | GRpcErrorCode ErrorCode
  | GRpcErrorString String
  | GRpcClientError ClientError
  | GRpcOk a
  deriving (Int -> GRpcReply a -> ShowS
[GRpcReply a] -> ShowS
GRpcReply a -> String
(Int -> GRpcReply a -> ShowS)
-> (GRpcReply a -> String)
-> ([GRpcReply a] -> ShowS)
-> Show (GRpcReply a)
forall a. Show a => Int -> GRpcReply a -> ShowS
forall a. Show a => [GRpcReply a] -> ShowS
forall a. Show a => GRpcReply a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GRpcReply a] -> ShowS
$cshowList :: forall a. Show a => [GRpcReply a] -> ShowS
show :: GRpcReply a -> String
$cshow :: forall a. Show a => GRpcReply a -> String
showsPrec :: Int -> GRpcReply a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> GRpcReply a -> ShowS
Show, a -> GRpcReply b -> GRpcReply a
(a -> b) -> GRpcReply a -> GRpcReply b
(forall a b. (a -> b) -> GRpcReply a -> GRpcReply b)
-> (forall a b. a -> GRpcReply b -> GRpcReply a)
-> Functor GRpcReply
forall a b. a -> GRpcReply b -> GRpcReply a
forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> GRpcReply b -> GRpcReply a
$c<$ :: forall a b. a -> GRpcReply b -> GRpcReply a
fmap :: (a -> b) -> GRpcReply a -> GRpcReply b
$cfmap :: forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
Functor)

buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Left tmc :: TooMuchConcurrency
tmc)                      = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply1 (Right (Left ec :: ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply1 (Right (Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply1 (Right (Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Left tmc :: TooMuchConcurrency
tmc)                         = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply2 (Right (_, Left ec :: ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply2 (Right (_, Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply2 (Right (_, Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Left tmc :: TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply ()
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply3 (Right _)  = () -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ()

simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse reply :: ClientIO (GRpcReply a)
reply = do
  Either ClientError (GRpcReply a)
r <- ClientIO (GRpcReply a) -> IO (Either ClientError (GRpcReply a))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ClientIO (GRpcReply a)
reply
  GRpcReply a -> IO (GRpcReply a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GRpcReply a -> IO (GRpcReply a))
-> GRpcReply a -> IO (GRpcReply a)
forall a b. (a -> b) -> a -> b
$ case Either ClientError (GRpcReply a)
r of
    Left e :: ClientError
e  -> ClientError -> GRpcReply a
forall a. ClientError -> GRpcReply a
GRpcClientError ClientError
e
    Right v :: GRpcReply a
v -> GRpcReply a
v

-- These type classes allow us to abstract over
-- the choice of message protocol (PB or Avro)

class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r

instance ToProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcIWTy :: Proxy @GRpcMessageProtocol 'MsgProtoBuf
-> Proxy @(TypeRef snm) ref
-> r
-> GRpcIWTy @snm 'MsgProtoBuf ref r
buildGRpcIWTy _ _ = r -> GRpcIWTy @snm 'MsgProtoBuf ref r
forall snm (ref :: TypeRef snm) t.
t -> ViaToProtoBufTypeRef @snm ref t
ViaToProtoBufTypeRef

instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcIWTy :: Proxy @GRpcMessageProtocol 'MsgAvro
-> Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
-> r
-> GRpcIWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
buildGRpcIWTy _ _ = r
-> GRpcIWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
forall snm (ref :: TypeRef snm) t. t -> ViaToAvroTypeRef @snm ref t
ViaToAvroTypeRef

class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r

instance FromProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcOWTy :: Proxy @GRpcMessageProtocol 'MsgProtoBuf
-> Proxy @(TypeRef snm) ref
-> GRpcOWTy @snm 'MsgProtoBuf ref r
-> r
unGRpcOWTy _ _ = GRpcOWTy @snm 'MsgProtoBuf ref r -> r
forall snm (ref :: TypeRef snm) t.
ViaFromProtoBufTypeRef @snm ref t -> t
unViaFromProtoBufTypeRef

instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcOWTy :: Proxy @GRpcMessageProtocol 'MsgAvro
-> Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
-> GRpcOWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
-> r
unGRpcOWTy _ _ = GRpcOWTy @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
-> r
forall snm (ref :: TypeRef snm) t.
ViaFromAvroTypeRef @snm ref t -> t
unViaFromAvroTypeRef

-- -----------------------------
-- IMPLEMENTATION OF THE METHODS
-- -----------------------------

class GRpcMethodCall (p :: GRpcMessageProtocol) (method :: Method') h where
  gRpcMethodCall :: RPCTy p -> Proxy method -> GrpcClient -> h

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
         , handler ~ IO (GRpcReply ()) )
         => GRpcMethodCall p ('Method name anns '[ ] 'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ('[] @(Argument Symbol Symbol))
        ('RetNothing @Symbol))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = ClientIO (GRpcReply ()) -> handler
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> handler)
-> ClientIO (GRpcReply ()) -> handler
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (GRpcReply r) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ('[] @(Argument Symbol Symbol))
        ('RetSingle @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> handler)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> handler
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @() @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ('[] @(Argument Symbol Symbol))
        ('RetStream @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> ()
-> (() -> HeaderList -> GRpcOWTy @Symbol p rref r -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @() @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () ()
                                 (\_ _ newVal :: GRpcOWTy @Symbol p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk _ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (v -> IO (GRpcReply ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgSingle @Symbol @Symbol aname aanns vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetNothing @Symbol))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy @Symbol p vref v
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (GRpcReply r)) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgSingle @Symbol @Symbol aname aanns vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetSingle @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy @Symbol p vref v
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
               RPCTy p
rpc GrpcClient
client (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgSingle @Symbol @Symbol aname aanns vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetStream @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> GRpcIWTy @Symbol p vref v
-> (() -> HeaderList -> GRpcOWTy @Symbol p rref r -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)
                                 (\_ _ newVal :: GRpcOWTy @Symbol p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk _ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply ()))) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aanns vref ]
                                      'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgStream @Symbol @Symbol aname aanns vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetNothing @Symbol))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
    = do -- Create a new TMChan
         TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         -- Start executing the client in another thread
         Async (GRpcReply ())
promise <- IO (GRpcReply ()) -> IO (Async (GRpcReply ()))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply ()) -> IO (Async (GRpcReply ())))
-> IO (GRpcReply ()) -> IO (Async (GRpcReply ()))
forall a b. (a -> b) -> a -> b
$
            ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
            Either TooMuchConcurrency ((), RawReply ()) -> GRpcReply ()
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either TooMuchConcurrency ((), RawReply ()) -> GRpcReply ())
-> ExceptT
     ClientError IO (Either TooMuchConcurrency ((), RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            RPCTy p
-> GrpcClient
-> ()
-> (()
    -> ClientIO
         ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)))
-> ExceptT
     ClientError IO (Either TooMuchConcurrency ((), RawReply ()))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client ()
                            (\_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
                                      case Maybe v
nextVal of
                                        Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), StreamDone
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
                                        Just v :: v
v  -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (CompressMode, GRpcIWTy @Symbol p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v)))
         ConduitT v Void IO (GRpcReply ())
-> IO (ConduitT v Void IO (GRpcReply ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMChan v
-> Async (GRpcReply ()) -> ConduitT v Void IO (GRpcReply ())
forall (m :: * -> *) a b o.
MonadIO m =>
TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel TMChan v
chan Async (GRpcReply ())
promise)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aanns vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgStream @Symbol @Symbol aname aanns vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetSingle @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
    = do -- Create a new TMChan
         TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         -- Start executing the client in another thread
         Async (GRpcReply r)
promise <- IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply r) -> IO (Async (GRpcReply r)))
-> IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a b. (a -> b) -> a -> b
$
            (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
            ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
            Either
  TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either
   TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            RPCTy p
-> GrpcClient
-> ()
-> (()
    -> ClientIO
         ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)))
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()
                            (\_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
                                      case Maybe v
nextVal of
                                        Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), StreamDone
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
                                        Just v :: v
v  -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (CompressMode, GRpcIWTy @Symbol p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v)))
         ConduitT v Void IO (GRpcReply r)
-> IO (ConduitT v Void IO (GRpcReply r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMChan v -> Async (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall (m :: * -> *) a b o.
MonadIO m =>
TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel TMChan v
chan Async (GRpcReply r)
promise)

conduitFromChannel :: MonadIO m => TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel :: TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel chan :: TMChan a
chan promise :: Async b
promise = ConduitT a o m b
go
  where go :: ConduitT a o m b
go = do Maybe a
x <- ConduitT a o m (Maybe a)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
                case Maybe a
x of
                  Just v :: a
v  -> do IO () -> ConduitT a o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT a o m ()) -> IO () -> ConduitT a o m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan a -> a -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan a
chan a
v
                                ConduitT a o m b
go
                  Nothing -> do IO () -> ConduitT a o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT a o m ()) -> IO () -> ConduitT a o m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan a -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan a
chan
                                IO b -> ConduitT a o m b
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO b -> ConduitT a o m b) -> IO b -> ConduitT a o m b
forall a b. (a -> b) -> a -> b
$ Async b -> IO b
forall a. Async a -> IO a
wait Async b
promise

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aans vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method Symbol Symbol Symbol)
     ('Method
        @Symbol
        @Symbol
        @Symbol
        name
        anns
        ((':)
           @(Argument Symbol Symbol)
           ('ArgStream @Symbol @Symbol aname aans vref)
           ('[] @(Argument Symbol Symbol)))
        ('RetStream @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
    = do -- Create a new TMChan
         TMChan (GRpcReply r)
inchan <- IO (TMChan (GRpcReply r))
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan (GRpcReply r))
         TMChan v
outchan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         TMVar (GRpcReply ())
var <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), ()) -> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> (()
    -> IncomingEvent (GRpcOWTy @Symbol p rref r) () -> ClientIO ())
-> ()
-> (()
    -> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ()))
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
forall r i o a b.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> IncomingEvent o a -> ClientIO a)
-> b
-> (b -> ClientIO (b, OutgoingEvent i b))
-> ClientIO (Either TooMuchConcurrency (a, b))
rawGeneralStream
                   @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   RPCTy p
rpc GrpcClient
client
                   () (\_ ievent :: IncomingEvent (GRpcOWTy @Symbol p rref r) ()
ievent -> do -- on the first iteration, say that everything is OK
                        Bool
_ <- IO Bool -> ExceptT ClientError IO Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT ClientError IO Bool)
-> IO Bool -> ExceptT ClientError IO Bool
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                        case IncomingEvent (GRpcOWTy @Symbol p rref r) ()
ievent of
                          RecvMessage o :: GRpcOWTy @Symbol p rref r
o -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk (r -> GRpcReply r) -> r -> GRpcReply r
forall a b. (a -> b) -> a -> b
$ Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy(Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
o)
                          Invalid e :: SomeException
e -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (String -> GRpcReply r
forall a. String -> GRpcReply a
GRpcErrorString (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
                          _ -> () -> ClientIO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure () )
                   () (\_ -> do
                        Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
outchan
                        case Maybe v
nextVal of
                          Nothing -> ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ()
forall i b. OutgoingEvent i b
Finalize)
                          Just v :: v
v  -> ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), CompressMode
-> GRpcIWTy @Symbol p vref v
-> OutgoingEvent (GRpcIWTy @Symbol p vref v) ()
forall i b. CompressMode -> i -> OutgoingEvent i b
SendMessage CompressMode
compress (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v)))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan (GRpcReply r)
inchan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT v (GRpcReply r) IO ()
go = do GRpcReply ()
err <- IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
err of
                       GRpcOk _ -> ConduitT v (GRpcReply r) IO ()
go2
                       e :: GRpcReply ()
e        -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT v (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
             go2 :: ConduitT v (GRpcReply r) IO ()
go2 = do Maybe v
nextOut <- ConduitT v (GRpcReply r) IO (Maybe v)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
                      case Maybe v
nextOut of
                        Just v :: v
v  -> do IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v (GRpcReply r) IO ())
-> IO () -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
outchan v
v
                                      ConduitT v (GRpcReply r) IO ()
go2
                        Nothing -> do Maybe (Maybe (GRpcReply r))
r <- IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Maybe (GRpcReply r)))
 -> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r))))
-> IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe (GRpcReply r)))
 -> IO (Maybe (Maybe (GRpcReply r))))
-> STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM (Maybe (Maybe (GRpcReply r)))
forall a. TMChan a -> STM (Maybe (Maybe a))
tryReadTMChan TMChan (GRpcReply r)
inchan
                                      case Maybe (Maybe (GRpcReply r))
r of
                                        Nothing            -> () -> ConduitT v (GRpcReply r) IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- both are empty, end
                                        Just Nothing       -> ConduitT v (GRpcReply r) IO ()
go2
                                        Just (Just nextIn :: GRpcReply r
nextIn) -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield GRpcReply r
nextIn ConduitT v (GRpcReply r) IO ()
-> ConduitT v (GRpcReply r) IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ConduitT v (GRpcReply r) IO ()
go2
         ConduitT v (GRpcReply r) IO ()
-> IO (ConduitT v (GRpcReply r) IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConduitT v (GRpcReply r) IO ()
go