{-# language AllowAmbiguousTypes #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
{-# OPTIONS_GHC -fprint-explicit-kinds #-}
module Mu.GRpc.Client.Internal where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan
import Control.Concurrent.STM.TMVar
import Control.Monad.IO.Class
import Data.Avro
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import Data.Conduit.TMChan
import Data.Kind
import GHC.TypeLits
import Network.GRPC.Client (CompressMode (..), IncomingEvent (..),
OutgoingEvent (..), RawReply, StreamDone (..))
import Network.GRPC.Client.Helpers
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
import Network.HTTP2 (ErrorCode)
import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
runExceptT)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
-- | Variant of 'setupGrpcClient' that runs the 'ExceptT' layer, so that a
--   failure to set up the client is reported as a 'Left' 'ClientError'
--   inside plain 'IO'.
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' = runExceptT . setupGrpcClient
-- | Builds the client-side handler @h@ for method @m@ of service @s@ in
--   package @pkg@, speaking message protocol @p@.
--   The proxies only carry type-level information; the 'GrpcClient' is the
--   connection the resulting handler uses.
class GRpcServiceMethodCall (p :: GRpcMessageProtocol)
                            (pkg :: snm) (s :: snm) (m :: Method snm mnm anm) h where
  gRpcServiceMethodCall :: Proxy p -> Proxy pkg -> Proxy s -> Proxy m -> GrpcClient -> h
-- | Turns the type-level package, service and method names into
--   'BS.ByteString's, builds the protocol-specific RPC descriptor with
--   'mkRPC', and delegates to 'gRpcMethodCall' for the actual handler.
instance ( KnownName serviceName, KnownName pkg, KnownName mname
         , GRpcMethodCall p ('Method mname manns margs mret) h, MkRPC p )
         => GRpcServiceMethodCall p pkg serviceName ('Method mname manns margs mret) h where
  gRpcServiceMethodCall pro _ _ = gRpcMethodCall @p rpc
    where pkgName = BS.pack (nameVal (Proxy @pkg))
          svrName = BS.pack (nameVal (Proxy @serviceName))
          metName = BS.pack (nameVal (Proxy @mname))
          -- RPC descriptor identifying package, service and method
          rpc = mkRPC pro pkgName svrName metName
-- | Outcome of a gRPC call as seen by the client: one constructor per
--   source of failure, plus 'GRpcOk' for a successful result.
data GRpcReply a
  = GRpcTooMuchConcurrency TooMuchConcurrency
    -- ^ The call returned a 'TooMuchConcurrency' value instead of a reply.
  | GRpcErrorCode ErrorCode
    -- ^ The raw reply was an HTTP/2 'ErrorCode'.
  | GRpcErrorString String
    -- ^ The raw reply carried an error 'String' in place of a message.
  | GRpcClientError ClientError
    -- ^ The underlying 'ClientIO' action failed with a 'ClientError'.
  | GRpcOk a
    -- ^ The call succeeded with the given value.
  deriving (Show, Functor)
-- | Flattens the nested result of a unary call into a 'GRpcReply'.
--   Each layer of failure maps to its own constructor; the first two
--   components of a raw reply tuple are ignored.
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Left tmc)                      = GRpcTooMuchConcurrency tmc
buildGRpcReply1 (Right (Left ec))               = GRpcErrorCode ec
buildGRpcReply1 (Right (Right (_, _, Left es))) = GRpcErrorString es
buildGRpcReply1 (Right (Right (_, _, Right r))) = GRpcOk r
-- | Like 'buildGRpcReply1', for results that pair the raw reply with an
--   extra first component, which is discarded.
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Left tmc)                         = GRpcTooMuchConcurrency tmc
buildGRpcReply2 (Right (_, Left ec))               = GRpcErrorCode ec
buildGRpcReply2 (Right (_, Right (_, _, Left es))) = GRpcErrorString es
buildGRpcReply2 (Right (_, Right (_, _, Right r))) = GRpcOk r
-- | Reduces a result to a bare success/failure marker: any 'Right' value is
--   discarded and reported as @'GRpcOk' ()@.
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Left tmc) = GRpcTooMuchConcurrency tmc
buildGRpcReply3 (Right _)  = GRpcOk ()
-- | Runs a 'ClientIO' action in plain 'IO', folding a 'ClientError' raised
--   by the 'ExceptT' layer into the 'GRpcClientError' constructor.
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse reply = either GRpcClientError id <$> runExceptT reply
-- | Abstracts over the message protocol on the sending side.
--   For protocol @p@, type reference @ref@ and Haskell type @r@,
--   'GRpcIWTy' is the type actually handed to the gRPC layer (it must be a
--   'GRPCInput' for the protocol's RPC type) and 'buildGRpcIWTy' wraps a
--   plain @r@ into it.
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
-- | Protocol Buffers: outgoing values are wrapped in 'ViaToProtoBufTypeRef'.
instance ToProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcIWTy _ _ = ViaToProtoBufTypeRef
-- | Avro: outgoing values referenced by a schema type are wrapped in
--   'ViaToAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcIWTy _ _ = ViaToAvroTypeRef
-- | Abstracts over the message protocol on the receiving side.
--   For protocol @p@, type reference @ref@ and Haskell type @r@,
--   'GRpcOWTy' is the type produced by the gRPC layer (it must be a
--   'GRPCOutput' for the protocol's RPC type) and 'unGRpcOWTy' unwraps it
--   back into a plain @r@.
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
-- | Protocol Buffers: incoming values arrive wrapped in
--   'ViaFromProtoBufTypeRef'.
instance FromProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcOWTy _ _ = unViaFromProtoBufTypeRef
-- | Avro: incoming values referenced by a schema type arrive wrapped in
--   'ViaFromAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcOWTy _ _ = unViaFromAvroTypeRef
-- | Builds the client-side handler @h@ for a single @method@ under
--   protocol @p@, given the RPC descriptor and the client connection.
--   The instances pick the shape of @h@ from the method's arguments and
--   return kind.
class GRpcMethodCall (p :: GRpcMessageProtocol) (method :: Method') h where
  gRpcMethodCall :: RPCTy p -> Proxy method -> GrpcClient -> h
-- | No arguments, no result: a unary call sending and receiving @()@.
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
         , handler ~ IO (GRpcReply ()) )
         => GRpcMethodCall p ('Method name anns '[ ] 'RetNothing) handler where
  gRpcMethodCall rpc _ client
    = simplifyResponse $
      buildGRpcReply1 <$>
      rawUnary rpc client ()
-- | No arguments, single result: a unary call sending @()@ whose reply is
--   unwrapped from the protocol-specific wrapper with 'unGRpcOWTy'.
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (GRpcReply r) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client
    = fmap (fmap (unGRpcOWTy (Proxy @p) (Proxy @rref))) $
      simplifyResponse $
      buildGRpcReply1 <$>
      rawUnary @_ @() @(GRpcOWTy p rref r) rpc client ()
-- | No arguments, streamed result. The call runs in a background thread;
--   the returned conduit yields every received message as 'GRpcOk', in
--   arrival order, and, if the call ended in failure, yields that failure
--   as its last element.
--
--   The final outcome is published exactly once and the channel is always
--   closed when the call ends, so the conduit terminates both for a
--   successful stream with no messages and for a failure that happens
--   after some messages were already delivered.
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client
    = do -- Channel carrying the decoded messages
         chan <- newTMChanIO :: IO (TMChan r)
         -- Final outcome of the call, filled once when the call ends
         var <- newEmptyTMVarIO
         -- Start executing the client in another thread
         _ <- async $ do
            v <- simplifyResponse $
                 buildGRpcReply3 <$>
                 rawStreamServer @_ @() @(GRpcOWTy p rref r)
                                 rpc client () ()
                                 (\_ _ newVal -> liftIO $ atomically $
                                   writeTMChan chan (unGRpcOWTy (Proxy @p) (Proxy @rref) newVal))
            -- Publish the outcome before closing, so that a consumer which
            -- sees the closed channel always finds the outcome available
            atomically $ putTMVar var v >> closeTMChan chan
         -- This conduit drains the channel and then reports a failure, if any
         let go = do sourceTMChan chan .| C.map GRpcOk
                     outcome <- liftIO $ atomically $ readTMVar var
                     case outcome of
                       GRpcOk _ -> pure ()
                       -- only re-types the failure: the function is never
                       -- applied, since no failure constructor holds a value
                       e -> yield $ (\_ -> error "this should never happen") <$> e
         pure go
-- | Single argument, no result: the argument is wrapped with
--   'buildGRpcIWTy' and sent in a unary call whose reply is @()@.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (v -> IO (GRpcReply ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      'RetNothing) handler where
  gRpcMethodCall rpc _ client x
    = simplifyResponse $
      buildGRpcReply1 <$>
      rawUnary @_ @(GRpcIWTy p vref v) @() rpc client
               (buildGRpcIWTy (Proxy @p) (Proxy @vref) x)
-- | Single argument, single result: the argument is wrapped with
--   'buildGRpcIWTy', sent in a unary call, and the reply is unwrapped with
--   'unGRpcOWTy'.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (GRpcReply r)) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client x
    = fmap (fmap (unGRpcOWTy (Proxy @p) (Proxy @rref))) $
      simplifyResponse $
      buildGRpcReply1 <$>
      rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
               rpc client (buildGRpcIWTy (Proxy @p) (Proxy @vref) x)
-- | Single argument, streamed result. The argument is wrapped with
--   'buildGRpcIWTy' and the call runs in a background thread; the returned
--   conduit yields every received message as 'GRpcOk', in arrival order,
--   and, if the call ended in failure, yields that failure as its last
--   element.
--
--   The final outcome is published exactly once and the channel is always
--   closed when the call ends, so the conduit terminates both for a
--   successful stream with no messages and for a failure that happens
--   after some messages were already delivered.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle aname aanns vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client x
    = do -- Channel carrying the decoded messages
         chan <- newTMChanIO :: IO (TMChan r)
         -- Final outcome of the call, filled once when the call ends
         var <- newEmptyTMVarIO
         -- Start executing the client in another thread
         _ <- async $ do
            v <- simplifyResponse $
                 buildGRpcReply3 <$>
                 rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 rpc client () (buildGRpcIWTy (Proxy @p) (Proxy @vref) x)
                                 (\_ _ newVal -> liftIO $ atomically $
                                   writeTMChan chan (unGRpcOWTy (Proxy @p) (Proxy @rref) newVal))
            -- Publish the outcome before closing, so that a consumer which
            -- sees the closed channel always finds the outcome available
            atomically $ putTMVar var v >> closeTMChan chan
         -- This conduit drains the channel and then reports a failure, if any
         let go = do sourceTMChan chan .| C.map GRpcOk
                     outcome <- liftIO $ atomically $ readTMVar var
                     case outcome of
                       GRpcOk _ -> pure ()
                       -- only re-types the failure: the function is never
                       -- applied, since no failure constructor holds a value
                       e -> yield $ (\_ -> error "this should never happen") <$> e
         pure go
-- | Client-streaming call for a method that returns nothing.
--
-- Given a 'CompressMode', the handler builds a sink conduit. Values pushed
-- into the sink are handed over a 'TMChan' to a background thread running
-- 'rawStreamClient'; the sink's final result is that thread's 'GRpcReply'.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply ()))) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aanns vref ]
                                      'RetNothing) handler where
  gRpcMethodCall rpc _ client compress = do
    -- Channel carrying outgoing messages from the conduit to the call thread.
    outbox <- newTMChanIO :: IO (TMChan v)
    let -- One step of the outgoing stream: block until the channel has a
        -- value or is closed, then either send the wrapped value or finish.
        nextMessage _ = do
          pending <- liftIO (atomically (readTMChan outbox))
          pure ( ()
               , maybe (Left StreamDone)
                       (\msg -> Right ( compress
                                      , buildGRpcIWTy (Proxy @p) (Proxy @vref) msg ))
                       pending )
    -- Run the call itself in another thread.
    worker <- async
            . simplifyResponse
            . fmap buildGRpcReply2
            $ rawStreamClient @_ @(GRpcIWTy p vref v) @() rpc client () nextMessage
    -- The returned conduit feeds the channel and finally waits for the reply.
    pure (conduitFromChannel outbox worker)
-- | Client-streaming call for a method that returns a single value.
--
-- Given a 'CompressMode', the handler builds a sink conduit. Values pushed
-- into the sink are handed over a 'TMChan' to a background thread running
-- 'rawStreamClient'; the sink's final result is the unwrapped reply computed
-- by that thread.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aanns vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client compress = do
    -- Channel carrying outgoing messages from the conduit to the call thread.
    outbox <- newTMChanIO :: IO (TMChan v)
    let -- One step of the outgoing stream: block until the channel has a
        -- value or is closed, then either send the wrapped value or finish.
        nextMessage _ = do
          pending <- liftIO (atomically (readTMChan outbox))
          pure ( ()
               , maybe (Left StreamDone)
                       (\msg -> Right ( compress
                                      , buildGRpcIWTy (Proxy @p) (Proxy @vref) msg ))
                       pending )
    -- Run the call in another thread and strip the protocol wrapper from
    -- whatever reply it produces.
    worker <- async $ do
      wrapped <- simplifyResponse
               . fmap buildGRpcReply2
               $ rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 rpc client () nextMessage
      pure (unGRpcOWTy (Proxy @p) (Proxy @rref) <$> wrapped)
    -- The returned conduit feeds the channel and finally waits for the reply.
    pure (conduitFromChannel outbox worker)
-- | Sink that forwards everything it receives into a channel.
--
-- Each awaited value is written to @chan@. Once upstream is exhausted the
-- channel is closed and the conduit blocks on @promise@, returning its result.
conduitFromChannel :: MonadIO m => TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel chan promise = do
  awaitForever (liftIO . atomically . writeTMChan chan)
  liftIO $ do
    atomically (closeTMChan chan)
    wait promise
-- | Bidirectional-streaming call.
--
-- Given a 'CompressMode', the handler builds a conduit that consumes the
-- values to send and yields the replies received. The call itself runs in a
-- background thread using 'rawGeneralStream'; the two sides communicate
-- through two 'TMChan's (one per direction) and a 'TMVar' holding the
-- start-up status.
--
-- The conduit first waits for the start-up status. On failure it yields that
-- single error and stops. Otherwise it forwards every upstream value to the
-- call thread; when upstream ends it closes the outgoing channel (so the call
-- thread sends 'Finalize') and then yields replies until the call thread
-- closes the incoming channel.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream aname aans vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client compress
    = do -- Replies from the server, already unwrapped, waiting to be yielded.
         inchan <- newTMChanIO :: IO (TMChan (GRpcReply r))
         -- Values from the upstream conduit, waiting to be sent.
         outchan <- newTMChanIO :: IO (TMChan v)
         -- Start-up status: 'GRpcOk' once an incoming event has been seen,
         -- or the failure if the call ended before any event arrived.
         var <- newEmptyTMVarIO
         -- Run the call in another thread.
         _ <- async $ do
            outcome <- simplifyResponse $
                 buildGRpcReply3 <$>
                 rawGeneralStream
                   @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   rpc client
                   () (\_ ievent -> do
                        -- Any incoming event means the call has started;
                        -- the put is a no-op whenever the variable is full.
                        _ <- liftIO $ atomically $ tryPutTMVar var (GRpcOk ())
                        case ievent of
                          RecvMessage o ->
                            liftIO $ atomically $ writeTMChan inchan
                              (GRpcOk $ unGRpcOWTy (Proxy @p) (Proxy @rref) o)
                          Invalid e ->
                            liftIO $ atomically $ writeTMChan inchan
                              (GRpcErrorString (show e))
                          _ -> pure () )
                   () (\_ -> do
                        -- Blocks until a value is available or the conduit
                        -- has closed the outgoing channel.
                        nextVal <- liftIO $ atomically $ readTMChan outchan
                        case nextVal of
                          Nothing  -> pure ((), Finalize)
                          Just msg -> pure ((), SendMessage compress
                                                  (buildGRpcIWTy (Proxy @p) (Proxy @vref) msg)))
            case outcome of
              GRpcOk () -> atomically $ closeTMChan inchan
              failure   -> atomically $ do
                -- If the conduit is still waiting for the start-up status,
                -- this hands it the failure directly. Never block here: the
                -- variable may already hold a 'GRpcOk' nobody will take.
                _ <- tryPutTMVar var failure
                -- If the conduit is already streaming, it only looks at the
                -- incoming channel, so report the failure there as well and
                -- close the channel so that the consumer terminates.
                writeTMChan inchan
                  ((\_ -> error "this should never happen") <$> failure)
                closeTMChan inchan
         let go, feed, drain :: ConduitT v (GRpcReply r) IO ()
             -- Wait for the start-up status before touching upstream.
             go = do started <- liftIO $ atomically $ takeTMVar var
                     case started of
                       GRpcOk _ -> feed
                       e        -> yield $ (\_ -> error "this should never happen") <$> e
             -- Forward upstream values to the call thread.
             feed = do nextOut <- await
                       case nextOut of
                         Just msg -> do liftIO $ atomically $ writeTMChan outchan msg
                                        feed
                         Nothing  -> do -- End of input: let the call thread finalize.
                                        liftIO $ atomically $ closeTMChan outchan
                                        drain
             -- Yield replies, blocking between them, until the call thread
             -- closes the incoming channel.
             drain = do nextIn <- liftIO $ atomically $ readTMChan inchan
                        case nextIn of
                          Nothing    -> pure ()
                          Just reply -> yield reply >> drain
         pure go