{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RankNTypes #-}

module Utxorpc.Logged
  ( UtxorpcClientLogger (..),
    RequestLogger,
    ReplyLogger,
    ServerStreamLogger,
    ServerStreamEndLogger,
    loggedSStream,
    loggedSStream',
    loggedUnary,
    loggedUnary',
    UnaryExecutor,
    ServerStreamExecutor,
  )
where

import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Char8 as BS
import Data.ProtoLens (Message)
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import Network.GRPC.Client (HeaderList, RawReply)
import Network.GRPC.Client.Helpers (GrpcClient (..), rawStreamServer, rawUnary)
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
import Network.GRPC.HTTP2.Types (IsRPC (..))
import Network.HTTP2.Client.Exceptions (ClientIO)
import Utxorpc.Types (ServerStreamReply, UnaryReply)
import "http2-client" Network.HTTP2.Client (ClientError, TooMuchConcurrency, runClientIO)

{--------------------------------------
  Types
--------------------------------------}

-- | Logging functions to log requests, replies, server stream messages, and server stream endings.
-- A UUID is generated for each request and passed downstream to the other logging functions.
data UtxorpcClientLogger m = UtxorpcClientLogger
  { -- | Log outgoing requests.
    forall (m :: * -> *). UtxorpcClientLogger m -> RequestLogger m
requestLogger :: RequestLogger m,
    -- | Log incoming unary replies.
    forall (m :: * -> *). UtxorpcClientLogger m -> ReplyLogger m
replyLogger :: ReplyLogger m,
    -- | Log incoming server stream messages.
    forall (m :: * -> *). UtxorpcClientLogger m -> ServerStreamLogger m
serverStreamLogger :: ServerStreamLogger m,
    -- | Log the end of a server stream.
    forall (m :: * -> *).
UtxorpcClientLogger m -> ServerStreamEndLogger m
serverStreamEndLogger :: ServerStreamEndLogger m,
    -- | Provided here as a convenience to allow logging functions to be written in any monadic stack
    -- without having to apply the unlift function to each logging function individually.
    forall (m :: * -> *).
UtxorpcClientLogger m -> forall x. m x -> IO x
unlift :: forall x. m x -> IO x
  }

-- | Log outgoing requests of all types (i.e., unary requests and server stream requests).
type RequestLogger m =
  forall i.
  (Show i, Message i) =>
  -- | The RPC path
  BS.ByteString ->
  -- | Included because it contains useful information such as the server address.
  GrpcClient ->
  -- | Generated for this request, and passed to other logging functions for other RPC events generated by this request.
  -- E.g., A unary request and its reply both have the same UUID.
  UUID ->
  -- | The request message being sent.
  i ->
  m ()

-- | Log unary replies.
type ReplyLogger m =
  forall o.
  (Show o, Message o) =>
  -- | The RPC path
  BS.ByteString ->
  -- | Included because it contains useful information such as the server address.
  GrpcClient ->
  -- | Generated for the request that this reply is associated with.
  UUID ->
  -- | Message received from the service (with headers) or an error.
  Either ClientError (Either TooMuchConcurrency (RawReply o)) ->
  m ()

-- | Log server stream messages.
type ServerStreamLogger m =
  forall o.
  (Show o, Message o) =>
  -- | The RPC path
  BS.ByteString ->
  -- | Included because it contains useful information such as the server address.
  GrpcClient ->
  -- | The UUID was generated for the request that caused this reply,
  -- the Int is the index of this message in the stream.
  (UUID, Int) ->
  -- | Message received from the service.
  o ->
  m ()

-- The GrcpClient is included because it contains useful information such as the server address.
-- The UUID was generated for the request that this reply is associated with.
type ServerStreamEndLogger m =
  -- | The RPC path
  BS.ByteString ->
  -- | Included because it contains useful information such as the server address.
  GrpcClient ->
  -- | The UUID was generated for the request that caused this reply,
  -- the Int is the total number of messages received in the stream.
  (UUID, Int) ->
  -- | Headers and Trailers.
  (HeaderList, HeaderList) ->
  m ()

-- | The type of http-client-grpc's `rawUnary`. Used internally and in tests.
type UnaryExecutor r i o =
  -- | The RPC to call.
  r ->
  -- | An initialized client.
  GrpcClient ->
  -- | The input.
  i ->
  ClientIO (Either TooMuchConcurrency (RawReply o))

-- | The type of http2-client-grpc's `rawStreamServer`. Used internally and in tests.
type ServerStreamExecutor r i o =
  forall a.
  -- | The RPC to call.
  r ->
  -- | An initialized client.
  GrpcClient ->
  -- | An initial state.
  a ->
  -- | The input of the stream request.
  i ->
  -- | A state-passing handler called for each server-sent output.
  -- Headers are repeated for convenience but are the same for every iteration.
  (a -> HeaderList -> o -> ClientIO a) ->
  ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))

{--------------------------------------
  Logged wrappers of gRPC library functions
--------------------------------------}

loggedUnary ::
  (GRPCInput r i, GRPCOutput r o, Show i, Message i, Show o, Message o) =>
  Maybe (UtxorpcClientLogger m) ->
  r ->
  GrpcClient ->
  i ->
  UnaryReply o
loggedUnary :: forall r i o (m :: * -> *).
(GRPCInput r i, GRPCOutput r o, Show i, Message i, Show o,
 Message o) =>
Maybe (UtxorpcClientLogger m)
-> r -> GrpcClient -> i -> UnaryReply o
loggedUnary = UnaryExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> i
-> UnaryReply o
forall r i o (m :: * -> *).
(GRPCInput r i, Show i, Message i, Show o, Message o) =>
UnaryExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> i
-> UnaryReply o
loggedUnary' UnaryExecutor r i o
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary

loggedUnary' ::
  (GRPCInput r i, Show i, Message i, Show o, Message o) =>
  UnaryExecutor r i o ->
  Maybe (UtxorpcClientLogger m) ->
  r ->
  GrpcClient ->
  i ->
  UnaryReply o
loggedUnary' :: forall r i o (m :: * -> *).
(GRPCInput r i, Show i, Message i, Show o, Message o) =>
UnaryExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> i
-> UnaryReply o
loggedUnary' UnaryExecutor r i o
sendUnary Maybe (UtxorpcClientLogger m)
logger r
rpc GrpcClient
client i
msg =
  UnaryReply o
-> (UtxorpcClientLogger m -> UnaryReply o)
-> Maybe (UtxorpcClientLogger m)
-> UnaryReply o
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ClientIO (Either TooMuchConcurrency (RawReply o)) -> UnaryReply o
forall a. ClientIO a -> IO (Either ClientError a)
runClientIO (ClientIO (Either TooMuchConcurrency (RawReply o)) -> UnaryReply o)
-> ClientIO (Either TooMuchConcurrency (RawReply o))
-> UnaryReply o
forall a b. (a -> b) -> a -> b
$ UnaryExecutor r i o
sendUnary r
rpc GrpcClient
client i
msg) UtxorpcClientLogger m -> UnaryReply o
forall {m :: * -> *}. UtxorpcClientLogger m -> UnaryReply o
logged Maybe (UtxorpcClientLogger m)
logger
  where
    logged :: UtxorpcClientLogger m -> UnaryReply o
logged UtxorpcClientLogger {RequestLogger m
requestLogger :: forall (m :: * -> *). UtxorpcClientLogger m -> RequestLogger m
requestLogger :: RequestLogger m
requestLogger, ReplyLogger m
replyLogger :: forall (m :: * -> *). UtxorpcClientLogger m -> ReplyLogger m
replyLogger :: ReplyLogger m
replyLogger, forall x. m x -> IO x
unlift :: forall (m :: * -> *).
UtxorpcClientLogger m -> forall x. m x -> IO x
unlift :: forall x. m x -> IO x
unlift} = do
      UUID
uuid <- IO UUID
nextRandom
      m () -> IO ()
forall x. m x -> IO x
unlift (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> GrpcClient -> UUID -> i -> m ()
RequestLogger m
requestLogger (r -> ByteString
forall t. IsRPC t => t -> ByteString
path r
rpc) GrpcClient
client UUID
uuid i
msg
      Either ClientError (Either TooMuchConcurrency (RawReply o))
o <- ClientIO (Either TooMuchConcurrency (RawReply o)) -> UnaryReply o
forall a. ClientIO a -> IO (Either ClientError a)
runClientIO (ClientIO (Either TooMuchConcurrency (RawReply o)) -> UnaryReply o)
-> ClientIO (Either TooMuchConcurrency (RawReply o))
-> UnaryReply o
forall a b. (a -> b) -> a -> b
$ UnaryExecutor r i o
sendUnary r
rpc GrpcClient
client i
msg
      m () -> IO ()
forall x. m x -> IO x
unlift (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
-> GrpcClient
-> UUID
-> Either ClientError (Either TooMuchConcurrency (RawReply o))
-> m ()
ReplyLogger m
replyLogger (r -> ByteString
forall t. IsRPC t => t -> ByteString
path r
rpc) GrpcClient
client UUID
uuid Either ClientError (Either TooMuchConcurrency (RawReply o))
o
      Either ClientError (Either TooMuchConcurrency (RawReply o))
-> UnaryReply o
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Either ClientError (Either TooMuchConcurrency (RawReply o))
o

-- The gRPC library requires a handler that produces a `ClientIO`,
-- but this does not make sense since a user-provided handler is not
-- likely to generate a `ClientError` or `TooMuchConcurrency`.
-- Instead, we accept a handler of type `IO` and lift it.
loggedSStream ::
  (GRPCOutput r o, GRPCInput r i, Show i, Message i, Show o, Message o) =>
  Maybe (UtxorpcClientLogger m) ->
  r ->
  GrpcClient ->
  a ->
  i ->
  (a -> HeaderList -> o -> IO a) ->
  ServerStreamReply a
loggedSStream :: forall r o i (m :: * -> *) a.
(GRPCOutput r o, GRPCInput r i, Show i, Message i, Show o,
 Message o) =>
Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> IO a)
-> ServerStreamReply a
loggedSStream = ServerStreamExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> IO a)
-> ServerStreamReply a
forall r o i (m :: * -> *) a.
(GRPCOutput r o, Show i, Message i, Show o, Message o) =>
ServerStreamExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> IO a)
-> ServerStreamReply a
loggedSStream' r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
ServerStreamExecutor r i o
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer

loggedSStream' ::
  (GRPCOutput r o, Show i, Message i, Show o, Message o) =>
  ServerStreamExecutor r i o ->
  Maybe (UtxorpcClientLogger m) ->
  r ->
  GrpcClient ->
  a ->
  i ->
  (a -> HeaderList -> o -> IO a) ->
  ServerStreamReply a
loggedSStream' :: forall r o i (m :: * -> *) a.
(GRPCOutput r o, Show i, Message i, Show o, Message o) =>
ServerStreamExecutor r i o
-> Maybe (UtxorpcClientLogger m)
-> r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> IO a)
-> ServerStreamReply a
loggedSStream' ServerStreamExecutor r i o
sendStreamReq Maybe (UtxorpcClientLogger m)
logger r
r GrpcClient
client a
initStreamState i
req a -> HeaderList -> o -> IO a
chunkHandler =
  ServerStreamReply a
-> (UtxorpcClientLogger m -> ServerStreamReply a)
-> Maybe (UtxorpcClientLogger m)
-> ServerStreamReply a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
    (ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
-> ServerStreamReply a
forall a. ClientIO a -> IO (Either ClientError a)
runClientIO (ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
 -> ServerStreamReply a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
-> ServerStreamReply a
forall a b. (a -> b) -> a -> b
$ r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
ServerStreamExecutor r i o
sendStreamReq r
r GrpcClient
client a
initStreamState i
req a -> HeaderList -> o -> ClientIO a
forall {m :: * -> *}. MonadIO m => a -> HeaderList -> o -> m a
liftedChunkHandler)
    UtxorpcClientLogger m -> ServerStreamReply a
forall {m :: * -> *}. UtxorpcClientLogger m -> ServerStreamReply a
logged
    Maybe (UtxorpcClientLogger m)
logger
  where
    liftedChunkHandler :: a -> HeaderList -> o -> m a
liftedChunkHandler a
streamState HeaderList
headerList o
reply = IO a -> m a
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ a -> HeaderList -> o -> IO a
chunkHandler a
streamState HeaderList
headerList o
reply

    logged :: UtxorpcClientLogger m -> ServerStreamReply a
logged
      UtxorpcClientLogger {RequestLogger m
requestLogger :: forall (m :: * -> *). UtxorpcClientLogger m -> RequestLogger m
requestLogger :: RequestLogger m
requestLogger, ServerStreamLogger m
serverStreamLogger :: forall (m :: * -> *). UtxorpcClientLogger m -> ServerStreamLogger m
serverStreamLogger :: ServerStreamLogger m
serverStreamLogger, ServerStreamEndLogger m
serverStreamEndLogger :: forall (m :: * -> *).
UtxorpcClientLogger m -> ServerStreamEndLogger m
serverStreamEndLogger :: ServerStreamEndLogger m
serverStreamEndLogger, forall x. m x -> IO x
unlift :: forall (m :: * -> *).
UtxorpcClientLogger m -> forall x. m x -> IO x
unlift :: forall x. m x -> IO x
unlift} = do
        UUID
uuid <- IO UUID
nextRandom
        m () -> IO ()
forall x. m x -> IO x
unlift (m () -> IO ()) -> m () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> GrpcClient -> UUID -> i -> m ()
RequestLogger m
requestLogger ByteString
rpcPath GrpcClient
client UUID
uuid i
req
        Either
  ClientError
  (Either
     TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
streamResult <- UUID
-> IO
     (Either
        ClientError
        (Either
           TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList)))
runLoggedStream UUID
uuid
        Either
  ClientError
  (Either
     TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
-> ServerStreamReply a
forall {m :: * -> *} {a} {a} {a}.
MonadIO m =>
Either a (Either a ((a, UUID, Int), HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
handleStreamResult Either
  ClientError
  (Either
     TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
streamResult
        where
          runLoggedStream :: UUID
-> IO
     (Either
        ClientError
        (Either
           TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList)))
runLoggedStream UUID
uuid =
            ClientIO
  (Either
     TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
-> IO
     (Either
        ClientError
        (Either
           TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList)))
forall a. ClientIO a -> IO (Either ClientError a)
runClientIO (ClientIO
   (Either
      TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
 -> IO
      (Either
         ClientError
         (Either
            TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))))
-> ClientIO
     (Either
        TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
-> IO
     (Either
        ClientError
        (Either
           TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList)))
forall a b. (a -> b) -> a -> b
$
              r
-> GrpcClient
-> (a, UUID, Int)
-> i
-> ((a, UUID, Int) -> HeaderList -> o -> ClientIO (a, UUID, Int))
-> ClientIO
     (Either
        TooMuchConcurrency ((a, UUID, Int), HeaderList, HeaderList))
ServerStreamExecutor r i o
sendStreamReq r
r GrpcClient
client (a
initStreamState, UUID
uuid, Int
0) i
req (a, UUID, Int) -> HeaderList -> o -> ClientIO (a, UUID, Int)
forall {m :: * -> *}.
MonadIO m =>
(a, UUID, Int) -> HeaderList -> o -> m (a, UUID, Int)
loggedChunkHandler

          loggedChunkHandler :: (a, UUID, Int) -> HeaderList -> o -> m (a, UUID, Int)
loggedChunkHandler (a
streamState, UUID
uuid, Int
index) HeaderList
headerList o
msg = do
            IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (m () -> IO ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> IO ()
forall x. m x -> IO x
unlift (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ ByteString -> GrpcClient -> (UUID, Int) -> o -> m ()
ServerStreamLogger m
serverStreamLogger ByteString
rpcPath GrpcClient
client (UUID
uuid, Int
index) o
msg
            a
a <- IO a -> m a
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ a -> HeaderList -> o -> IO a
chunkHandler a
streamState HeaderList
headerList o
msg
            (a, UUID, Int) -> m (a, UUID, Int)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, UUID
uuid, Int
index Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)

          handleStreamResult :: Either a (Either a ((a, UUID, Int), HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
handleStreamResult Either a (Either a ((a, UUID, Int), HeaderList, HeaderList))
streamResult =
            case Either a (Either a ((a, UUID, Int), HeaderList, HeaderList))
streamResult of
              Right (Right ((a
finalStreamState, UUID
uuid, Int
index), HeaderList
hl, HeaderList
hl')) -> do
                IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (m () -> IO ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> IO ()
forall x. m x -> IO x
unlift (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ ServerStreamEndLogger m
serverStreamEndLogger ByteString
rpcPath GrpcClient
client (UUID
uuid, Int
index) (HeaderList
hl, HeaderList
hl')
                Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either a (Either a (a, HeaderList, HeaderList))
 -> m (Either a (Either a (a, HeaderList, HeaderList))))
-> Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a b. (a -> b) -> a -> b
$ Either a (a, HeaderList, HeaderList)
-> Either a (Either a (a, HeaderList, HeaderList))
forall a b. b -> Either a b
Right (Either a (a, HeaderList, HeaderList)
 -> Either a (Either a (a, HeaderList, HeaderList)))
-> Either a (a, HeaderList, HeaderList)
-> Either a (Either a (a, HeaderList, HeaderList))
forall a b. (a -> b) -> a -> b
$ (a, HeaderList, HeaderList) -> Either a (a, HeaderList, HeaderList)
forall a b. b -> Either a b
Right (a
finalStreamState, HeaderList
hl, HeaderList
hl')
              Right (Left a
tmc) -> Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either a (Either a (a, HeaderList, HeaderList))
 -> m (Either a (Either a (a, HeaderList, HeaderList))))
-> Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a b. (a -> b) -> a -> b
$ Either a (a, HeaderList, HeaderList)
-> Either a (Either a (a, HeaderList, HeaderList))
forall a b. b -> Either a b
Right (Either a (a, HeaderList, HeaderList)
 -> Either a (Either a (a, HeaderList, HeaderList)))
-> Either a (a, HeaderList, HeaderList)
-> Either a (Either a (a, HeaderList, HeaderList))
forall a b. (a -> b) -> a -> b
$ a -> Either a (a, HeaderList, HeaderList)
forall a b. a -> Either a b
Left a
tmc
              Left a
errCode -> Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either a (Either a (a, HeaderList, HeaderList))
 -> m (Either a (Either a (a, HeaderList, HeaderList))))
-> Either a (Either a (a, HeaderList, HeaderList))
-> m (Either a (Either a (a, HeaderList, HeaderList)))
forall a b. (a -> b) -> a -> b
$ a -> Either a (Either a (a, HeaderList, HeaderList))
forall a b. a -> Either a b
Left a
errCode

          rpcPath :: ByteString
rpcPath = r -> ByteString
forall t. IsRPC t => t -> ByteString
path r
r