{-# language DataKinds #-} {-# language FlexibleContexts #-} {-# language FlexibleInstances #-} {-# language GADTs #-} {-# language MultiParamTypeClasses #-} {-# language PolyKinds #-} {-# language RankNTypes #-} {-# language ScopedTypeVariables #-} {-# language TypeApplications #-} {-# language TypeOperators #-} {-# language UndecidableInstances #-} {-| Description : Execute a Mu 'Server' using gRPC as transport layer This module allows you to serve a Mu 'Server' as a WAI 'Application' using gRPC as transport layer. The simplest way is to use 'runGRpcApp'; all other variants provide more control over the settings. -} module Mu.GRpc.Server ( -- * Run a 'Server' directly runGRpcApp, runGRpcAppTrans , runGRpcAppSettings, Settings , runGRpcAppTLS, TLSSettings -- * Convert a 'Server' into a WAI application , gRpcApp -- * Raise errors as exceptions in IO , raiseErrors, liftServerConduit ) where import Control.Concurrent.Async import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TMVar import Control.Exception import Control.Monad.Except import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as BS import Data.Conduit import Data.Conduit.TMChan import Data.Kind import Data.Proxy import Network.GRPC.HTTP2.Encoding (gzip, uncompressed) import Network.GRPC.HTTP2.Proto3Wire import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..)) import Network.GRPC.Server.Handlers import Network.GRPC.Server.Wai as Wai import Network.Wai (Application) import Network.Wai.Handler.Warp (Port, Settings, run, runSettings) import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS) import Mu.Adapter.ProtoBuf.Via import Mu.Rpc import Mu.Schema import Mu.Server -- | Run a Mu 'Server' on the given port. 
runGRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers ServerErrorIO methods handlers )
  => Port
  -> ServerT Maybe ('Service name anns methods) ServerErrorIO handlers
  -> IO ()
runGRpcApp port server = runGRpcAppTrans port id server

-- | Run a Mu 'Server' on the given port, using the
--   given function to turn the monad @m@ in which the
--   handlers are written into 'ServerErrorIO'.
runGRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers m methods handlers )
  => Port
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT Maybe ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppTrans port nat server
  = run port $ gRpcAppTrans nat server

-- | Run a Mu 'Server' using the given 'Settings'.
--
--   Go to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppSettings
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers m methods handlers )
  => Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT Maybe ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppSettings settings nat server
  = runSettings settings $ gRpcAppTrans nat server

-- | Run a Mu 'Server' using the given 'TLSSettings' and 'Settings'.
--
--   Go to 'Network.Wai.Handler.WarpTLS' to declare 'TLSSettings'
--   and to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppTLS
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers m methods handlers )
  => TLSSettings -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT Maybe ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppTLS tlsSettings settings nat server
  = runTLS tlsSettings settings $ gRpcAppTrans nat server

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers ServerErrorIO methods handlers )
  => ServerT Maybe ('Service name anns methods) ServerErrorIO handlers
  -> Application
gRpcApp server = gRpcAppTrans id server

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers m methods handlers )
  => (forall a. m a -> ServerErrorIO a)
  -> ServerT Maybe ('Service name anns methods) m handlers
  -> Application
gRpcAppTrans nat server
  = Wai.grpcApp [uncompressed, gzip] $ gRpcServiceHandlers nat server

-- Builds the list of gRPC handlers of a whole service: the package
-- and service names are read off the type-level description and
-- handed down to the per-method handlers.
gRpcServiceHandlers
  :: forall name anns methods handlers m.
     ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers m methods handlers )
  => (forall a. m a -> ServerErrorIO a)
  -> ServerT Maybe ('Service name anns methods) m handlers
  -> [ServiceHandler]
gRpcServiceHandlers nat (Server handlers)
  = let pkg = BS.pack $ nameVal (Proxy @(FindPackageName anns))
        srv = BS.pack $ nameVal (Proxy @name)
    in gRpcMethodHandlers nat pkg srv handlers

-- Walks the list of methods of a service together with the list of
-- their handlers. The two 'ByteString' arguments are the package
-- name and the service name, in that order.
class GRpcMethodHandlers (m :: Type -> Type) (ms :: [Method mnm]) (hs :: [Type]) where
  gRpcMethodHandlers
    :: (forall a. m a -> ServerErrorIO a)
    -> ByteString
    -> ByteString
    -> HandlersT Maybe ms m hs
    -> [ServiceHandler]

-- No methods left: no handlers.
instance GRpcMethodHandlers m '[] '[] where
  gRpcMethodHandlers _ _ _ H0 = []

-- One method plus the rest: build the handler of the first method
-- under its own 'RPC' name and recurse on the remaining ones.
instance (KnownName name, GRpcMethodHandler m args r h, GRpcMethodHandlers m rest hs)
         => GRpcMethodHandlers m ('Method name anns args r ': rest) (h ': hs) where
  gRpcMethodHandlers nat pkg srv (handler :<|>: others)
    = let rpc = RPC pkg srv (BS.pack (nameVal (Proxy @name)))
      in gRpcMethodHandler nat (Proxy @args) (Proxy @r) rpc handler
           : gRpcMethodHandlers nat pkg srv others

-- Turns the handler @h@ of a single method, with arguments @args@
-- and result @r@, into a gRPC 'ServiceHandler'.
class GRpcMethodHandler m args r h where
  gRpcMethodHandler
    :: (forall a. m a -> ServerErrorIO a)
    -> Proxy args -> Proxy r
    -> RPC -> h -> ServiceHandler

-- | Turns a 'Conduit' working on 'ServerErrorIO'
--   into any other base monad which supports 'IO',
--   by raising any error as an exception.
--
--   This function is useful to interoperate with
--   libraries which generate 'Conduit's with other
--   base monads, such as @persistent@.
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = transPipe raiseErrors

-- | Raises errors from 'ServerErrorIO' as exceptions
--   in a monad which supports 'IO'.
--
--   A 'ServerError' is turned into a 'GRPCStatus' with the
--   matching status code and raised via 'closeEarly'.
--   A 'GRPCStatus' exception is rethrown untouched; any other
--   exception is reported as an @INTERNAL@ status whose message
--   is the 'show'n exception.
--
--   This function is useful to interoperate with other
--   libraries which cannot handle the additional error
--   layer. In particular, with Conduit, as witnessed
--   by 'liftServerConduit'.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors h
  = liftIO $ runHandler `catches` exceptionHandlers
  where
    -- Run the handler and translate a 'Left' into a gRPC status.
    runHandler = do
      h' <- runExceptT h
      case h' of
        Right r -> return r
        Left (ServerError code msg)
          -> closeEarly $ GRPCStatus (serverErrorToGRpcError code) (BS.pack msg)
    -- Order matters: 'GRPCStatus' must be tried before 'SomeException',
    -- otherwise the status raised above would be re-wrapped as INTERNAL.
    exceptionHandlers =
      [ Handler (\(e :: GRPCStatus) -> throwIO e)
      , Handler (\(e :: SomeException) -> closeEarly $ GRPCStatus INTERNAL (BS.pack $ show e))
      ]
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError Unknown         = UNKNOWN
    serverErrorToGRpcError Unavailable     = UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = UNAUTHENTICATED
    serverErrorToGRpcError Internal        = INTERNAL
    serverErrorToGRpcError NotFound        = NOT_FOUND
    serverErrorToGRpcError Invalid         = INVALID_ARGUMENT

-- No arguments, no result: a unary call from @()@ to @()@.
instance GRpcMethodHandler m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler f _ _ rpc h
    = unary @_ @() @() rpc (\_ _ -> raiseErrors (f h))

-- No arguments, single result.
instance (ToProtoBufTypeRef rref r)
         => GRpcMethodHandler m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler f _ _ rpc h
    = unary @_ @() @(ViaToProtoBufTypeRef rref r)
            rpc (\_ _ -> ViaToProtoBufTypeRef <$> raiseErrors (f h))

-- No arguments, stream of results: the handler runs in its own
-- thread and hands every produced element over through a 'TMVar'.
instance (ToProtoBufTypeRef rref r, MonadIO m)
         => GRpcMethodHandler m '[ ] ('RetStream rref) (ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ rpc h
    = serverStream @_ @() @(ViaToProtoBufTypeRef rref r) rpc sstream
    where
      sstream :: req -> ()
              -> IO ((), ServerStream (ViaToProtoBufTypeRef rref r) ())
      sstream _ _ = do
        -- Variable to connect input and output
        var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
        -- Start executing the handler
        promise <- async (raiseErrors $ f (h (toTMVarConduit var)))
        -- Return the information
        return ((), ServerStream (\_ -> readNextOutput promise var))

-- Single argument, no result.
instance (FromProtoBufTypeRef vref v)
         => GRpcMethodHandler m '[ 'ArgSingle vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler f _ _ rpc h
    = unary @_ @(ViaFromProtoBufTypeRef vref v) @()
            rpc (\_ -> raiseErrors . f . h . unViaFromProtoBufTypeRef)

-- Single argument, single result.
instance (FromProtoBufTypeRef vref v, ToProtoBufTypeRef rref r)
         => GRpcMethodHandler m '[ 'ArgSingle vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler f _ _ rpc h
    = unary @_ @(ViaFromProtoBufTypeRef vref v) @(ViaToProtoBufTypeRef rref r)
            rpc (\_ -> (ViaToProtoBufTypeRef <$>) . raiseErrors . f . h . unViaFromProtoBufTypeRef)

-- Stream of arguments, single result: incoming messages are written
-- to a 'TMChan' which the handler, running in its own thread, reads
-- as a conduit source.
instance (FromProtoBufTypeRef vref v, ToProtoBufTypeRef rref r, MonadIO m)
         => GRpcMethodHandler m '[ 'ArgStream vref ] ('RetSingle rref) (ConduitT () v m () -> m r) where
  gRpcMethodHandler f _ _ rpc h
    = clientStream @_ @(ViaFromProtoBufTypeRef vref v) @(ViaToProtoBufTypeRef rref r)
                   rpc cstream
    where
      cstream :: req
              -> IO ((), ClientStream (ViaFromProtoBufTypeRef vref v)
                                      (ViaToProtoBufTypeRef rref r) ())
      cstream _ = do
        -- Create a new TMChan
        chan <- newTMChanIO :: IO (TMChan v)
        let producer = sourceTMChan @m chan
        -- Start executing the handler in another thread
        promise <- async (raiseErrors $ ViaToProtoBufTypeRef <$> f (h producer))
        -- Build the actual handler
        let cstreamHandler _ (ViaFromProtoBufTypeRef newInput)
              = atomically (writeTMChan chan newInput)
            -- Closing the channel ends the source seen by the handler;
            -- its result is the response of the call.
            cstreamFinalizer _
              = atomically (closeTMChan chan) >> wait promise
        -- Return the information
        return ((), ClientStream cstreamHandler cstreamFinalizer)

-- Single argument, stream of results.
instance (FromProtoBufTypeRef vref v, ToProtoBufTypeRef rref r, MonadIO m)
         => GRpcMethodHandler m '[ 'ArgSingle vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ rpc h
    = serverStream @_ @(ViaFromProtoBufTypeRef vref v) @(ViaToProtoBufTypeRef rref r)
                   rpc sstream
    where
      sstream :: req -> ViaFromProtoBufTypeRef vref v
              -> IO ((), ServerStream (ViaToProtoBufTypeRef rref r) ())
      sstream _ (ViaFromProtoBufTypeRef v) = do
        -- Variable to connect input and output
        var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
        -- Start executing the handler
        promise <- async (raiseErrors $ f (h v (toTMVarConduit var)))
        -- Return the information
        return ((), ServerStream (\_ -> readNextOutput promise var))

-- Stream of arguments, stream of results.
instance (FromProtoBufTypeRef vref v, ToProtoBufTypeRef rref r, MonadIO m)
         => GRpcMethodHandler m '[ 'ArgStream vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ rpc h
    = generalStream @_ @(ViaFromProtoBufTypeRef vref v) @(ViaToProtoBufTypeRef rref r)
                    rpc bdstream
    where
      bdstream :: req
               -> IO ( (), IncomingStream (ViaFromProtoBufTypeRef vref v) ()
                     , (), OutgoingStream (ViaToProtoBufTypeRef rref r) () )
      bdstream _ = do
        -- Create a new TMChan and a new variable
        chan <- newTMChanIO :: IO (TMChan v)
        let producer = sourceTMChan @m chan
        var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
        -- Start executing the handler
        promise <- async (raiseErrors $ f $ h producer (toTMVarConduit var))
        -- Build the actual handler
        let cstreamHandler _ (ViaFromProtoBufTypeRef newInput)
              = atomically (writeTMChan chan newInput)
            cstreamFinalizer _
              = atomically (closeTMChan chan) >> wait promise
            -- Block until there is something to send instead of
            -- polling the variable in a tight loop.
            readNext _ = readNextOutput promise var
        return ( (), IncomingStream cstreamHandler cstreamFinalizer
               , (), OutgoingStream readNext )

-- Waits (blocking, without spinning) for the next value published by
-- the handler in the given 'TMVar'. A 'Just' is wrapped and returned
-- as the next element of the outgoing stream; 'Nothing' marks the
-- end of the stream, in which case the handler thread is cancelled
-- before 'Nothing' is returned.
readNextOutput
  :: Async x -> TMVar (Maybe r)
  -> IO (Maybe ((), ViaToProtoBufTypeRef rref r))
readNextOutput promise var = do
  nextOutput <- atomically $ takeTMVar var
  case nextOutput of
    Just o  -> return $ Just ((), ViaToProtoBufTypeRef o)
    Nothing -> do
      cancel promise
      return Nothing

-- Sink which publishes every element it receives in the given
-- 'TMVar' as a 'Just', and the end of the upstream as 'Nothing'.
-- Each 'putTMVar' waits until the previous value has been taken.
-- The sink finishes once the terminating 'Nothing' has been
-- published, so the handler using it can return normally.
toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var = do
  x <- await
  liftIO $ atomically $ putTMVar var x
  case x of
    Nothing -> return ()
    Just _  -> toTMVarConduit var