{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-|
Module      : Instana.SDK.Internal.Worker
Description : Manages the SDK's background worker threads
-}
module Instana.SDK.Internal.Worker
    ( spawnWorker
    ) where


import qualified Control.Concurrent                               as Concurrent
import qualified Control.Concurrent.STM                           as STM
import           Control.Exception                                (SomeException,
                                                                   catch)
import           Control.Monad                                    (forever,
                                                                   when)
import qualified Data.Aeson                                       as Aeson
import           Data.Foldable                                    (toList)
import           Data.Sequence                                    ((|>))
import qualified Data.Sequence                                    as Seq
import qualified Data.Text                                        as T
import           Data.Time.Clock.POSIX                            (getPOSIXTime)
import qualified Network.HTTP.Client                              as HTTP
import qualified Network.HTTP.Types.Status                        as HttpTypes
import           System.Log.Logger                                (debugM,
                                                                   warningM)
import qualified System.Metrics                                   as Metrics

import qualified Instana.SDK.Internal.AgentConnection.ConnectLoop as ConnectLoop
import           Instana.SDK.Internal.AgentConnection.Paths       (haskellEntityDataPathPrefix,
                                                                   haskellTracePluginPath)
import           Instana.SDK.Internal.Command                     (Command (..))
import qualified Instana.SDK.Internal.Config                      as InternalConfig
import           Instana.SDK.Internal.Context                     (AgentConnection (..),
                                                                   ConnectionState (..),
                                                                   InternalContext)
import qualified Instana.SDK.Internal.Context                     as InternalContext
import qualified Instana.SDK.Internal.Id                          as Id
import           Instana.SDK.Internal.Logging                     (instanaLogger)
import qualified Instana.SDK.Internal.Metrics.Collector           as MetricsCollector
import qualified Instana.SDK.Internal.Metrics.Compression         as MetricsCompression
import qualified Instana.SDK.Internal.Metrics.Deltas              as Deltas
import qualified Instana.SDK.Internal.Metrics.Sample              as Sample
import qualified Instana.SDK.Internal.URL                         as URL
import           Instana.SDK.Internal.W3CTraceContext             (InstanaKeyValuePair (..))
import qualified Instana.SDK.Internal.W3CTraceContext             as W3CTraceContext
import           Instana.SDK.Internal.WireSpan                    (QueuedSpan (QueuedSpan),
                                                                   SpanKind (Entry, Exit),
                                                                   WireSpan (WireSpan))
import qualified Instana.SDK.Internal.WireSpan                    as WireSpan
import           Instana.SDK.Span.EntrySpan                       (EntrySpan (..))
import qualified Instana.SDK.Span.EntrySpan                       as EntrySpan
import           Instana.SDK.Span.ExitSpan                        (ExitSpan (..))
import qualified Instana.SDK.Span.ExitSpan                        as ExitSpan


-- |Spawns the SDK's worker. There should only be one worker at any time.
--
-- Logs a debug message and then forks five background threads (see the
-- numbered comments below). The thread ids are discarded, so this function
-- returns as soon as all threads have been forked.
spawnWorker :: InternalContext -> IO ()
spawnWorker context = do
  debugM instanaLogger "Spawning the Instana Haskell SDK worker"

  -- The worker starts five threads, which continuously:
  --
  -- 1) Check if the connection to the agent is already/still up. If not, this
  --    thread will start to establish a connection to the agent.
  _ <- Concurrent.forkIO $ ConnectLoop.initConnectLoop context

  -- 2) Read commands (incoming spans) from the command queue and put arriving
  --    spans into the worker's local span buffer. This will happen regardless
  --    of the agent connection state. If a certain amount of spans are in the
  --    local buffer, we'll drain the buffer and, if connected, try to send the
  --    spans to the agent. If not connected, these spans will be dropped. This
  --    avoids excessive memory consumption at the expense of losing spans.
  _ <- Concurrent.forkIO $ initReadLoop context

  -- 3) Drain the local span buffer once every second and try to send the
  --    buffered spans, if any. Again, sending the spans will only be attempted
  --    when connected, see above.
  _ <- Concurrent.forkIO $ initDrainSpanBufferAfterTimeoutLoop context

  -- 4) Collect and send metrics, if connected.
  _ <- Concurrent.forkIO $ collectAndSendMetricsLoop context

  -- 5) Make sure full metrics instead of diffs get sent every five minutes.
  _ <- Concurrent.forkIO $ resetPreviouslySendMetrics context

  return ()


{-| Read commands (incoming spans) from the command queue and put arriving spans
into the worker's local span buffer. This will happen regardless of the agent
connection state. If a certain amount of spans are in the local buffer, we'll
drain the buffer and, if connected, try to send the spans to the agent. If not
connected, these spans will be dropped. This avoids excessive memory consumption
at the expense of losing spans.

This action never returns; it runs 'readFromQueue' in an endless loop.
-}
initReadLoop :: InternalContext -> IO ()
initReadLoop context =
  forever $ readFromQueue context


-- |Takes a single command from the context's command queue (blocking until one
-- is available) and executes it. Any exception raised while reading or
-- executing is logged as a warning instead of being propagated.
readFromQueue :: InternalContext -> IO ()
readFromQueue context =
  catch
    ( do
        let
          commandQueue = InternalContext.commandQueue context
        command <- STM.atomically $ STM.readTQueue commandQueue
        execute command context
    )
    -- exceptions in execute (or reading from queue) must not kill the loop, so
    -- we just catch everything
    -- NOTE(review): SomeException also matches asynchronous exceptions, so a
    -- killThread aimed at this thread would only be logged here -- confirm
    -- that this is intended.
    (\e -> warningM instanaLogger $ show (e :: SomeException))


-- |Dispatches a command received from the command queue: completed entry spans
-- go to 'queueEntrySpan', completed exit spans go to 'queueExitSpan'.
execute :: Command -> InternalContext -> IO ()
execute (CompleteEntry entrySpan) =
  queueEntrySpan entrySpan
execute (CompleteExit exitSpan) =
  queueExitSpan exitSpan


-- |Converts a completed entry span into a 'QueuedSpan' and hands it to
-- 'queueSpan'.
--
-- The duration is the difference between the current POSIX time (scaled by
-- 1000 and rounded, i.e. milliseconds) and the span's timestamp; this presumes
-- the span's timestamp is in milliseconds as well -- TODO confirm.
--
-- The Instana ancestor (trace ID and parent ID taken from the W3C trace state)
-- is only attached when the span's @tpFlag@ is set and the trace state actually
-- carries an Instana key-value pair.
queueEntrySpan :: EntrySpan -> InternalContext -> IO ()
queueEntrySpan entrySpan context = do
  now <- round . (* 1000) <$> getPOSIXTime
  let
    timestamp = EntrySpan.timestamp entrySpan
    tpFlagSet = EntrySpan.tpFlag entrySpan
    instanaKeyValuePairFromTraceState =
      W3CTraceContext.instanaKeyValuePair =<<
        W3CTraceContext.traceState <$>
        EntrySpan.w3cTraceContext entrySpan
    instanaAncestor =
      case (tpFlagSet, instanaKeyValuePairFromTraceState) of
        ( True
          , Just (InstanaKeyValuePair { instanaTraceId, instanaParentId })) ->
          Just ( Id.longOrShortTraceId instanaTraceId
               , Id.toString instanaParentId
               )

        _ ->
          Nothing

  queueSpan
    context
    QueuedSpan
      { WireSpan.traceId         = EntrySpan.traceId entrySpan
      , WireSpan.spanId          = EntrySpan.spanId entrySpan
      , WireSpan.parentId        = EntrySpan.parentId entrySpan
      , WireSpan.spanName        = EntrySpan.spanName entrySpan
      , WireSpan.timestamp       = timestamp
      , WireSpan.duration        = now - timestamp
      , WireSpan.kind            = Entry
      , WireSpan.errorCount      = EntrySpan.errorCount entrySpan
      , WireSpan.serviceName     = EntrySpan.serviceName entrySpan
      , WireSpan.correlationType = EntrySpan.correlationType entrySpan
      , WireSpan.correlationId   = EntrySpan.correlationId entrySpan
        -- boolean flags are only transmitted when they are set
      , WireSpan.tpFlag          =
          if tpFlagSet then Just True else Nothing
      , WireSpan.instanaAncestor = instanaAncestor
      , WireSpan.synthetic       =
          if EntrySpan.synthetic entrySpan then Just True else Nothing
      , WireSpan.spanData        = EntrySpan.spanData entrySpan
      }


-- |Converts a completed exit span into a 'QueuedSpan' and hands it to
-- 'queueSpan'.
--
-- The trace ID and the parent ID are taken from the exit span's parent entry
-- span. The duration is the difference between the current POSIX time (scaled
-- by 1000 and rounded, i.e. milliseconds) and the exit span's timestamp; this
-- presumes the span's timestamp is in milliseconds as well -- TODO confirm.
-- Correlation data, @tpFlag@, the Instana ancestor and the synthetic marker
-- are never set for exit spans.
queueExitSpan :: ExitSpan -> InternalContext -> IO ()
queueExitSpan exitSpan context = do
  let
    parentSpan = ExitSpan.parentSpan exitSpan
    timestamp = ExitSpan.timestamp exitSpan
  now <- round . (* 1000) <$> getPOSIXTime
  queueSpan
    context
    QueuedSpan
      { WireSpan.traceId         = EntrySpan.traceId parentSpan
      , WireSpan.spanId          = ExitSpan.spanId exitSpan
      , WireSpan.parentId        = Just $ EntrySpan.spanId parentSpan
      , WireSpan.spanName        = ExitSpan.spanName exitSpan
      , WireSpan.timestamp       = timestamp
      , WireSpan.duration        = now - timestamp
      , WireSpan.kind            = Exit
      , WireSpan.errorCount      = ExitSpan.errorCount exitSpan
      , WireSpan.serviceName     = ExitSpan.serviceName exitSpan
      , WireSpan.correlationType = Nothing
      , WireSpan.correlationId   = Nothing
      , WireSpan.tpFlag          = Nothing
      , WireSpan.instanaAncestor = Nothing
      , WireSpan.synthetic       = Nothing
      , WireSpan.spanData        = ExitSpan.spanData exitSpan
      }


-- |Appends a span to the worker's local span buffer.
--
-- * If the buffer already holds @maxBufferedSpans@ (or more) spans, the span is
--   dropped and a debug message is logged.
-- * Otherwise the span is appended. If the buffer then holds at least
--   @forceTransmissionStartingAt@ spans, the buffer is drained right away via
--   'drainSpanBuffer'.
--
-- The size check and the append happen in one STM transaction, so the decision
-- to keep or drop the span is always based on the buffer content the span is
-- actually appended to.
queueSpan :: InternalContext -> QueuedSpan -> IO ()
queueSpan context span_ = do
  let
    config = InternalContext.config context
    maxBufferedSpans = InternalConfig.maxBufferedSpans config
    forceTransmissionStartingAt =
      InternalConfig.forceTransmissionStartingAt config
    spanQueue = InternalContext.spanQueue context
  -- Nothing: the span has been dropped; Just n: the span has been appended and
  -- the buffer now holds n spans.
  bufferedAfterInsert <-
    STM.atomically $ do
      currentSpanQueue <- STM.readTVar spanQueue
      let
        bufferedSpans = Seq.length currentSpanQueue
      if bufferedSpans >= maxBufferedSpans
        then
          return Nothing
        else do
          -- write the evaluated sequence so no thunk is stored in the TVar
          STM.writeTVar spanQueue $! currentSpanQueue |> span_
          return $ Just (bufferedSpans + 1)
  case bufferedAfterInsert of
    Nothing ->
      -- TODO remove debug log?
      debugM instanaLogger "dropping span, buffer limit reached"
    Just bufferedSpans ->
      when
        (bufferedSpans >= forceTransmissionStartingAt)
        (drainSpanBuffer context)


{-| Drain the local span buffer periodically and try to send the buffered spans
to the agent, if any. Sending the spans will only be attempted when the
connection to the agent is up, otherwise, the locally buffered spans will be
dropped. This avoids excessive memory consumption at the expense of losing
spans.

The pause between two drain attempts is the configured @forceTransmissionAfter@
value, which is multiplied by 1000 here to obtain the microseconds passed on to
'drainSpanBufferAfterTimeoutLoop'. This action never returns.
-}
initDrainSpanBufferAfterTimeoutLoop :: InternalContext -> IO ()
initDrainSpanBufferAfterTimeoutLoop context = do
  let
    delayMilliSeconds =
      InternalConfig.forceTransmissionAfter $ InternalContext.config context
    delayMicroSeconds = delayMilliSeconds * 1000
  forever $ drainSpanBufferAfterTimeoutLoop delayMicroSeconds context


-- |Runs one iteration of the periodic drain loop: drains the span buffer
-- (logging any exception as a warning instead of propagating it) and then
-- sleeps for the given number of microseconds.
drainSpanBufferAfterTimeoutLoop :: Int -> InternalContext -> IO ()
drainSpanBufferAfterTimeoutLoop delayMicroSeconds context = do
  catch
    (drainSpanBuffer context)
    -- exceptions in drainSpanBuffer must not kill the loop, so we just catch
    -- everything
    (\e -> warningM instanaLogger $ show (e :: SomeException))
  Concurrent.threadDelay delayMicroSeconds


-- |Atomically replaces the span buffer with an empty sequence and, if the
-- buffer held any spans, passes them to 'sendSpansToAgent' via
-- 'InternalContext.whenConnected'. The buffer has already been emptied at that
-- point, so the spans are not put back if they end up not being sent.
drainSpanBuffer :: InternalContext -> IO ()
drainSpanBuffer context = do
  spansSeq <-
    STM.atomically $
      STM.swapTVar (InternalContext.spanQueue context) Seq.empty
  let
    spans :: [QueuedSpan]
    spans = toList spansSeq
  when (not $ null spans) $
    InternalContext.whenConnected context $
      sendSpansToAgent context spans


-- |Sends a batch of spans to the agent with an HTTP POST request.
--
-- The request URL is built from the agent host and port of the given
-- connection, the trace plugin path and the (translated) PID. Each queued span
-- is combined with static per-process data (PID, agent UUID, configured service
-- name) and the batch is serialized to JSON.
--
-- An 'HTTP.HttpException' raised while sending is handled here: when it
-- carries an HTTP 404 status, a warning is logged and the connection state is
-- reset via @resetToUnconnected@; any other HTTP exception is only logged at
-- debug level. In both cases the batch is not retried. An exception raised
-- while parsing the URL is not handled here.
sendSpansToAgent ::
  InternalContext
  -> [QueuedSpan]
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
sendSpansToAgent context spans agentConnection _ = do
  let
    agentHost = InternalContext.agentHost agentConnection
    agentPort = InternalContext.agentPort agentConnection
    translatedPidStr = InternalContext.pid agentConnection
    agentUuid = InternalContext.agentUuid agentConnection
    serviceNameConfig =
      T.pack <$> (InternalConfig.serviceName . InternalContext.config $ context)
    traceEndpointUrl =
      (show $
        URL.mkHttp agentHost agentPort haskellTracePluginPath
      ) ++ "." ++ translatedPidStr
    -- combine actual span data with static per-process data
    wireSpans = map
      (\queuedSpan ->
        WireSpan {
          WireSpan.queuedSpan        = queuedSpan
        , WireSpan.pid               = translatedPidStr
        , WireSpan.agentUuid         = agentUuid
        , WireSpan.serviceNameConfig = serviceNameConfig
        }
      ) spans
  defaultRequestSettings <- HTTP.parseUrlThrow traceEndpointUrl
  let
    request =
      defaultRequestSettings
        { HTTP.method = "POST"
        , HTTP.requestBody = HTTP.RequestBodyLBS $ Aeson.encode wireSpans
        , HTTP.requestHeaders =
          [ ("Accept", "application/json")
            -- the charset parameter must not carry a trailing quote character
          , ("Content-Type", "application/json; charset=UTF-8")
          ]
        }

  -- TODO Would we want to retry the request? Or do we just lose this batch
  -- of spans? Perhaps do 3 quick retries before giving up, maybe excluding a
  -- 404 response status (because that clearly indicates that a new connection
  -- establishment process has to be initiated).

  -- Right now, if sending the spans fails (either in the context of
  -- drainSpanBufferAfterTimeoutLoop or queueSpan), the exception will be
  -- logged and we move on.
  -- That means that if the agent is unavailable for some time, we will try to
  -- send spans at least every second regardless (or even more frequently, if
  -- the monitored application creates more than 1000 spans per second).
  -- We could do something more sophisticated here like trying to send spans
  -- less frequently after a number of failed attempts. Maybe we could use
  -- http://hackage.haskell.org/package/glue-0.2.0/docs/Glue-CircuitBreaker.html
  --
  -- Also, as a small performance improvement, we might want to stop _accepting_
  -- spans until the connection has been reestablished. (To avoid serializing
  -- a lot of spans to JSON if they cannot be sent due to agent
  -- unavailability.)
  catch
    (do
      _ <- HTTP.httpLbs request $ InternalContext.httpManager context
      return ()
    )
    (\(e :: HTTP.HttpException) -> do
      let
        -- 0 stands for "no HTTP status available" (e.g. connection failures)
        statusCode =
          case e of
            HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _) ->
              HttpTypes.statusCode (HTTP.responseStatus response)
            _ ->
              0
      if statusCode == 404
        then do
          warningM instanaLogger $
            "Received HTTP 404 when sending spans to " ++
            show traceEndpointUrl ++
            ", resetting connection state to unconnected."
          resetToUnconnected context
        else do
          debugM instanaLogger $ show e
          return ()
    )


-- |Runs the metrics loop: waits 500 milliseconds once (to offset metrics
-- collection from span buffer draining) and then calls
-- 'collectAndSendMetricsSafe' forever with a delay of one second. This action
-- never returns.
collectAndSendMetricsLoop :: InternalContext -> IO ()
collectAndSendMetricsLoop context = do
  let
    delayMicroSeconds = 1000 * 1000 -- once each second
  -- offset metrics collection from span buffer draining
  Concurrent.threadDelay $ 500 * 1000
  forever $ collectAndSendMetricsSafe delayMicroSeconds context


-- |Marks the previously sent metrics sample for reset, so that the full set of
-- metrics is sent the next time (instead of just the diff).
--
-- This happens immediately when the loop starts and then once every five
-- minutes. Exceptions raised while updating the sample are logged as warnings
-- and do not end the loop. This action never returns.
resetPreviouslySendMetrics :: InternalContext -> IO ()
resetPreviouslySendMetrics context = do
  let
    delayMicroSeconds = 5 * 60 * 1000 * 1000 -- once every five minutes
  forever $ do
    catch
      -- strict variant: apply markForReset right away instead of stacking up
      -- unevaluated updates inside the TVar
      (STM.atomically $
        STM.modifyTVar'
          (InternalContext.previousMetricsSample context)
          Sample.markForReset
      )
      (\e -> warningM instanaLogger $ show (e :: SomeException))
    Concurrent.threadDelay delayMicroSeconds


-- |Runs one iteration of the metrics loop: collects and sends metrics if
-- connected (logging any exception as a warning instead of propagating it) and
-- then sleeps for the given number of microseconds.
collectAndSendMetricsSafe :: Int -> InternalContext -> IO ()
collectAndSendMetricsSafe delayMicroSeconds context = do
  catch
    (collectAndSendMetricsWhenConnected context)
    -- exceptions in collectAndSendMetrics must not kill the loop, so we just
    -- catch everything
    (\e -> warningM instanaLogger $ show (e :: SomeException))
  Concurrent.threadDelay delayMicroSeconds


-- |Passes 'collectAndSendMetrics' to 'InternalContext.whenConnected', which
-- supplies the agent connection and the metrics store.
collectAndSendMetricsWhenConnected :: InternalContext -> IO ()
collectAndSendMetricsWhenConnected context =
  InternalContext.whenConnected context $
    collectAndSendMetrics context


-- |Samples all metrics from the given store, enriches them with deltas
-- relative to the previously stored sample, compresses them against that
-- previous sample and POSTs the result as JSON to the agent's entity data
-- endpoint for this process.
--
-- The previous sample kept in the context is only replaced after the HTTP
-- request has completed without throwing, so a failed transmission leaves the
-- old baseline in place.
--
-- An 'HTTP.HttpException' raised while sending is handled here: a 404 status
-- resets the connection state via 'resetToUnconnected', everything else is
-- logged at debug level. Exceptions raised before the request is sent (for
-- example by 'HTTP.parseUrlThrow') are not caught by this function.
collectAndSendMetrics ::
  InternalContext
  -> AgentConnection
  -> Metrics.Store
  -> IO ()
collectAndSendMetrics context agentConnection metricsStore = do
  previousSample <-
    STM.atomically $ STM.readTVar $ InternalContext.previousMetricsSample context
  -- Current time as whole milliseconds since the epoch.
  now <- round . (* 1000) <$> getPOSIXTime
  sampledMetrics <- MetricsCollector.sampleAll metricsStore
  let
    currentSample = Sample.timedSampleFromEkgSample sampledMetrics now
    enrichedSample = Deltas.enrichWithDeltas previousSample currentSample
    compressedMetrics =
      MetricsCompression.compressSample
        (Sample.sample previousSample)
        (Sample.sample enrichedSample)

    agentHost = InternalContext.agentHost agentConnection
    agentPort = InternalContext.agentPort agentConnection
    translatedPidStr = InternalContext.pid agentConnection
    metricsEndpointUrl =
      URL.mkHttp
        agentHost
        agentPort
        (haskellEntityDataPathPrefix ++ translatedPidStr)
  -- parseUrlThrow (rather than parseRequest) makes non-2xx responses surface
  -- as StatusCodeException, which the handler below relies on to detect 404.
  defaultRequestSettings <- HTTP.parseUrlThrow $ show metricsEndpointUrl
  let
    request =
      defaultRequestSettings
        { HTTP.method = "POST"
        , HTTP.requestBody =
            HTTP.RequestBodyLBS $
              Aeson.encode $ Sample.SampleJson compressedMetrics
        , HTTP.requestHeaders =
          [ ("Accept", "application/json")
          , ("Content-Type", "application/json; charset=UTF-8")
          ]
        }

  -- TODO Would we want to retry the request? Or do we just lose this batch
  -- of data? Perhaps do 3 quick retries before giving up, maybe excluding a
  -- 404 response status (because that clearly indicates that a new connection
  -- establishment process has to be initiated).

  -- Right now, if sending the data fails, the exception will be logged and we
  -- move on.
  -- That means that if the agent is unavailable for some time, we will keep
  -- trying to send data on every collection cycle regardless.
  -- We could do something more sophisticated here like trying to send metrics
  -- less frequently after a number of failed attempts. Maybe we could use
  -- http://hackage.haskell.org/package/glue-0.2.0/docs/Glue-CircuitBreaker.html
  --
  -- Also, as a small performance improvement, we might want to stop
  -- _collecting_ metrics until the connection has been reestablished.
  catch
    (do
      _ <- HTTP.httpLbs request $ InternalContext.httpManager context
      -- Only reached when the request did not throw: remember what has been
      -- sent so the next cycle computes deltas/compression against it.
      STM.atomically $
        STM.writeTVar
          (InternalContext.previousMetricsSample context)
          enrichedSample
    )
    (\(e :: HTTP.HttpException) ->
      case e of
        HTTP.HttpExceptionRequest _ (HTTP.StatusCodeException response _)
          | HttpTypes.statusCode (HTTP.responseStatus response) == 404 -> do
              warningM instanaLogger $
                "Received HTTP 404 when sending metrics to " ++
                show metricsEndpointUrl ++
                ", resetting connection state to unconnected."
              resetToUnconnected context
        _ ->
          debugM instanaLogger $ show e
    )


-- |Marks the agent connection as lost, but only when the connection is
-- currently fully established (state AgentReady). Switching to Unconnected
-- triggers a new sensor agent handshake. Any other state is left untouched so
-- that a connection attempt that is already in flight is not disturbed; the
-- only thing recorded here is that a connection we believed to be ready is
-- in fact gone.
resetToUnconnected :: InternalContext -> IO ()
resetToUnconnected context =
  STM.atomically (STM.modifyTVar' connectionStateVar dropIfReady)
  where
    connectionStateVar = InternalContext.connectionState context

    dropIfReady (AgentReady _) = Unconnected
    dropIfReady other          = other