{-# LANGUAGE RecordWildCards #-}
module OpenTelemetry.Processor.Batch
( BatchTimeoutConfig(..)
, batchTimeoutConfig
, batchProcessor
) where
import Control.Monad.IO.Class
import OpenTelemetry.Processor
import OpenTelemetry.Exporter (Exporter)
import qualified OpenTelemetry.Exporter as Exporter
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
import Data.IORef (atomicModifyIORef', readIORef, newIORef)
import Control.Concurrent.Async
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad
import System.Timeout
import Control.Exception
import Data.HashMap.Strict (HashMap)
import OpenTelemetry.Trace.Core
import qualified Data.HashMap.Strict as HashMap
import Data.Vector (Vector)
-- | Tunable limits for the batching span processor built by 'batchProcessor'.
--
-- All durations are expressed in milliseconds; 'batchProcessor' converts them
-- to microseconds before handing them to 'timeout'.
data BatchTimeoutConfig = BatchTimeoutConfig
  { maxQueueSize :: Int
  -- ^ Bound passed to the in-memory span queue. Spans that do not fit are
  -- not queued.
  , scheduledDelayMillis :: Int
  -- ^ Longest time, in milliseconds, the background worker waits for a
  -- wake-up signal before it runs an export pass anyway.
  , exportTimeoutMillis :: Int
  -- ^ Time limit, in milliseconds, applied to the processor's shutdown
  -- procedure.
  , maxExportBatchSize :: Int
  -- ^ Intended size of a single export batch. See 'batchProcessor' for how
  -- (and whether) this value is applied.
  } deriving (Show)
-- | Default batching configuration: a queue bound of 1024 spans, a 5000 ms
-- scheduled delay, a 30000 ms export timeout and an export batch size of 512.
--
-- Use record update syntax to override individual fields, e.g.
-- @batchTimeoutConfig { scheduledDelayMillis = 1000 }@.
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig = BatchTimeoutConfig
  { maxQueueSize = 1024
  , scheduledDelayMillis = 5000
  , exportTimeoutMillis = 30000
  , maxExportBatchSize = 512
  }
-- | Size-limited accumulator of finished spans, grouped by the
-- 'InstrumentationLibrary' of the tracer that produced them.
--
-- Every field is strict. In particular 'itemMap' is strict so that a value
-- kept in an 'IORef' and updated once per span holds an evaluated map rather
-- than a growing chain of pending insertions.
data BoundedSpanMap = BoundedSpanMap
  { itemBounds :: !Int
  -- ^ Upper limit on 'itemCount'.
  , itemCount :: !Int
  -- ^ Number of spans currently held across all keys of 'itemMap'.
  , itemMap :: !(HashMap InstrumentationLibrary (Builder.Builder ImmutableSpan))
  -- ^ Queued spans per instrumentation library, kept as vector builders
  -- until an export turns them into vectors.
  }
-- | Create an empty 'BoundedSpanMap' with the given bound: the item count
-- starts at zero and no spans are queued.
boundedSpanMap :: Int -> BoundedSpanMap
boundedSpanMap bounds = BoundedSpanMap
  { itemBounds = bounds
  , itemCount = 0
  , itemMap = mempty
  }
-- | Try to add one span to the map.
--
-- Returns 'Nothing' when the map already holds 'itemBounds' spans, and
-- otherwise 'Just' the map with the span added under the
-- 'InstrumentationLibrary' of its tracer and 'itemCount' incremented.
--
-- The capacity check compares the /current/ count against the bound, so a map
-- created with bound @n@ accepts exactly @n@ spans (for @n >= 0@) before
-- rejecting further pushes.
pushSpan :: ImmutableSpan -> BoundedSpanMap -> Maybe BoundedSpanMap
pushSpan s m
  | itemCount m >= itemBounds m = Nothing
  -- Force the updated map before building the record so that repeated pushes
  -- do not pile up unevaluated insertions.
  | otherwise = updatedMap `seq` (Just $! m
      { itemCount = itemCount m + 1
      , itemMap = updatedMap
      })
  where
    -- 'HashMap.insertWith' combines as @new <> old@, so within one library's
    -- builder the most recently pushed span comes first.
    updatedMap =
      HashMap.insertWith
        (<>)
        (tracerName (spanTracer s))
        (Builder.singleton s)
        (itemMap m)
-- | Drain a 'BoundedSpanMap' for export.
--
-- Returns a pair of:
--
-- * the same map with 'itemCount' reset to zero and 'itemMap' emptied
--   ('itemBounds' is kept), and
-- * the previously queued spans, with each per-library builder materialised
--   into a 'Vector'.
--
-- The shape matches what 'atomicModifyIORef'' expects, so the drain and the
-- reset can happen as a single atomic step.
buildSpanExport :: BoundedSpanMap -> (BoundedSpanMap, HashMap InstrumentationLibrary (Vector ImmutableSpan))
buildSpanExport m = (drained, exported)
  where
    drained = m { itemCount = 0, itemMap = mempty }
    exported = fmap Builder.build (itemMap m)
-- | Create a span 'Processor' that queues ended spans in memory and exports
-- them in bulk from a background worker thread.
--
-- Behaviour:
--
-- * Ended spans are pushed into a queue bounded by 'maxQueueSize'. A span
--   that does not fit is not queued; the worker is woken so the queue gets
--   drained.
-- * The worker is also woken as soon as the number of queued spans reaches
--   'maxExportBatchSize'. Note that an export pass always sends everything
--   queued at that moment; it is not split into batches of that size.
-- * Without a wake-up, the worker runs an export pass every
--   'scheduledDelayMillis' milliseconds. The exporter is only invoked when
--   there is at least one queued span.
-- * Force-flush only signals the worker; it does not wait for the export.
-- * Shutdown cancels the worker and then exports whatever is still queued.
--   The whole procedure is limited to 'exportTimeoutMillis' milliseconds and
--   yields 'ShutdownFailure' if that limit is hit, 'ShutdownSuccess'
--   otherwise.
--
-- NOTE(review): an exception thrown by the exporter inside the worker loop
-- ends the loop; nothing restarts it. Confirm whether exporters are expected
-- to report failures through their result value only.
batchProcessor :: MonadIO m => BatchTimeoutConfig -> Exporter -> m Processor
batchProcessor BatchTimeoutConfig{..} exporter = liftIO $ do
  batch <- newIORef $ boundedSpanMap maxQueueSize
  workSignal <- newEmptyMVar

  -- Atomically take everything queued so far and pass it to the exporter.
  -- Skips the exporter call entirely when nothing is queued.
  let exportPending = do
        pending <- atomicModifyIORef' batch buildSpanExport
        unless (HashMap.null pending) $
          void $ Exporter.exporterExport exporter pending

  worker <- async $ forever $ do
    -- Sleep until signalled, or until the scheduled delay elapses.
    void $ timeout (millisToMicros scheduledDelayMillis) $ takeMVar workSignal
    exportPending

  pure $ Processor
    { processorOnStart = \_ _ -> pure ()
    , processorOnEnd = \s -> do
        span_ <- readIORef s
        wakeWorker <- atomicModifyIORef' batch $ \queued ->
          case pushSpan span_ queued of
            -- Queue is full: the span is not stored; ask for a drain.
            Nothing -> (queued, True)
            -- Stored: ask for an export once a full batch has accumulated.
            Just queued' -> (queued', itemCount queued' >= maxExportBatchSize)
        -- 'tryPutMVar' never blocks; a signal that is already pending is
        -- enough, so its result is ignored.
        when wakeWorker $ void $ tryPutMVar workSignal ()
    , processorForceFlush = void $ tryPutMVar workSignal ()
    , processorShutdown = async $ mask $ \_restore -> do
        shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $ do
          -- 'cancel' returns once the worker thread has terminated, so the
          -- final export below cannot race with a worker-driven one.
          cancel worker
          -- Export spans that were queued after the worker's last pass.
          exportPending
        pure $ case shutdownResult of
          Nothing -> ShutdownFailure
          Just () -> ShutdownSuccess
    }
  where
    -- 'timeout' takes microseconds; the configuration is in milliseconds.
    millisToMicros :: Int -> Int
    millisToMicros = (* 1000)