{-|
Module: OpenTracing.Reporting.Batch

This module provides a trace reporter that groups recorded spans into batches
before sending them to their destination in bulk.

-}
{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE StrictData        #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeApplications  #-}

module OpenTracing.Reporting.Batch
    ( BatchOptions
    , batchOptions
    , boptBatchSize
    , boptTimeoutSec
    , boptReporter
    , boptErrorLog

    , defaultErrorLog

    , BatchEnv
    , newBatchEnv
    , closeBatchEnv

    , batchReporter
    )
where

import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception        (AsyncException (ThreadKilled))
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import Control.Monad.IO.Class
import Data.ByteString.Builder
import Data.Semigroup
import Data.Time                (NominalDiffTime)
import Data.Word
import OpenTracing.Span
import OpenTracing.Time
import System.IO                (stderr)
import System.Timeout

-- | Options available to construct a batch reporter. Default options are
-- available with `batchOptions`
data BatchOptions = BatchOptions
    { BatchOptions -> Word16
_boptBatchSize  :: Word16
    -- ^ The maximum number of elements to report in a batch. Default 100
    , BatchOptions -> Word
_boptTimeoutSec :: Word
    -- ^ The maximum time (in seconds) to wait while reporting a batch before erroring.
    -- Default 5 seconds.
    , BatchOptions -> [FinishedSpan] -> IO ()
_boptReporter   :: [FinishedSpan] -> IO ()
    -- ^ The function to call with the batch of spans. Has an upper bound on size equal
    -- to _boptBatchSize. No default.
    , BatchOptions -> Builder -> IO ()
_boptErrorLog   :: Builder        -> IO ()
    -- ^ What to do with errors. Default print to stderr.
    }

-- | Default batch options which can be overridden via lenses.
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions [FinishedSpan] -> IO ()
f = BatchOptions :: Word16
-> Word
-> ([FinishedSpan] -> IO ())
-> (Builder -> IO ())
-> BatchOptions
BatchOptions
    { _boptBatchSize :: Word16
_boptBatchSize  = Word16
100
    , _boptTimeoutSec :: Word
_boptTimeoutSec = Word
5
    , _boptReporter :: [FinishedSpan] -> IO ()
_boptReporter   = [FinishedSpan] -> IO ()
f
    , _boptErrorLog :: Builder -> IO ()
_boptErrorLog   = Builder -> IO ()
defaultErrorLog
    }

-- | An error logging function which prints to stderr.
defaultErrorLog :: Builder -> IO ()
defaultErrorLog :: Builder -> IO ()
defaultErrorLog = Handle -> Builder -> IO ()
hPutBuilder Handle
stderr

makeLenses ''BatchOptions


-- | The environment of a batch reporter.
data BatchEnv = BatchEnv
    { BatchEnv -> TQueue FinishedSpan
envQ   :: TQueue FinishedSpan
    -- ^ The queue of spans to be reported
    , BatchEnv -> Async ()
envRep :: Async ()
    -- ^ Asynchronous consumer of the queue
    }

-- | Create a new batch environment
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv BatchOptions
opt = do
    TQueue FinishedSpan
q <- IO (TQueue FinishedSpan)
forall a. IO (TQueue a)
newTQueueIO
    TQueue FinishedSpan -> Async () -> BatchEnv
BatchEnv TQueue FinishedSpan
q (Async () -> BatchEnv) -> IO (Async ()) -> IO BatchEnv
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BatchOptions -> TQueue FinishedSpan -> IO (Async ())
consumer BatchOptions
opt TQueue FinishedSpan
q

-- | Close a batch reporter, stop consuming any new spans. Any
-- spans in the queue will be drained.
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv = Async () -> IO ()
forall a. Async a -> IO ()
cancel (Async () -> IO ()) -> (BatchEnv -> Async ()) -> BatchEnv -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BatchEnv -> Async ()
envRep

-- | An implementation of `OpenTracing.Tracer.tracerReport` that batches the finished
-- spans for transimission to their destination.
batchReporter :: MonadIO m => BatchEnv -> FinishedSpan -> m ()
batchReporter :: BatchEnv -> FinishedSpan -> m ()
batchReporter BatchEnv{TQueue FinishedSpan
envQ :: TQueue FinishedSpan
envQ :: BatchEnv -> TQueue FinishedSpan
envQ} = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (FinishedSpan -> IO ()) -> FinishedSpan -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (FinishedSpan -> STM ()) -> FinishedSpan -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TQueue FinishedSpan -> FinishedSpan -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue FinishedSpan
envQ

consumer :: BatchOptions -> TQueue FinishedSpan -> IO (Async ())
consumer :: BatchOptions -> TQueue FinishedSpan -> IO (Async ())
consumer opt :: BatchOptions
opt@BatchOptions{Word
Word16
[FinishedSpan] -> IO ()
Builder -> IO ()
_boptErrorLog :: Builder -> IO ()
_boptReporter :: [FinishedSpan] -> IO ()
_boptTimeoutSec :: Word
_boptBatchSize :: Word16
_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptReporter :: BatchOptions -> [FinishedSpan] -> IO ()
_boptTimeoutSec :: BatchOptions -> Word
_boptBatchSize :: BatchOptions -> Word16
..} TQueue FinishedSpan
q = IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ()))
-> (IO () -> IO ()) -> IO () -> IO (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
    [FinishedSpan]
xs <- IO [FinishedSpan]
popBlocking
    Bool -> [FinishedSpan] -> IO ()
go Bool
False [FinishedSpan]
xs
  where
    popBlocking :: IO [FinishedSpan]
popBlocking = STM [FinishedSpan] -> IO [FinishedSpan]
forall a. STM a -> IO a
atomically (STM [FinishedSpan] -> IO [FinishedSpan])
-> STM [FinishedSpan] -> IO [FinishedSpan]
forall a b. (a -> b) -> a -> b
$ do
        FinishedSpan
x <- TQueue FinishedSpan -> STM FinishedSpan
forall a. TQueue a -> STM a
readTQueue TQueue FinishedSpan
q
        (FinishedSpan
xFinishedSpan -> [FinishedSpan] -> [FinishedSpan]
forall a. a -> [a] -> [a]
:) ([FinishedSpan] -> [FinishedSpan])
-> STM [FinishedSpan] -> STM [FinishedSpan]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Word16 -> TQueue FinishedSpan -> STM [FinishedSpan]
forall a. Word16 -> TQueue a -> STM [a]
pop (Word16
_boptBatchSize Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Word16
1) TQueue FinishedSpan
q

    popNonblock :: IO [FinishedSpan]
popNonblock = STM [FinishedSpan] -> IO [FinishedSpan]
forall a. STM a -> IO a
atomically (STM [FinishedSpan] -> IO [FinishedSpan])
-> STM [FinishedSpan] -> IO [FinishedSpan]
forall a b. (a -> b) -> a -> b
$ Word16 -> TQueue FinishedSpan -> STM [FinishedSpan]
forall a. Word16 -> TQueue a -> STM [a]
pop Word16
_boptBatchSize TQueue FinishedSpan
q

    go :: Bool -> [FinishedSpan] -> IO ()
go Bool
_     []    = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    go Bool
True  [FinishedSpan]
batch = [FinishedSpan] -> IO ()
report [FinishedSpan]
batch IO () -> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
drain
    go Bool
False [FinishedSpan]
batch = IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync ([FinishedSpan] -> IO ()
report [FinishedSpan]
batch) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
a ->
        Async () -> IO ()
forall a. Async a -> IO ()
timedWait Async ()
a IO () -> (AsyncException -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catchAsync` \case
            AsyncException
ThreadKilled -> do
                BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterCancelled
                Async () -> IO ()
forall a. Async a -> IO ()
timedWait Async ()
a IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` Async () -> IO ()
forall a. Async a -> IO ()
uninterruptibleCancel Async ()
a
                IO ()
drain
                AsyncException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
ThreadKilled

            AsyncException
e -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt (AsyncException -> Err
ErrReporterAsyncException AsyncException
e) IO () -> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> AsyncException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
e

    report :: [FinishedSpan] -> IO ()
report [FinishedSpan]
batch = [FinishedSpan] -> IO ()
_boptReporter [FinishedSpan]
batch IO () -> (SomeException -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
`catchAny`
        (BatchOptions -> Err -> IO ()
logErr BatchOptions
opt (Err -> IO ()) -> (SomeException -> Err) -> SomeException -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> Err
ErrReporterException)

    timedWait :: Async a -> IO ()
timedWait Async a
a = Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
timeoutMicros (Async a -> IO a
forall a. Async a -> IO a
wait Async a
a) IO (Maybe a) -> (Maybe a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe a
Nothing -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterTimeout
        Maybe a
_       -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

    drain :: IO ()
drain = do
        BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterDraining
        IO [FinishedSpan]
popNonblock IO [FinishedSpan] -> ([FinishedSpan] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> [FinishedSpan] -> IO ()
go Bool
True

    timeoutMicros :: Int
timeoutMicros = forall b.
(AsMicros NominalDiffTime, Integral b) =>
NominalDiffTime -> b
forall a b. (AsMicros a, Integral b) => a -> b
micros @NominalDiffTime (NominalDiffTime -> Int) -> NominalDiffTime -> Int
forall a b. (a -> b) -> a -> b
$ Word -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
_boptTimeoutSec


pop :: Word16 -> TQueue a -> STM [a]
pop :: Word16 -> TQueue a -> STM [a]
pop Word16
0 TQueue a
_ = [a] -> STM [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
pop Word16
n TQueue a
q = do
    Maybe a
v <- TQueue a -> STM (Maybe a)
forall a. TQueue a -> STM (Maybe a)
tryReadTQueue TQueue a
q
    case Maybe a
v of
        Maybe a
Nothing -> [a] -> STM [a]
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
        Just a
v' -> (a
v' a -> [a] -> [a]
forall a. a -> [a] -> [a]
:) ([a] -> [a]) -> STM [a] -> STM [a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Word16 -> TQueue a -> STM [a]
forall a. Word16 -> TQueue a -> STM [a]
pop (Word16
nWord16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
-Word16
1) TQueue a
q

data Err
    = ErrReporterException      SomeException
    | ErrReporterTimeout
    | ErrReporterCancelled
    | ErrReporterAsyncException AsyncException
    | ErrReporterDraining

logErr :: BatchOptions -> Err -> IO ()
logErr :: BatchOptions -> Err -> IO ()
logErr BatchOptions{_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptErrorLog=Builder -> IO ()
errlog} Err
e = Builder -> IO ()
errlog (Builder -> IO ()) -> Builder -> IO ()
forall a b. (a -> b) -> a -> b
$ Err -> Builder
msg Err
e Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> Builder
nl
  where
    sbs :: ShortByteString -> Builder
sbs = ShortByteString -> Builder
shortByteString

    ebs :: Exception e => e -> Builder
    ebs :: e -> Builder
ebs = String -> Builder
string8 (String -> Builder) -> (e -> String) -> e -> Builder
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> String
forall a. Show a => a -> String
show

    msg :: Err -> Builder
msg = \case
        ErrReporterException      SomeException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Reporter Error: " Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> SomeException -> Builder
forall e. Exception e => e -> Builder
ebs SomeException
ex
        Err
ErrReporterTimeout           -> ShortByteString -> Builder
sbs ShortByteString
"Reporter timed out!"
        Err
ErrReporterCancelled         -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter cancelled, shutting down gracefully"
        ErrReporterAsyncException AsyncException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter received async exception: " Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> AsyncException -> Builder
forall e. Exception e => e -> Builder
ebs AsyncException
ex
        Err
ErrReporterDraining          -> ShortByteString -> Builder
sbs ShortByteString
"Draining batch reporter queue"

    nl :: Builder
nl = Char -> Builder
char8 Char
'\n'