{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
module OpenTelemetry.ZipkinExporter where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Data.Coerce
import qualified Data.HashMap.Strict as HM
import Data.Scientific
import qualified Data.Text as T
import qualified Jsonifier as J
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import Network.HTTP.Types
import OpenTelemetry.Common
import OpenTelemetry.Debug
import OpenTelemetry.SpanContext
import System.IO.Unsafe
import Text.Printf
data ZipkinSpan = ZipkinSpan
{ ZipkinSpan -> ZipkinConfig
zsConfig :: ZipkinConfig,
ZipkinSpan -> Span
zsSpan :: Span
}
tagValue2text :: TagValue -> J.Json
tagValue2text :: TagValue -> Json
tagValue2text TagValue
tv = Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ case TagValue
tv of
(StringTagValue (TagVal Text
s)) -> Text
s
(BoolTagValue Bool
b) -> if Bool
b then Text
"true" else Text
"false"
(IntTagValue Int
i) -> String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int -> String
forall a. Show a => a -> String
show Int
i
(DoubleTagValue Double
d) -> String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Scientific -> String
forall a. Show a => a -> String
show (Double -> Scientific
forall a. RealFloat a => a -> Scientific
fromFloatDigits Double
d)
jSpan :: ZipkinConfig -> Span -> J.Json
jSpan :: ZipkinConfig -> Span -> Json
jSpan ZipkinConfig {String
[(Text, Text)]
Word
Text
zSpanQueueSize :: ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: ZipkinConfig -> Word
zGlobalTags :: ZipkinConfig -> [(Text, Text)]
zServiceName :: ZipkinConfig -> Text
zEndpoint :: ZipkinConfig -> String
zSpanQueueSize :: Word
zGracefulShutdownTimeoutSeconds :: Word
zGlobalTags :: [(Text, Text)]
zServiceName :: Text
zEndpoint :: String
..} s :: Span
s@(Span {[SpanEvent]
Maybe SpanId
Word32
Word64
Text
SpanContext
HashMap TagName TagValue
SpanStatus
$sel:spanNanosecondsSpentInGC:Span :: Span -> Word64
$sel:spanParentId:Span :: Span -> Maybe SpanId
$sel:spanStatus:Span :: Span -> SpanStatus
$sel:spanEvents:Span :: Span -> [SpanEvent]
$sel:spanTags:Span :: Span -> HashMap TagName TagValue
$sel:spanFinishedAt:Span :: Span -> Word64
$sel:spanStartedAt:Span :: Span -> Word64
$sel:spanDisplayThreadId:Span :: Span -> Word32
$sel:spanThreadId:Span :: Span -> Word32
$sel:spanOperation:Span :: Span -> Text
$sel:spanContext:Span :: Span -> SpanContext
spanNanosecondsSpentInGC :: Word64
spanParentId :: Maybe SpanId
spanStatus :: SpanStatus
spanEvents :: [SpanEvent]
spanTags :: HashMap TagName TagValue
spanFinishedAt :: Word64
spanStartedAt :: Word64
spanDisplayThreadId :: Word32
spanThreadId :: Word32
spanOperation :: Text
spanContext :: SpanContext
..}) =
let TId Word64
tid = Span -> TraceId
spanTraceId Span
s
SId Word64
sid = Span -> SpanId
spanId Span
s
ts :: Word64
ts = Word64
spanStartedAt Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000
duration :: Word64
duration = (Word64
spanFinishedAt Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
- Word64
spanStartedAt) Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000
in [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object ([(Text, Json)] -> Json) -> [(Text, Json)] -> Json
forall a b. (a -> b) -> a -> b
$
[ (Text
"name", Text -> Json
J.textString Text
spanOperation),
(Text
"traceId", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Word64 -> String
forall r. PrintfType r => String -> r
printf String
"%016x" Word64
tid)),
(Text
"id", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Word64 -> String
forall r. PrintfType r => String -> r
printf String
"%016x" Word64
sid)),
(Text
"timestamp", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
ts),
(Text
"duration", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
duration),
(Text
"localEndpoint", [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object [(Text
"serviceName", Text -> Json
J.textString Text
zServiceName)]),
( Text
"tags",
[(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object
( ((Text -> Json) -> (Text, Text) -> (Text, Json)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> Json
J.textString ((Text, Text) -> (Text, Json)) -> [(Text, Text)] -> [(Text, Json)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [(Text, Text)]
zGlobalTags)
[(Text, Json)] -> [(Text, Json)] -> [(Text, Json)]
forall a. Semigroup a => a -> a -> a
<> [(Text
k, TagValue -> Json
tagValue2text TagValue
v) | ((TagName Text
k), TagValue
v) <- HashMap TagName TagValue -> [(TagName, TagValue)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap TagName TagValue
spanTags]
)
),
( Text
"annotations",
[Json] -> Json
forall (f :: * -> *). Foldable f => f Json -> Json
J.array
[ [(Text, Json)] -> Json
forall (f :: * -> *). Foldable f => f (Text, Json) -> Json
J.object
[ (Text
"timestamp", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64
t Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
`div` Word64
1000)),
(Text
"value", Text -> Json
J.textString (Text -> Json) -> Text -> Json
forall a b. (a -> b) -> a -> b
$ EventVal -> Text
coerce EventVal
v)
]
| SpanEvent Word64
t EventName
_ EventVal
v <- [SpanEvent]
spanEvents
]
)
]
[(Text, Json)] -> [(Text, Json)] -> [(Text, Json)]
forall a. Semigroup a => a -> a -> a
<> ([(Text, Json)]
-> (SpanId -> [(Text, Json)]) -> Maybe SpanId -> [(Text, Json)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\(SId Word64
psid) -> [(Text
"parentId", Word -> Json
J.wordNumber (Word -> Json) -> Word -> Json
forall a b. (a -> b) -> a -> b
$ Word64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
psid)]) Maybe SpanId
spanParentId)
data ZipkinConfig = ZipkinConfig
{ ZipkinConfig -> String
zEndpoint :: String,
ZipkinConfig -> Text
zServiceName :: T.Text,
ZipkinConfig -> [(Text, Text)]
zGlobalTags :: [(T.Text, T.Text)],
ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: Word,
ZipkinConfig -> Word
zSpanQueueSize :: Word
}
localhostZipkinConfig :: T.Text -> ZipkinConfig
localhostZipkinConfig :: Text -> ZipkinConfig
localhostZipkinConfig Text
service =
ZipkinConfig :: String -> Text -> [(Text, Text)] -> Word -> Word -> ZipkinConfig
ZipkinConfig
{ zEndpoint :: String
zEndpoint = String
"http://localhost:9411/api/v2/spans",
zServiceName :: Text
zServiceName = Text
service,
zGlobalTags :: [(Text, Text)]
zGlobalTags = [(Text, Text)]
forall a. Monoid a => a
mempty,
zGracefulShutdownTimeoutSeconds :: Word
zGracefulShutdownTimeoutSeconds = Word
5,
zSpanQueueSize :: Word
zSpanQueueSize = Word
2048
}
data ZipkinClient = ZipkinClient
{ ZipkinClient -> ZipkinConfig
zcConfig :: ZipkinConfig,
ZipkinClient -> Async ()
zcSenderThread :: Async (),
ZipkinClient -> TBQueue Span
zcSenderQueue :: TBQueue Span,
ZipkinClient -> TVar Bool
zcShutdownVar :: TVar Bool
}
createZipkinSpanExporter :: MonadIO m => ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter :: ZipkinConfig -> m (Exporter Span)
createZipkinSpanExporter ZipkinConfig
cfg = IO (Exporter Span) -> m (Exporter Span)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
ZipkinClient
client <- ZipkinConfig -> IO ZipkinClient
mkClient ZipkinConfig
cfg
Exporter Span -> IO (Exporter Span)
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(Exporter Span -> IO (Exporter Span))
-> Exporter Span -> IO (Exporter Span)
forall a b. (a -> b) -> a -> b
$! ([Span] -> IO ExportResult) -> IO () -> Exporter Span
forall thing.
([thing] -> IO ExportResult) -> IO () -> Exporter thing
Exporter
( \[Span]
sps -> do
let q :: TBQueue Span
q = ZipkinClient -> TBQueue Span
zcSenderQueue ZipkinClient
client
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Word
q_population <- Natural -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural -> Word) -> STM Natural -> STM Word
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue Span -> STM Natural
forall a. TBQueue a -> STM Natural
lengthTBQueue TBQueue Span
q
let q_vacancy :: Int
q_vacancy = Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ZipkinConfig -> Word
zSpanQueueSize (ZipkinClient -> ZipkinConfig
zcConfig ZipkinClient
client) Word -> Word -> Word
forall a. Num a => a -> a -> a
- Word
q_population)
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar Int
droppedSpanCountVar (\Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [Span] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Span]
sps Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
q_vacancy)
(Span -> STM ()) -> [Span] -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_
(TBQueue Span -> Span -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Span
q)
(Int -> [Span] -> [Span]
forall a. Int -> [a] -> [a]
take Int
q_vacancy [Span]
sps)
ExportResult -> IO ExportResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure ExportResult
ExportSuccess
)
( do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (ZipkinClient -> TVar Bool
zcShutdownVar ZipkinClient
client) Bool
True
Async () -> IO ()
forall a. Async a -> IO a
wait (ZipkinClient -> Async ()
zcSenderThread ZipkinClient
client)
)
mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient :: ZipkinConfig -> IO ZipkinClient
mkClient cfg :: ZipkinConfig
cfg@(ZipkinConfig {String
[(Text, Text)]
Word
Text
zSpanQueueSize :: Word
zGracefulShutdownTimeoutSeconds :: Word
zGlobalTags :: [(Text, Text)]
zServiceName :: Text
zEndpoint :: String
zSpanQueueSize :: ZipkinConfig -> Word
zGracefulShutdownTimeoutSeconds :: ZipkinConfig -> Word
zGlobalTags :: ZipkinConfig -> [(Text, Text)]
zServiceName :: ZipkinConfig -> Text
zEndpoint :: ZipkinConfig -> String
..}) = do
Manager
manager <- ManagerSettings -> IO Manager
newManager ManagerSettings
tlsManagerSettings
TBQueue Span
q <- Natural -> IO (TBQueue Span)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO (Word -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
zSpanQueueSize)
TVar Bool
shutdown_var <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
Async ()
sender <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
let loop :: IO ()
loop = do
(Bool
must_shutdown, [Span]
sps) <- STM (Bool, [Span]) -> IO (Bool, [Span])
forall a. STM a -> IO a
atomically (STM (Bool, [Span]) -> IO (Bool, [Span]))
-> STM (Bool, [Span]) -> IO (Bool, [Span])
forall a b. (a -> b) -> a -> b
$ do
Bool
must_shutdown <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
shutdown_var
[Span]
sps <- TBQueue Span -> STM [Span]
forall a. TBQueue a -> STM [a]
flushTBQueue TBQueue Span
q
case (Bool
must_shutdown, [Span]
sps) of
(Bool
False, []) -> STM (Bool, [Span])
forall a. STM a
retry
(Bool, [Span])
_ -> (Bool, [Span]) -> STM (Bool, [Span])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
must_shutdown, [Span]
sps)
case [Span]
sps of
[] -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[Span]
_ -> String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans String
zEndpoint Manager
manager ZipkinConfig
cfg [Span]
sps
String -> Bool -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"must_shutdown" Bool
must_shutdown
case Bool
must_shutdown of
Bool
True -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Bool
False -> IO ()
loop
IO ()
loop
ZipkinClient -> IO ZipkinClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ZipkinClient -> IO ZipkinClient)
-> ZipkinClient -> IO ZipkinClient
forall a b. (a -> b) -> a -> b
$! ZipkinConfig
-> Async () -> TBQueue Span -> TVar Bool -> ZipkinClient
ZipkinClient ZipkinConfig
cfg Async ()
sender TBQueue Span
q TVar Bool
shutdown_var
reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans :: String -> Manager -> ZipkinConfig -> [Span] -> IO ()
reportSpans String
endpoint Manager
httpManager ZipkinConfig
cfg [Span]
sps = do
String -> [Span] -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"reportSpans" [Span]
sps
let body :: ByteString
body = Json -> ByteString
J.toByteString (Json -> ByteString) -> Json -> ByteString
forall a b. (a -> b) -> a -> b
$ [Json] -> Json
forall (f :: * -> *). Foldable f => f Json -> Json
J.array ((Span -> Json) -> [Span] -> [Json]
forall a b. (a -> b) -> [a] -> [b]
map (ZipkinConfig -> Span -> Json
jSpan ZipkinConfig
cfg) [Span]
sps)
request :: Request
request =
(String -> Request
parseRequest_ String
endpoint)
{ method :: ByteString
method = ByteString
"POST",
requestBody :: RequestBody
requestBody = ByteString -> RequestBody
RequestBodyBS ByteString
body,
requestHeaders :: RequestHeaders
requestHeaders = [(HeaderName
"Content-Type", ByteString
"application/json")]
}
Response ByteString
resp <- Request -> Manager -> IO (Response ByteString)
httpLbs Request
request Manager
httpManager
case Status -> Int
statusCode (Response ByteString -> Status
forall body. Response body -> Status
responseStatus Response ByteString
resp) of
Int
s | Int
s Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Int
200, Int
202] -> do
Int -> TVar Int -> IO ()
inc Int
1 TVar Int
reportedSpanCountVar
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Int
_ -> do
Int -> TVar Int -> IO ()
inc Int
1 TVar Int
rejectedSpanCountVar
String -> ByteString -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"body" ByteString
body
String -> Status -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"resp status" (Status -> IO ()) -> Status -> IO ()
forall a b. (a -> b) -> a -> b
$ Response ByteString -> Status
forall body. Response body -> Status
responseStatus Response ByteString
resp
String -> ByteString -> IO ()
forall a. Show a => String -> a -> IO ()
dd_ String
"resp" (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ Response ByteString -> ByteString
forall body. Response body -> body
responseBody Response ByteString
resp
droppedSpanCountVar :: TVar Int
droppedSpanCountVar :: TVar Int
droppedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE droppedSpanCountVar #-}
reportedSpanCountVar :: TVar Int
reportedSpanCountVar :: TVar Int
reportedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE reportedSpanCountVar #-}
rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar :: TVar Int
rejectedSpanCountVar = IO (TVar Int) -> TVar Int
forall a. IO a -> a
unsafePerformIO (IO (TVar Int) -> TVar Int) -> IO (TVar Int) -> TVar Int
forall a b. (a -> b) -> a -> b
$ Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
{-# NOINLINE rejectedSpanCountVar #-}