{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Instana.SDK.Internal.Worker
( spawnWorker
) where
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.STM as STM
import Control.Exception (SomeException,
catch)
import Control.Monad (forever,
when)
import qualified Data.Aeson as Aeson
import Data.Foldable (toList)
import Data.List (map)
import Data.Sequence ((|>))
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types.Status as HttpTypes
import System.Log.Logger (debugM,
warningM)
import qualified System.Metrics as Metrics
import qualified Instana.SDK.Internal.AgentConnection.ConnectLoop as ConnectLoop
import Instana.SDK.Internal.AgentConnection.Paths (haskellEntityDataPathPrefix,
haskellTracePluginPath)
import Instana.SDK.Internal.Command (Command (..))
import qualified Instana.SDK.Internal.Config as InternalConfig
import Instana.SDK.Internal.Context (AgentConnection (..),
ConnectionState (..),
InternalContext)
import qualified Instana.SDK.Internal.Context as InternalContext
import Instana.SDK.Internal.Logging (instanaLogger)
import qualified Instana.SDK.Internal.Metrics.Collector as MetricsCollector
import qualified Instana.SDK.Internal.Metrics.Compression as MetricsCompression
import qualified Instana.SDK.Internal.Metrics.Deltas as Deltas
import qualified Instana.SDK.Internal.Metrics.Sample as Sample
import qualified Instana.SDK.Internal.URL as URL
import Instana.SDK.Internal.WireSpan (QueuedSpan (QueuedSpan),
SpanKind (Entry, Exit),
WireSpan (WireSpan))
import qualified Instana.SDK.Internal.WireSpan as WireSpan
import Instana.SDK.Span.EntrySpan (EntrySpan (..))
import qualified Instana.SDK.Span.EntrySpan as EntrySpan
import Instana.SDK.Span.ExitSpan (ExitSpan (..))
import qualified Instana.SDK.Span.ExitSpan as ExitSpan
-- | Starts the SDK worker by forking one background thread per loop: the
-- agent connect loop, the command queue reader, the timed span buffer drain,
-- the metrics collection loop and the periodic metrics reset. Returns as soon
-- as all threads have been forked; the thread ids are discarded.
spawnWorker :: InternalContext -> IO ()
spawnWorker context = do
  debugM instanaLogger "Spawning the Instana Haskell SDK worker"
  -- The loops are forked in list order.
  mapM_
    (\loop -> Concurrent.forkIO $ loop context)
    [ ConnectLoop.initConnectLoop
    , initReadLoop
    , initDrainSpanBufferAfterTimeoutLoop
    , collectAndSendMetricsLoop
    , resetPreviouslySendMetrics
    ]
-- | Processes commands from the command queue forever, one at a time.
initReadLoop :: InternalContext -> IO ()
initReadLoop =
  forever . readFromQueue
-- | Blocks until a command is available on the context's command queue, then
-- executes it. Any exception raised while reading or executing is logged as a
-- warning and not propagated.
readFromQueue :: InternalContext -> IO ()
readFromQueue context =
  processNextCommand `catch` logFailure
  where
    processNextCommand :: IO ()
    processNextCommand = do
      command <-
        STM.atomically . STM.readTQueue $ InternalContext.commandQueue context
      execute command context

    logFailure :: SomeException -> IO ()
    logFailure = warningM instanaLogger . show
-- | Dispatches a command to its handler: completed entry spans and completed
-- exit spans are both converted and queued for transmission.
execute :: Command -> InternalContext -> IO ()
execute command context =
  case command of
    CompleteEntry entrySpan ->
      queueEntrySpan entrySpan context
    CompleteExit exitSpan ->
      queueExitSpan exitSpan context
-- | Converts a completed entry span into a 'QueuedSpan' and hands it to
-- 'queueSpan'. The duration is the current POSIX time in milliseconds minus
-- the span's start timestamp.
queueEntrySpan :: EntrySpan -> InternalContext -> IO ()
queueEntrySpan entrySpan context = do
  posixSeconds <- getPOSIXTime
  let
    finishedAt = round (posixSeconds * 1000)
    startedAt = EntrySpan.timestamp entrySpan
  queueSpan context $
    QueuedSpan
      { WireSpan.traceId     = EntrySpan.traceId entrySpan
      , WireSpan.spanId      = EntrySpan.spanId entrySpan
      , WireSpan.parentId    = EntrySpan.parentId entrySpan
      , WireSpan.spanName    = EntrySpan.spanName entrySpan
      , WireSpan.timestamp   = startedAt
      , WireSpan.duration    = finishedAt - startedAt
      , WireSpan.kind        = Entry
      , WireSpan.errorCount  = EntrySpan.errorCount entrySpan
      , WireSpan.serviceName = EntrySpan.serviceName entrySpan
      , WireSpan.spanData    = EntrySpan.spanData entrySpan
      }
-- | Converts a completed exit span into a 'QueuedSpan' and hands it to
-- 'queueSpan'. The trace id and the parent id are taken from the exit span's
-- parent entry span; the duration is the current POSIX time in milliseconds
-- minus the exit span's start timestamp.
queueExitSpan :: ExitSpan -> InternalContext -> IO ()
queueExitSpan exitSpan context = do
  posixSeconds <- getPOSIXTime
  let
    parent = ExitSpan.parentSpan exitSpan
    startedAt = ExitSpan.timestamp exitSpan
    finishedAt = round (posixSeconds * 1000)
  queueSpan context $
    QueuedSpan
      { WireSpan.traceId     = EntrySpan.traceId parent
      , WireSpan.spanId      = ExitSpan.spanId exitSpan
      , WireSpan.parentId    = Just $ EntrySpan.spanId parent
      , WireSpan.spanName    = ExitSpan.spanName exitSpan
      , WireSpan.timestamp   = startedAt
      , WireSpan.duration    = finishedAt - startedAt
      , WireSpan.kind        = Exit
      , WireSpan.errorCount  = ExitSpan.errorCount exitSpan
      , WireSpan.serviceName = ExitSpan.serviceName exitSpan
      , WireSpan.spanData    = ExitSpan.spanData exitSpan
      }
-- | Appends a span to the context's span buffer, unless the buffer already
-- holds @maxBufferedSpans@ spans, in which case the span is dropped and a
-- debug message is logged. When the buffer holds at least
-- @forceTransmissionStartingAt@ spans after the insert, the buffer is drained
-- immediately.
--
-- The limit check and the append run in a single STM transaction, so a drain
-- happening concurrently on another thread cannot make the check operate on a
-- stale buffer length.
queueSpan :: InternalContext -> QueuedSpan -> IO ()
queueSpan context span_ = do
  let
    config = InternalContext.config context
    maxBufferedSpans = InternalConfig.maxBufferedSpans config
    forceTransmissionStartingAt =
      InternalConfig.forceTransmissionStartingAt config
    spanQueue = InternalContext.spanQueue context
  -- Nothing: the span was dropped. Just n: the span was appended and the
  -- buffer now holds n spans.
  bufferedAfterInsert <- STM.atomically $ do
    currentSpanQueue <- STM.readTVar spanQueue
    if Seq.length currentSpanQueue >= maxBufferedSpans
      then
        return Nothing
      else do
        let
          updatedSpanQueue = currentSpanQueue |> span_
        -- Strict write so no thunk chain accumulates inside the TVar.
        STM.writeTVar spanQueue $! updatedSpanQueue
        return $ Just (Seq.length updatedSpanQueue)
  case bufferedAfterInsert of
    Nothing ->
      debugM instanaLogger "dropping span, buffer limit reached"
    Just bufferedSpans ->
      when
        (bufferedSpans >= forceTransmissionStartingAt)
        (drainSpanBuffer context)
-- | Runs the timed span buffer drain forever. The pause between two drains is
-- the configured @forceTransmissionAfter@ value, which is multiplied by 1000
-- to convert it from milliseconds to the microseconds 'threadDelay' expects.
initDrainSpanBufferAfterTimeoutLoop :: InternalContext -> IO ()
initDrainSpanBufferAfterTimeoutLoop context =
  forever $ drainSpanBufferAfterTimeoutLoop delayMicroSeconds context
  where
    delayMicroSeconds =
      1000 * InternalConfig.forceTransmissionAfter (InternalContext.config context)
-- | One iteration of the timed drain: drains the span buffer, logging (and
-- swallowing) any exception as a warning, then sleeps for the given number of
-- microseconds.
drainSpanBufferAfterTimeoutLoop :: Int -> InternalContext -> IO ()
drainSpanBufferAfterTimeoutLoop delayMicroSeconds context = do
  drainSpanBuffer context `catch` logFailure
  Concurrent.threadDelay delayMicroSeconds
  where
    logFailure :: SomeException -> IO ()
    logFailure = warningM instanaLogger . show
-- | Atomically empties the span buffer and, if it contained any spans, passes
-- them to 'sendSpansToAgent' via 'InternalContext.whenConnected'.
--
-- The buffer is emptied before the connection check, so the drained spans are
-- not put back afterwards.
-- NOTE(review): presumably 'whenConnected' skips the action while there is no
-- agent connection, which would discard the drained spans - confirm that this
-- is intended.
drainSpanBuffer :: InternalContext -> IO ()
drainSpanBuffer context = do
  drained <-
    STM.atomically $
      STM.swapTVar (InternalContext.spanQueue context) Seq.empty
  case toList drained of
    [] ->
      return ()
    spans ->
      InternalContext.whenConnected context (sendSpansToAgent context spans)
-- | Sends the given spans to the agent as a JSON array via HTTP POST.
--
-- Each queued span is wrapped into a 'WireSpan' carrying the PID and agent
-- UUID from the agent connection plus the optionally configured service name.
-- The metrics store argument is ignored; it is only part of the signature so
-- that this function can be passed to 'InternalContext.whenConnected'.
--
-- An 'HTTP.HttpException' raised by the request is handled here: a response
-- status of 404 resets the connection state via 'resetToUnconnected', any
-- other HTTP failure is only logged at debug level. In both cases the spans
-- of this batch are not retried. Exceptions from building the request
-- ('HTTP.parseUrlThrow') are not caught here.
sendSpansToAgent ::
  InternalContext
  -> [QueuedSpan]
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
sendSpansToAgent context spans agentConnection _ = do
  let
    agentHost = InternalContext.agentHost agentConnection
    agentPort = InternalContext.agentPort agentConnection
    translatedPidStr = InternalContext.pid agentConnection
    agentUuid = InternalContext.agentUuid agentConnection
    serviceNameConfig =
      T.pack <$> (InternalConfig.serviceName . InternalContext.config $ context)
    -- The endpoint is the trace plugin path suffixed with ".<pid>".
    traceEndpointUrl =
      (show $
        URL.mkHttp agentHost agentPort haskellTracePluginPath
      ) ++ "." ++ translatedPidStr
    wireSpans = map
      (\queuedSpan ->
        WireSpan
          { WireSpan.queuedSpan        = queuedSpan
          , WireSpan.pid               = translatedPidStr
          , WireSpan.agentUuid         = agentUuid
          , WireSpan.serviceNameConfig = serviceNameConfig
          }
      ) spans
  defaultRequestSettings <- HTTP.parseUrlThrow traceEndpointUrl
  let
    request =
      defaultRequestSettings
        { HTTP.method = "POST"
        , HTTP.requestBody = HTTP.RequestBodyLBS $ Aeson.encode wireSpans
        , HTTP.requestHeaders =
            [ ("Accept", "application/json")
            -- Fixed: the value previously ended in a stray apostrophe
            -- ("UTF-8'"), which made the charset parameter malformed.
            , ("Content-Type", "application/json; charset=UTF-8")
            ]
        }
  catch
    (do
      _ <- HTTP.httpLbs request $ InternalContext.httpManager context
      return ()
    )
    (\(e :: HTTP.HttpException) -> do
      let
        -- 0 stands for "no HTTP status available" (e.g. connection failure).
        statusCode =
          case e of
            HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _) ->
              HttpTypes.statusCode (HTTP.responseStatus response)
            _ ->
              0
      if statusCode == 404
        then resetToUnconnected context
        else debugM instanaLogger $ show e
    )
-- | Waits 500 milliseconds once, then collects and sends metrics forever,
-- pausing one second after each round.
collectAndSendMetricsLoop :: InternalContext -> IO ()
collectAndSendMetricsLoop context = do
  Concurrent.threadDelay initialDelayMicroSeconds
  forever $ collectAndSendMetricsSafe intervalMicroSeconds context
  where
    initialDelayMicroSeconds = 500 * 1000
    intervalMicroSeconds = 1000 * 1000
-- | Forever, every five minutes (starting immediately), applies
-- 'Sample.markForReset' to the stored previous metrics sample. Exceptions
-- raised while doing so are logged as warnings and swallowed.
resetPreviouslySendMetrics :: InternalContext -> IO ()
resetPreviouslySendMetrics context =
  forever $ do
    markPreviousSampleForReset `catch` logFailure
    Concurrent.threadDelay delayMicroSeconds
  where
    delayMicroSeconds = 5 * 60 * 1000 * 1000

    markPreviousSampleForReset :: IO ()
    markPreviousSampleForReset =
      STM.atomically $
        STM.modifyTVar
          (InternalContext.previousMetricsSample context)
          Sample.markForReset

    logFailure :: SomeException -> IO ()
    logFailure = warningM instanaLogger . show
-- | One iteration of the metrics loop: collects and sends metrics if
-- connected, logging (and swallowing) any exception as a warning, then sleeps
-- for the given number of microseconds.
collectAndSendMetricsSafe :: Int -> InternalContext -> IO ()
collectAndSendMetricsSafe delayMicroSeconds context = do
  collectAndSendMetricsWhenConnected context `catch` logFailure
  Concurrent.threadDelay delayMicroSeconds
  where
    logFailure :: SomeException -> IO ()
    logFailure = warningM instanaLogger . show
-- | Hands 'collectAndSendMetrics' to 'InternalContext.whenConnected', which
-- supplies the agent connection and the metrics store.
collectAndSendMetricsWhenConnected :: InternalContext -> IO ()
collectAndSendMetricsWhenConnected context =
  InternalContext.whenConnected context sendMetrics
  where
    sendMetrics = collectAndSendMetrics context
-- | Samples all metrics from the store, enriches the sample with deltas
-- relative to the previously sent sample, compresses it against that previous
-- sample and POSTs the result as JSON to the agent's entity data endpoint for
-- this process.
--
-- The stored previous sample is only replaced by the new enriched sample
-- after the HTTP request has completed without an exception.
--
-- An 'HTTP.HttpException' raised by the request is handled here: a response
-- status of 404 resets the connection state via 'resetToUnconnected', any
-- other HTTP failure is only logged at debug level. Exceptions from building
-- the request ('HTTP.parseUrlThrow') are not caught here.
collectAndSendMetrics ::
  InternalContext
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
collectAndSendMetrics context agentConnection metricsStore = do
  previousSample <-
    STM.atomically $ STM.readTVar $ InternalContext.previousMetricsSample context
  -- Current time in milliseconds since the epoch.
  now <- round . (* 1000) <$> getPOSIXTime
  sampledMetrics <- MetricsCollector.sampleAll metricsStore
  let
    currentSample = Sample.timedSampleFromEkgSample sampledMetrics now
    enrichedSample = Deltas.enrichWithDeltas previousSample currentSample
    compressedMetrics =
      MetricsCompression.compressSample
        (Sample.sample previousSample)
        (Sample.sample enrichedSample)
    agentHost = InternalContext.agentHost agentConnection
    agentPort = InternalContext.agentPort agentConnection
    translatedPidStr = InternalContext.pid agentConnection
    -- The endpoint path is the entity data prefix followed by the PID.
    metricsEndpointUrl =
      URL.mkHttp
        agentHost
        agentPort
        (haskellEntityDataPathPrefix ++ translatedPidStr)
  defaultRequestSettings <- HTTP.parseUrlThrow $ show metricsEndpointUrl
  let
    request =
      defaultRequestSettings
        { HTTP.method = "POST"
        , HTTP.requestBody =
            HTTP.RequestBodyLBS $
              Aeson.encode $ Sample.SampleJson compressedMetrics
        , HTTP.requestHeaders =
            [ ("Accept", "application/json")
            -- Fixed: the value previously ended in a stray apostrophe
            -- ("UTF-8'"), which made the charset parameter malformed.
            , ("Content-Type", "application/json; charset=UTF-8")
            ]
        }
  catch
    (do
      _ <- HTTP.httpLbs request $ InternalContext.httpManager context
      -- Only remember the sample as "previous" once it has been delivered.
      STM.atomically $
        STM.writeTVar
          (InternalContext.previousMetricsSample context)
          enrichedSample
    )
    (\(e :: HTTP.HttpException) -> do
      let
        -- 0 stands for "no HTTP status available" (e.g. connection failure).
        statusCode =
          case e of
            HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _) ->
              HttpTypes.statusCode (HTTP.responseStatus response)
            _ ->
              0
      if statusCode == 404
        then resetToUnconnected context
        else debugM instanaLogger $ show e
    )
-- | Atomically switches the connection state from 'AgentReady' back to
-- 'Unconnected'. Every other connection state is left unchanged.
resetToUnconnected :: InternalContext -> IO ()
resetToUnconnected context =
  STM.atomically $
    STM.modifyTVar'
      (InternalContext.connectionState context)
      disconnectIfReady
  where
    disconnectIfReady :: ConnectionState -> ConnectionState
    disconnectIfReady (AgentReady _) = Unconnected
    disconnectIfReady otherState = otherState