{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GADTs #-}
module Network.GRPC.Client.Helpers where
import Control.Concurrent.Async (Async, async)
import Control.Concurrent (threadDelay)
import Control.Exception (throwIO)
import Control.Monad (forever)
import qualified Data.ByteString.Char8 as ByteString
import Data.ByteString.Char8 (ByteString)
import Data.Default.Class (def)
import Data.ProtoLens.Service.Types (Service(..), HasMethod, HasMethodImpl(..), StreamingType(..))
import qualified Network.TLS as TLS
import qualified Network.TLS.Extra.Cipher as TLS
import Network.HPACK (HeaderList)
import Network.HTTP2.Client (newHttp2FrameConnection, newHttp2Client, Http2Client(..), IncomingFlowControl(..), GoAwayHandler, FallBackFrameHandler, ignoreFallbackHandler, HostName, PortNumber, TooMuchConcurrency)
import Network.HTTP2.Client.Helpers (ping)
import Network.GRPC.Client (RPC, open, singleRequest, streamReply, streamRequest, Authority, Timeout(..), StreamDone, CompressMode, RawReply)
import Network.GRPC.HTTP2.Encoding (Compression, Encoding(..), Decoding(..), gzip)
data GrpcClient = GrpcClient {
_grpcClientHttp2Client :: Http2Client
, _grpcClientAuthority :: Authority
, _grpcClientHeaders :: [(ByteString, ByteString)]
, _grpcClientTimeout :: Timeout
, _grpcClientCompression :: Compression
, _grpcClientBackground :: BackgroundTasks
}
data BackgroundTasks = BackgroundTasks {
backgroundWindowUpdate :: Async ()
, backgroundPing :: Async ()
}
data GrpcClientConfig = GrpcClientConfig {
_grpcClientConfigHost :: !HostName
, _grpcClientConfigPort :: !PortNumber
, _grpcClientConfigHeaders :: ![(ByteString, ByteString)]
, _grpcClientConfigTimeout :: !Timeout
, _grpcClientConfigCompression :: !Compression
, _grpcClientConfigTLS :: !(Maybe TLS.ClientParams)
, _grpcClientConfigGoAwayHandler :: GoAwayHandler
, _grpcClientConfigFallbackHandler :: FallBackFrameHandler
, _grpcClientConfigWindowUpdateDelay :: Int
, _grpcClientConfigPingDelay :: Int
}
grpcClientConfigSimple :: HostName -> PortNumber -> UseTlsOrNot -> GrpcClientConfig
grpcClientConfigSimple host port tls =
GrpcClientConfig host port [] (Timeout 3000) gzip (tlsSettings tls host port) throwIO ignoreFallbackHandler 5000000 1000000
type UseTlsOrNot = Bool
tlsSettings :: UseTlsOrNot -> HostName -> PortNumber -> Maybe TLS.ClientParams
tlsSettings False _ _ = Nothing
tlsSettings True host port = Just $ TLS.ClientParams {
TLS.clientWantSessionResume = Nothing
, TLS.clientUseMaxFragmentLength = Nothing
, TLS.clientServerIdentification = (host, ByteString.pack $ show port)
, TLS.clientUseServerNameIndication = True
, TLS.clientShared = def
, TLS.clientHooks = def { TLS.onServerCertificate = \_ _ _ _ -> return []
}
, TLS.clientSupported = def { TLS.supportedCiphers = TLS.ciphersuite_default }
, TLS.clientDebug = def
}
setupGrpcClient :: GrpcClientConfig -> IO GrpcClient
setupGrpcClient config = do
let host = _grpcClientConfigHost config
let port = _grpcClientConfigPort config
let tls = _grpcClientConfigTLS config
let compression = _grpcClientConfigCompression config
let onGoAway = _grpcClientConfigGoAwayHandler config
let onFallback = _grpcClientConfigFallbackHandler config
let timeout = _grpcClientConfigTimeout config
let headers = _grpcClientConfigHeaders config
let authority = ByteString.pack $ host <> ":" <> show port
conn <- newHttp2FrameConnection host port tls
cli <- newHttp2Client conn 8192 8192 [] onGoAway onFallback
wuAsync <- async $ forever $ do
threadDelay $ _grpcClientConfigWindowUpdateDelay config
_updateWindow $ _incomingFlowControl cli
pingAsync <- async $ forever $ do
threadDelay $ _grpcClientConfigPingDelay config
ping cli 3000000 "grpc.hs"
let tasks = BackgroundTasks wuAsync pingAsync
return $ GrpcClient cli authority headers timeout compression tasks
rawUnary
:: (Service s, HasMethod s m)
=> RPC s m
-> GrpcClient
-> MethodInput s m
-> IO (Either TooMuchConcurrency (RawReply (MethodOutput s m)))
rawUnary rpc (GrpcClient client authority headers timeout compression _) input =
let call = singleRequest rpc input
in open client authority headers timeout (Encoding compression) (Decoding compression) call
rawStreamServer
:: (Service s, HasMethod s m, MethodStreamingType s m ~ 'ServerStreaming)
=> RPC s m
-> GrpcClient
-> a
-> MethodInput s m
-> (a -> HeaderList -> MethodOutput s m -> IO a)
-> IO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer rpc (GrpcClient client authority headers timeout compression _) v0 input handler =
let call = streamReply rpc v0 input handler
in open client authority headers timeout (Encoding compression) (Decoding compression) call
rawStreamClient
:: (Service s, HasMethod s m, MethodStreamingType s m ~ 'ClientStreaming)
=> RPC s m
-> GrpcClient
-> a
-> (a -> IO (a, Either StreamDone (CompressMode, MethodInput s m)))
-> IO (Either TooMuchConcurrency (a, (RawReply (MethodOutput s m))))
rawStreamClient rpc (GrpcClient client authority headers timeout compression _) v0 getNext =
let call = streamRequest rpc v0 getNext
in open client authority headers timeout (Encoding compression) (Decoding compression) call