{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Instana.SDK.Internal.Worker
( spawnWorker
) where
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.STM as STM
import Control.Exception (SomeException,
catch)
import Control.Monad (forever,
when)
import qualified Data.Aeson as Aeson
import Data.Foldable (toList)
import Data.Sequence ((|>))
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types.Status as HttpTypes
import System.Log.Logger (debugM,
warningM)
import qualified System.Metrics as Metrics
import qualified Instana.SDK.Internal.AgentConnection.ConnectLoop as ConnectLoop
import Instana.SDK.Internal.AgentConnection.Paths (haskellEntityDataPathPrefix,
haskellTracePluginPath)
import Instana.SDK.Internal.Command (Command (..))
import qualified Instana.SDK.Internal.Config as InternalConfig
import Instana.SDK.Internal.Context (AgentConnection (..),
ConnectionState (..),
InternalContext)
import qualified Instana.SDK.Internal.Context as InternalContext
import qualified Instana.SDK.Internal.Id as Id
import Instana.SDK.Internal.Logging (instanaLogger)
import qualified Instana.SDK.Internal.Metrics.Collector as MetricsCollector
import qualified Instana.SDK.Internal.Metrics.Compression as MetricsCompression
import qualified Instana.SDK.Internal.Metrics.Deltas as Deltas
import qualified Instana.SDK.Internal.Metrics.Sample as Sample
import qualified Instana.SDK.Internal.URL as URL
import Instana.SDK.Internal.W3CTraceContext (InstanaKeyValuePair (..))
import qualified Instana.SDK.Internal.W3CTraceContext as W3CTraceContext
import Instana.SDK.Internal.WireSpan (QueuedSpan (QueuedSpan),
SpanKind (Entry, Exit),
WireSpan (WireSpan))
import qualified Instana.SDK.Internal.WireSpan as WireSpan
import Instana.SDK.Span.EntrySpan (EntrySpan (..))
import qualified Instana.SDK.Span.EntrySpan as EntrySpan
import Instana.SDK.Span.ExitSpan (ExitSpan (..))
import qualified Instana.SDK.Span.ExitSpan as ExitSpan
-- | Starts the SDK worker by forking its background threads.
--
-- After logging a debug message, one thread is forked with
-- 'Concurrent.forkIO' for each of the following actions, in this order, each
-- receiving @context@: 'ConnectLoop.initConnectLoop', 'initReadLoop',
-- 'initDrainSpanBufferAfterTimeoutLoop', 'collectAndSendMetricsLoop' and
-- 'resetPreviouslySendMetrics'. The thread ids are discarded.
spawnWorker :: InternalContext -> IO ()
spawnWorker context = do
  debugM instanaLogger "Spawning the Instana Haskell SDK worker"
  mapM_ forkWithContext backgroundLoops
  where
    -- Fork a single loop, handing it the shared context.
    forkWithContext loop = Concurrent.forkIO (loop context)

    -- The order of this list is the order in which the threads are forked.
    backgroundLoops =
      [ ConnectLoop.initConnectLoop
      , initReadLoop
      , initDrainSpanBufferAfterTimeoutLoop
      , collectAndSendMetricsLoop
      , resetPreviouslySendMetrics
      ]
-- | Runs 'readFromQueue' for the given context over and over again; this
-- action never returns normally.
initReadLoop :: InternalContext -> IO ()
initReadLoop = forever . readFromQueue
-- | Takes one command off the context's command queue (blocking until one is
-- available) and passes it to 'execute'.
--
-- Any exception raised while doing so is caught and logged as a warning
-- instead of being propagated to the caller.
readFromQueue :: InternalContext -> IO ()
readFromQueue context =
  processNextCommand `catch` logFailure
  where
    processNextCommand :: IO ()
    processNextCommand =
      STM.atomically (STM.readTQueue (InternalContext.commandQueue context))
        >>= \command -> execute command context

    logFailure :: SomeException -> IO ()
    logFailure = warningM instanaLogger . show
-- | Dispatches a command taken from the command queue: a completed entry span
-- goes to 'queueEntrySpan', a completed exit span goes to 'queueExitSpan'.
execute :: Command -> InternalContext -> IO ()
execute command =
  case command of
    CompleteEntry entrySpan -> queueEntrySpan entrySpan
    CompleteExit exitSpan -> queueExitSpan exitSpan
-- | Turns a completed entry span into a 'QueuedSpan' and passes it to
-- 'queueSpan'.
--
-- The duration is the current POSIX time in milliseconds minus the span's
-- timestamp. The @tpFlag@ and @synthetic@ fields are only set (to
-- @Just True@) when the corresponding flag on the entry span is 'True'. The
-- Instana ancestor is only set when the span's @tpFlag@ is 'True' and its W3C
-- trace context carries an Instana key-value pair in its trace state.
queueEntrySpan :: EntrySpan -> InternalContext -> IO ()
queueEntrySpan entrySpan context = do
  nowMillis <- fmap (round . (* 1000)) getPOSIXTime
  queueSpan context $
    QueuedSpan
      { WireSpan.traceId = EntrySpan.traceId entrySpan
      , WireSpan.spanId = EntrySpan.spanId entrySpan
      , WireSpan.parentId = EntrySpan.parentId entrySpan
      , WireSpan.spanName = EntrySpan.spanName entrySpan
      , WireSpan.timestamp = startedAt
      , WireSpan.duration = nowMillis - startedAt
      , WireSpan.kind = Entry
      , WireSpan.errorCount = EntrySpan.errorCount entrySpan
      , WireSpan.serviceName = EntrySpan.serviceName entrySpan
      , WireSpan.correlationType = EntrySpan.correlationType entrySpan
      , WireSpan.correlationId = EntrySpan.correlationId entrySpan
      , WireSpan.tpFlag = justTrueWhen (EntrySpan.tpFlag entrySpan)
      , WireSpan.instanaAncestor = ancestor
      , WireSpan.synthetic = justTrueWhen (EntrySpan.synthetic entrySpan)
      , WireSpan.spanData = EntrySpan.spanData entrySpan
      }
  where
    startedAt :: Int
    startedAt = EntrySpan.timestamp entrySpan

    -- A set flag becomes @Just True@, an unset flag becomes @Nothing@.
    justTrueWhen :: Bool -> Maybe Bool
    justTrueWhen flag
      | flag = Just True
      | otherwise = Nothing

    -- The Instana key-value pair from the W3C trace state, if there is one.
    instanaPair :: Maybe InstanaKeyValuePair
    instanaPair =
      EntrySpan.w3cTraceContext entrySpan
        >>= W3CTraceContext.instanaKeyValuePair . W3CTraceContext.traceState

    ancestor :: Maybe (String, String)
    ancestor
      | EntrySpan.tpFlag entrySpan
      , Just InstanaKeyValuePair { instanaTraceId, instanaParentId } <- instanaPair =
          Just
            ( Id.longOrShortTraceId instanaTraceId
            , Id.toString instanaParentId
            )
      | otherwise = Nothing
-- | Turns a completed exit span into a 'QueuedSpan' and passes it to
-- 'queueSpan'.
--
-- The trace id and the parent id are taken from the exit span's parent entry
-- span. The duration is the current POSIX time in milliseconds minus the exit
-- span's timestamp. Correlation data, @tpFlag@, Instana ancestor and
-- @synthetic@ are always left unset for exit spans.
queueExitSpan :: ExitSpan -> InternalContext -> IO ()
queueExitSpan exitSpan context = do
  nowMillis <- fmap (round . (* 1000)) getPOSIXTime
  queueSpan context $
    QueuedSpan
      { WireSpan.traceId = EntrySpan.traceId entry
      , WireSpan.spanId = ExitSpan.spanId exitSpan
      , WireSpan.parentId = Just (EntrySpan.spanId entry)
      , WireSpan.spanName = ExitSpan.spanName exitSpan
      , WireSpan.timestamp = startedAt
      , WireSpan.duration = nowMillis - startedAt
      , WireSpan.kind = Exit
      , WireSpan.errorCount = ExitSpan.errorCount exitSpan
      , WireSpan.serviceName = ExitSpan.serviceName exitSpan
      , WireSpan.correlationType = Nothing
      , WireSpan.correlationId = Nothing
      , WireSpan.tpFlag = Nothing
      , WireSpan.instanaAncestor = Nothing
      , WireSpan.synthetic = Nothing
      , WireSpan.spanData = ExitSpan.spanData exitSpan
      }
  where
    -- The entry span this exit span belongs to.
    entry :: EntrySpan
    entry = ExitSpan.parentSpan exitSpan

    startedAt :: Int
    startedAt = ExitSpan.timestamp exitSpan
-- | Appends a span to the context's span buffer, unless the buffer is full.
--
-- * If the buffer already holds @maxBufferedSpans@ (or more) spans, the span
--   is dropped and a debug message is logged.
-- * Otherwise the span is appended; if the buffer then holds at least
--   @forceTransmissionStartingAt@ spans, 'drainSpanBuffer' is triggered
--   right away.
--
-- The length check and the append happen in one STM transaction, so the
-- decision to keep or drop the span is always based on the buffer contents
-- the span is actually appended to, even when another thread drains the
-- buffer concurrently.
queueSpan :: InternalContext -> QueuedSpan -> IO ()
queueSpan context span_ = do
  bufferedBefore <- STM.atomically $ do
    buffered <- STM.readTVar spanQueue
    let bufferedCount = Seq.length buffered
    -- Force the new sequence before storing it so that no chain of pending
    -- appends builds up inside the TVar.
    when (bufferedCount < maxBufferedSpans) $
      STM.writeTVar spanQueue $! buffered |> span_
    return bufferedCount
  if bufferedBefore >= maxBufferedSpans
    then debugM instanaLogger "dropping span, buffer limit reached"
    else
      when (bufferedBefore + 1 >= forceTransmissionStartingAt) $
        drainSpanBuffer context
  where
    spanQueue = InternalContext.spanQueue context

    config = InternalContext.config context

    maxBufferedSpans :: Int
    maxBufferedSpans = InternalConfig.maxBufferedSpans config

    forceTransmissionStartingAt :: Int
    forceTransmissionStartingAt =
      InternalConfig.forceTransmissionStartingAt config
-- | Runs 'drainSpanBufferAfterTimeoutLoop' forever.
--
-- The pause between two rounds is the configured @forceTransmissionAfter@
-- value multiplied by 1000; the configuration value is treated as
-- milliseconds and converted to the microseconds the loop body expects.
initDrainSpanBufferAfterTimeoutLoop :: InternalContext -> IO ()
initDrainSpanBufferAfterTimeoutLoop context =
  forever (drainSpanBufferAfterTimeoutLoop pauseMicroSeconds context)
  where
    pauseMilliSeconds :: Int
    pauseMilliSeconds =
      InternalConfig.forceTransmissionAfter (InternalContext.config context)

    pauseMicroSeconds :: Int
    pauseMicroSeconds = pauseMilliSeconds * 1000
-- | One round of the timed drain loop: drains the span buffer, then sleeps
-- for @delayMicroSeconds@ microseconds.
--
-- An exception raised while draining is caught and logged as a warning; the
-- sleep happens in either case.
drainSpanBufferAfterTimeoutLoop :: Int -> InternalContext -> IO ()
drainSpanBufferAfterTimeoutLoop delayMicroSeconds context =
  (drainSpanBuffer context `catch` logFailure)
    >> Concurrent.threadDelay delayMicroSeconds
  where
    logFailure :: SomeException -> IO ()
    logFailure failure = warningM instanaLogger (show failure)
-- | Empties the span buffer and passes its former contents on for sending.
--
-- The buffer is atomically replaced with an empty sequence. If it held at
-- least one span, the spans are passed to 'sendSpansToAgent' through
-- 'InternalContext.whenConnected'. The buffer has already been emptied at
-- that point, regardless of what 'InternalContext.whenConnected' does with
-- the callback.
drainSpanBuffer :: InternalContext -> IO ()
drainSpanBuffer context = do
  drained <-
    STM.atomically
      (STM.swapTVar (InternalContext.spanQueue context) Seq.empty)
  if Seq.null drained
    then return ()
    else
      InternalContext.whenConnected
        context
        (sendSpansToAgent context (toList drained))
-- | Sends the given spans to the agent with an HTTP POST request.
--
-- Each queued span is wrapped in a 'WireSpan' together with the pid and the
-- agent UUID from the agent connection and the service name from the
-- configuration; the list is JSON-encoded and posted to the trace plugin
-- path, suffixed with @.@ and the pid. The metrics store argument is not
-- used.
--
-- An 'HTTP.HttpException' raised while sending is handled here: a response
-- with status 404 is logged as a warning and resets the connection state via
-- 'resetToUnconnected'; any other HTTP exception is only logged at debug
-- level. In both cases the spans are not re-queued. Exceptions of other
-- types, as well as exceptions thrown by 'HTTP.parseUrlThrow' while building
-- the request, propagate to the caller.
sendSpansToAgent ::
  InternalContext
  -> [QueuedSpan]
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
sendSpansToAgent context spans agentConnection _ = do
  baseRequest <- HTTP.parseUrlThrow traceEndpointUrl
  let
    request =
      baseRequest
        { HTTP.method = "POST"
        , HTTP.requestBody =
            HTTP.RequestBodyLBS (Aeson.encode (map toWireSpan spans))
        , HTTP.requestHeaders =
            [ ("Accept", "application/json")
              -- The charset parameter must not carry a trailing quote
              -- character, otherwise the header value is malformed.
            , ("Content-Type", "application/json; charset=UTF-8")
            ]
        }
  catch
    (HTTP.httpLbs request (InternalContext.httpManager context) >> return ())
    handleHttpException
  where
    translatedPidStr :: String
    translatedPidStr = InternalContext.pid agentConnection

    traceEndpointUrl :: String
    traceEndpointUrl =
      show
        ( URL.mkHttp
            (InternalContext.agentHost agentConnection)
            (InternalContext.agentPort agentConnection)
            haskellTracePluginPath
        )
        ++ "."
        ++ translatedPidStr

    configuredServiceName :: Maybe T.Text
    configuredServiceName =
      T.pack <$> InternalConfig.serviceName (InternalContext.config context)

    -- Attach the connection-wide attributes to a single queued span.
    toWireSpan :: QueuedSpan -> WireSpan
    toWireSpan queuedSpan =
      WireSpan
        { WireSpan.queuedSpan = queuedSpan
        , WireSpan.pid = translatedPidStr
        , WireSpan.agentUuid = InternalContext.agentUuid agentConnection
        , WireSpan.serviceNameConfig = configuredServiceName
        }

    -- Status code carried by a status code exception, 0 for anything else.
    statusCodeOf :: HTTP.HttpException -> Int
    statusCodeOf
      (HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _)) =
        HttpTypes.statusCode (HTTP.responseStatus response)
    statusCodeOf _ = 0

    handleHttpException :: HTTP.HttpException -> IO ()
    handleHttpException e
      | statusCodeOf e == 404 = do
          warningM instanaLogger $
            "Received HTTP 404 when sending spans to "
              ++ show traceEndpointUrl
              ++ ", resetting connection state to unconnected."
          resetToUnconnected context
      | otherwise =
          debugM instanaLogger (show e)
-- | Metrics loop: waits 500 milliseconds once, then runs
-- 'collectAndSendMetricsSafe' forever with a delay of one second (passed in
-- microseconds) per round.
collectAndSendMetricsLoop :: InternalContext -> IO ()
collectAndSendMetricsLoop context =
  Concurrent.threadDelay initialOffsetMicroSeconds
    >> forever (collectAndSendMetricsSafe intervalMicroSeconds context)
  where
    -- One-time wait before the first round.
    initialOffsetMicroSeconds :: Int
    initialOffsetMicroSeconds = 500 * 1000

    -- Delay handed to every round.
    intervalMicroSeconds :: Int
    intervalMicroSeconds = 1000 * 1000
-- | Every five minutes, applies 'Sample.markForReset' to the previously sent
-- metrics sample stored in the context. This loop never returns normally.
--
-- The update uses the strict 'STM.modifyTVar'' so that 'Sample.markForReset'
-- is evaluated (to weak head normal form) inside the transaction. An
-- exception raised by that evaluation is therefore caught and logged as a
-- warning by the surrounding handler, rather than surfacing later in
-- whichever thread first reads the value.
resetPreviouslySendMetrics :: InternalContext -> IO ()
resetPreviouslySendMetrics context =
  forever $ do
    markPreviousSampleForReset `catch` logFailure
    Concurrent.threadDelay delayMicroSeconds
  where
    -- Five minutes, expressed in microseconds.
    delayMicroSeconds :: Int
    delayMicroSeconds = 5 * 60 * 1000 * 1000

    markPreviousSampleForReset :: IO ()
    markPreviousSampleForReset =
      STM.atomically $
        STM.modifyTVar'
          (InternalContext.previousMetricsSample context)
          Sample.markForReset

    logFailure :: SomeException -> IO ()
    logFailure e = warningM instanaLogger (show e)
-- | One round of the metrics loop: runs
-- 'collectAndSendMetricsWhenConnected', then sleeps for @delayMicroSeconds@
-- microseconds.
--
-- An exception raised while collecting or sending is caught and logged as a
-- warning; the sleep happens in either case.
collectAndSendMetricsSafe :: Int -> InternalContext -> IO ()
collectAndSendMetricsSafe delayMicroSeconds context =
  (collectAndSendMetricsWhenConnected context `catch` logFailure)
    >> Concurrent.threadDelay delayMicroSeconds
  where
    logFailure :: SomeException -> IO ()
    logFailure failure = warningM instanaLogger (show failure)
-- | Passes 'collectAndSendMetrics' (partially applied to the context) to
-- 'InternalContext.whenConnected', which supplies the agent connection and
-- the metrics store.
collectAndSendMetricsWhenConnected :: InternalContext -> IO ()
collectAndSendMetricsWhenConnected context =
  InternalContext.whenConnected context sendWithConnection
  where
    sendWithConnection agentConnection metricsStore =
      collectAndSendMetrics context agentConnection metricsStore
-- | Samples all metrics from the store and sends them to the agent with an
-- HTTP POST request.
--
-- The fresh sample is timestamped with the current POSIX time in
-- milliseconds, enriched with deltas relative to the previously sent sample
-- ('Deltas.enrichWithDeltas') and compressed against that previous sample
-- ('MetricsCompression.compressSample') before being JSON-encoded. The
-- target URL is the entity data path prefix followed by the pid from the
-- agent connection.
--
-- Only when the request succeeds is the enriched sample stored as the new
-- previous sample. An 'HTTP.HttpException' raised while sending is handled
-- here: a response with status 404 is logged as a warning and resets the
-- connection state via 'resetToUnconnected'; any other HTTP exception is
-- only logged at debug level. Exceptions of other types, as well as
-- exceptions thrown by 'HTTP.parseUrlThrow' while building the request,
-- propagate to the caller.
collectAndSendMetrics ::
  InternalContext
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
collectAndSendMetrics context agentConnection metricsStore = do
  previousSample <- STM.readTVarIO previousSampleVar
  now <- round . (* 1000) <$> getPOSIXTime
  sampledMetrics <- MetricsCollector.sampleAll metricsStore
  let
    currentSample = Sample.timedSampleFromEkgSample sampledMetrics now
    enrichedSample = Deltas.enrichWithDeltas previousSample currentSample
    compressedMetrics =
      MetricsCompression.compressSample
        (Sample.sample previousSample)
        (Sample.sample enrichedSample)
  baseRequest <- HTTP.parseUrlThrow (show metricsEndpointUrl)
  let
    request =
      baseRequest
        { HTTP.method = "POST"
        , HTTP.requestBody =
            HTTP.RequestBodyLBS $
              Aeson.encode (Sample.SampleJson compressedMetrics)
        , HTTP.requestHeaders =
            [ ("Accept", "application/json")
              -- The charset parameter must not carry a trailing quote
              -- character, otherwise the header value is malformed.
            , ("Content-Type", "application/json; charset=UTF-8")
            ]
        }
  catch
    ( do
        _ <- HTTP.httpLbs request (InternalContext.httpManager context)
        -- Remember what has been sent so that the next round computes its
        -- deltas and compression against it.
        STM.atomically (STM.writeTVar previousSampleVar enrichedSample)
    )
    handleHttpException
  where
    previousSampleVar = InternalContext.previousMetricsSample context

    metricsEndpointUrl =
      URL.mkHttp
        (InternalContext.agentHost agentConnection)
        (InternalContext.agentPort agentConnection)
        (haskellEntityDataPathPrefix ++ InternalContext.pid agentConnection)

    -- Status code carried by a status code exception, 0 for anything else.
    statusCodeOf :: HTTP.HttpException -> Int
    statusCodeOf
      (HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _)) =
        HttpTypes.statusCode (HTTP.responseStatus response)
    statusCodeOf _ = 0

    handleHttpException :: HTTP.HttpException -> IO ()
    handleHttpException e
      | statusCodeOf e == 404 = do
          warningM instanaLogger $
            "Received HTTP 404 when sending metrics to "
              ++ show metricsEndpointUrl
              ++ ", resetting connection state to unconnected."
          resetToUnconnected context
      | otherwise =
          debugM instanaLogger (show e)
-- | Atomically sets the connection state to 'Unconnected' if it currently is
-- 'AgentReady'; every other state is left as it is.
resetToUnconnected :: InternalContext -> IO ()
resetToUnconnected context =
  STM.atomically
    (STM.modifyTVar' (InternalContext.connectionState context) dropReadyState)
  where
    dropReadyState :: ConnectionState -> ConnectionState
    dropReadyState (AgentReady _) = Unconnected
    dropReadyState other = other