{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
module OpenTracing.Reporting.Batch
( BatchOptions
, batchOptions
, boptAtCapacity
, boptBatchSize
, boptErrorLog
, boptQueueSize
, boptReporter
, boptTimeoutSec
, AtCapacity (..)
, defaultErrorLog
, BatchEnv
, newBatchEnv
, closeBatchEnv
, batchReporter
)
where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import Control.Monad.IO.Class
import Data.ByteString.Builder
import Data.Time (NominalDiffTime)
import Data.Word
import Numeric.Natural (Natural)
import OpenTracing.Span
import OpenTracing.Time
import System.IO (stderr)
import System.Timeout
data BatchOptions = BatchOptions
{ BatchOptions -> Word16
_boptBatchSize :: Word16
, BatchOptions -> Word
_boptTimeoutSec :: Word
, BatchOptions -> [FinishedSpan] -> IO ()
_boptReporter :: [FinishedSpan] -> IO ()
, BatchOptions -> Builder -> IO ()
_boptErrorLog :: Builder -> IO ()
, BatchOptions -> Natural
_boptQueueSize :: Natural
, BatchOptions -> AtCapacity
_boptAtCapacity :: AtCapacity
}
data AtCapacity = Drop | Block
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions :: ([FinishedSpan] -> IO ()) -> BatchOptions
batchOptions [FinishedSpan] -> IO ()
f = BatchOptions
{ _boptBatchSize :: Word16
_boptBatchSize = Word16
100
, _boptTimeoutSec :: Word
_boptTimeoutSec = Word
5
, _boptReporter :: [FinishedSpan] -> IO ()
_boptReporter = [FinishedSpan] -> IO ()
f
, _boptErrorLog :: Builder -> IO ()
_boptErrorLog = Builder -> IO ()
defaultErrorLog
, _boptQueueSize :: Natural
_boptQueueSize = Natural
1000
, _boptAtCapacity :: AtCapacity
_boptAtCapacity = AtCapacity
Drop
}
defaultErrorLog :: Builder -> IO ()
defaultErrorLog :: Builder -> IO ()
defaultErrorLog = Handle -> Builder -> IO ()
hPutBuilder Handle
stderr
makeLenses ''BatchOptions
data BatchEnv = BatchEnv
{ BatchEnv -> TBQueue FinishedSpan
envQ :: TBQueue FinishedSpan
, BatchEnv -> Async ()
envRep :: Async ()
, BatchEnv -> AtCapacity
envCap :: AtCapacity
, BatchEnv -> Builder -> IO ()
envLog :: Builder -> IO ()
}
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv :: BatchOptions -> IO BatchEnv
newBatchEnv BatchOptions
opt = do
TBQueue FinishedSpan
q <- forall a. Natural -> IO (TBQueue a)
newTBQueueIO (BatchOptions -> Natural
_boptQueueSize BatchOptions
opt)
Async ()
c <- BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer BatchOptions
opt TBQueue FinishedSpan
q
forall (f :: * -> *) a. Applicative f => a -> f a
pure BatchEnv
{ envQ :: TBQueue FinishedSpan
envQ = TBQueue FinishedSpan
q
, envRep :: Async ()
envRep = Async ()
c
, envCap :: AtCapacity
envCap = BatchOptions -> AtCapacity
_boptAtCapacity BatchOptions
opt
, envLog :: Builder -> IO ()
envLog = BatchOptions -> Builder -> IO ()
_boptErrorLog BatchOptions
opt
}
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv :: BatchEnv -> IO ()
closeBatchEnv = forall a. Async a -> IO ()
cancel forall b c a. (b -> c) -> (a -> b) -> a -> c
. BatchEnv -> Async ()
envRep
batchReporter :: MonadIO m => BatchEnv -> FinishedSpan -> m ()
batchReporter :: forall (m :: * -> *). MonadIO m => BatchEnv -> FinishedSpan -> m ()
batchReporter BatchEnv{envCap :: BatchEnv -> AtCapacity
envCap = AtCapacity
Block, TBQueue FinishedSpan
envQ :: TBQueue FinishedSpan
envQ :: BatchEnv -> TBQueue FinishedSpan
envQ, Builder -> IO ()
envLog :: Builder -> IO ()
envLog :: BatchEnv -> Builder -> IO ()
envLog} FinishedSpan
fspan = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Bool
full <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue FinishedSpan
envQ
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
full forall a b. (a -> b) -> a -> b
$
Builder -> IO ()
envLog Builder
"Queue at capacity, enqueueing span may block\n"
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue FinishedSpan
envQ FinishedSpan
fspan
batchReporter BatchEnv{envCap :: BatchEnv -> AtCapacity
envCap = AtCapacity
Drop, TBQueue FinishedSpan
envQ :: TBQueue FinishedSpan
envQ :: BatchEnv -> TBQueue FinishedSpan
envQ, Builder -> IO ()
envLog :: Builder -> IO ()
envLog :: BatchEnv -> Builder -> IO ()
envLog} FinishedSpan
fspan = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Bool
full <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
Bool
full <- forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue FinishedSpan
envQ
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
full forall a b. (a -> b) -> a -> b
$
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue FinishedSpan
envQ FinishedSpan
fspan
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
full
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
full forall a b. (a -> b) -> a -> b
$
Builder -> IO ()
envLog Builder
"Queue at capacity, span was dropped\n"
consumer :: BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer :: BatchOptions -> TBQueue FinishedSpan -> IO (Async ())
consumer opt :: BatchOptions
opt@BatchOptions{Natural
Word
Word16
AtCapacity
[FinishedSpan] -> IO ()
Builder -> IO ()
_boptAtCapacity :: AtCapacity
_boptQueueSize :: Natural
_boptErrorLog :: Builder -> IO ()
_boptReporter :: [FinishedSpan] -> IO ()
_boptTimeoutSec :: Word
_boptBatchSize :: Word16
_boptAtCapacity :: BatchOptions -> AtCapacity
_boptQueueSize :: BatchOptions -> Natural
_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptReporter :: BatchOptions -> [FinishedSpan] -> IO ()
_boptTimeoutSec :: BatchOptions -> Word
_boptBatchSize :: BatchOptions -> Word16
..} TBQueue FinishedSpan
q = forall a. IO a -> IO (Async a)
async forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
[FinishedSpan]
xs <- IO [FinishedSpan]
popBlocking
Bool -> [FinishedSpan] -> IO ()
go Bool
False [FinishedSpan]
xs
where
popBlocking :: IO [FinishedSpan]
popBlocking = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
FinishedSpan
x <- forall a. TBQueue a -> STM a
readTBQueue TBQueue FinishedSpan
q
(FinishedSpan
xforall a. a -> [a] -> [a]
:) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Word16 -> TBQueue a -> STM [a]
pop (Word16
_boptBatchSize forall a. Num a => a -> a -> a
- Word16
1) TBQueue FinishedSpan
q
popNonblock :: IO [FinishedSpan]
popNonblock = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Word16 -> TBQueue a -> STM [a]
pop Word16
_boptBatchSize TBQueue FinishedSpan
q
go :: Bool -> [FinishedSpan] -> IO ()
go Bool
_ [] = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
go Bool
True [FinishedSpan]
batch = [FinishedSpan] -> IO ()
report [FinishedSpan]
batch forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
drain
go Bool
False [FinishedSpan]
batch = forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync ([FinishedSpan] -> IO ()
report [FinishedSpan]
batch) forall a b. (a -> b) -> a -> b
$ \Async ()
a ->
forall a. Async a -> IO ()
timedWait Async ()
a forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catchAsync` \case
AsyncException
ThreadKilled -> do
BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterCancelled
forall a. Async a -> IO ()
timedWait Async ()
a forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` forall a. Async a -> IO ()
uninterruptibleCancel Async ()
a
IO ()
drain
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
ThreadKilled
AsyncException
e -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt (AsyncException -> Err
ErrReporterAsyncException AsyncException
e) forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
e
report :: [FinishedSpan] -> IO ()
report [FinishedSpan]
batch = [FinishedSpan] -> IO ()
_boptReporter [FinishedSpan]
batch forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
`catchAny`
(BatchOptions -> Err -> IO ()
logErr BatchOptions
opt forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> Err
ErrReporterException)
timedWait :: Async a -> IO ()
timedWait Async a
a = forall a. Int -> IO a -> IO (Maybe a)
timeout Int
timeoutMicros (forall a. Async a -> IO a
wait Async a
a) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe a
Nothing -> BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterTimeout
Maybe a
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
drain :: IO ()
drain = do
BatchOptions -> Err -> IO ()
logErr BatchOptions
opt Err
ErrReporterDraining
IO [FinishedSpan]
popNonblock forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> [FinishedSpan] -> IO ()
go Bool
True
timeoutMicros :: Int
timeoutMicros = forall a b. (AsMicros a, Integral b) => a -> b
micros @NominalDiffTime forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
_boptTimeoutSec
pop :: Word16 -> TBQueue a -> STM [a]
pop :: forall a. Word16 -> TBQueue a -> STM [a]
pop Word16
0 TBQueue a
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure []
pop Word16
n TBQueue a
q = do
Maybe a
v <- forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue a
q
case Maybe a
v of
Maybe a
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure []
Just a
v' -> (a
v' forall a. a -> [a] -> [a]
:) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Word16 -> TBQueue a -> STM [a]
pop (Word16
nforall a. Num a => a -> a -> a
-Word16
1) TBQueue a
q
data Err
= ErrReporterException SomeException
| ErrReporterTimeout
| ErrReporterCancelled
| ErrReporterAsyncException AsyncException
| ErrReporterDraining
logErr :: BatchOptions -> Err -> IO ()
logErr :: BatchOptions -> Err -> IO ()
logErr BatchOptions{_boptErrorLog :: BatchOptions -> Builder -> IO ()
_boptErrorLog=Builder -> IO ()
errlog} Err
e = Builder -> IO ()
errlog forall a b. (a -> b) -> a -> b
$ Err -> Builder
msg Err
e forall a. Semigroup a => a -> a -> a
<> Builder
nl
where
sbs :: ShortByteString -> Builder
sbs = ShortByteString -> Builder
shortByteString
ebs :: Exception e => e -> Builder
ebs :: forall e. Exception e => e -> Builder
ebs = String -> Builder
string8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Show a => a -> String
show
msg :: Err -> Builder
msg = \case
ErrReporterException SomeException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Reporter Error: " forall a. Semigroup a => a -> a -> a
<> forall e. Exception e => e -> Builder
ebs SomeException
ex
Err
ErrReporterTimeout -> ShortByteString -> Builder
sbs ShortByteString
"Reporter timed out!"
Err
ErrReporterCancelled -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter cancelled, shutting down gracefully"
ErrReporterAsyncException AsyncException
ex -> ShortByteString -> Builder
sbs ShortByteString
"Batch reporter received async exception: " forall a. Semigroup a => a -> a -> a
<> forall e. Exception e => e -> Builder
ebs AsyncException
ex
Err
ErrReporterDraining -> ShortByteString -> Builder
sbs ShortByteString
"Draining batch reporter queue"
nl :: Builder
nl = Char -> Builder
char8 Char
'\n'