{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module System.Remote.Monitoring.Statsd
(
Statsd
, statsdFlush
, statsdThreadId
, forkStatsd
, StatsdOptions(..)
, defaultStatsdOptions
) where
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar (modifyMVar_, newMVar)
import Control.Exception (IOException, AsyncException(ThreadKilled), catch, fromException)
import Control.Monad (foldM, when)
import qualified Data.ByteString.Char8 as B8
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Network.Socket as Socket
import qualified Network.Socket.ByteString as Socket
import qualified System.Metrics as Metrics
import qualified System.Metrics.Distribution.Internal as Distribution
import System.IO (stderr)
#if __GLASGOW_HASKELL__ >= 706
import Control.Concurrent (forkFinally)
#else
import Control.Concurrent (forkIO)
import Control.Exception (SomeException, mask, try)
import Prelude hiding (catch)
#endif
data Statsd = Statsd
{ threadId :: {-# UNPACK #-} !ThreadId
, flush :: IO ()
}
statsdThreadId :: Statsd -> ThreadId
statsdThreadId = threadId
statsdFlush :: Statsd -> IO ()
statsdFlush = flush
data StatsdOptions = StatsdOptions
{
host :: !T.Text
, port :: !Int
, flushInterval :: !Int
, debug :: !Bool
, prefix :: !T.Text
, suffix :: !T.Text
}
defaultStatsdOptions :: StatsdOptions
defaultStatsdOptions = StatsdOptions
{ host = "127.0.0.1"
, port = 8125
, flushInterval = 1000
, debug = False
, prefix = ""
, suffix = ""
}
forkStatsd :: StatsdOptions
-> Metrics.Store
-> IO Statsd
forkStatsd opts store = do
addrInfos <- Socket.getAddrInfo Nothing (Just $ T.unpack $ host opts)
(Just $ show $ port opts)
(sendSample, closeSocket) <- case addrInfos of
[] -> unsupportedAddressError
(addrInfo:_) -> do
socket <- Socket.socket (Socket.addrFamily addrInfo)
Socket.Datagram Socket.defaultProtocol
let socketAddress = Socket.addrAddress addrInfo
sendSample <- if debug opts
then do
Socket.connect socket socketAddress
return $ \msg -> Socket.sendAll socket msg
else return $ \msg -> Socket.sendAllTo socket msg socketAddress
return (sendSample, Socket.close socket)
priorCountsVar <- newMVar M.empty
let flush = do
sample <- Metrics.sampleAll store
modifyMVar_ priorCountsVar (flushSample sample sendSample opts)
me <- myThreadId
tid <- forkFinally (loop opts flush) $ \ r -> do
closeSocket
case r of
Left e -> case fromException e of
Just ThreadKilled -> return ()
_ -> throwTo me e
Right _ -> return ()
return $ Statsd tid flush
where
unsupportedAddressError = ioError $ userError $
"unsupported address: " ++ T.unpack (host opts)
loop :: StatsdOptions
-> IO ()
-> IO ()
loop opts flush = do
start <- time
flush
end <- time
threadDelay (flushInterval opts * 1000 - fromIntegral (end - start))
loop opts flush
time :: IO Int64
time = (round . (* 1000000.0) . toDouble) `fmap` getPOSIXTime
where toDouble = realToFrac :: Real a => a -> Double
flushSample :: Metrics.Sample -> (B8.ByteString -> IO ()) -> StatsdOptions -> M.HashMap T.Text Int64 -> IO (M.HashMap T.Text Int64)
flushSample sample sendSample opts priorCounts =
foldM flushOne priorCounts (M.toList sample)
where
flushOne pc (name, val) =
let fullName = dottedPrefix <> sanitizeName name <> dottedSuffix
in flushMetric fullName val pc
sanitizeName = T.map sanitizeChar
sanitizeChar ':' = '_'
sanitizeChar c = c
flushMetric name (Metrics.Counter n) pc = sendCounter name n pc
flushMetric name (Metrics.Gauge n) pc = sendGauge name n >> return pc
flushMetric name (Metrics.Distribution d) pc = sendDistribution name d pc
flushMetric _ (Metrics.Label _) pc = return pc
sendGauge name n = send "|g" name (show n)
sendCounter name n pc = do
let old = fromMaybe 0 (M.lookup name pc)
send "|c" name (show (n - old))
return (M.insert name n pc)
sendDistribution name d pc = do
sendGauge (name <> "." <> "mean" ) (Distribution.mean d)
sendGauge (name <> "." <> "variance") (Distribution.variance d)
uc <- sendCounter (name <> "." <> "count" ) (Distribution.count d) pc
sendGauge (name <> "." <> "sum" ) (Distribution.sum d)
sendGauge (name <> "." <> "min" ) (Distribution.min d)
sendGauge (name <> "." <> "max" ) (Distribution.max d)
return uc
isDebug = debug opts
dottedPrefix = if T.null (prefix opts) then "" else prefix opts <> "."
dottedSuffix = if T.null (suffix opts) then "" else "." <> suffix opts
send ty name val = do
let !msg = B8.concat [T.encodeUtf8 name, ":", B8.pack val, ty]
when isDebug $ B8.hPutStrLn stderr $ B8.concat [ "DEBUG: ", msg]
sendSample msg `catch` \ (e :: IOException) -> do
T.hPutStrLn stderr $ "ERROR: Couldn't send message: " <>
T.pack (show e)
return ()
#if __GLASGOW_HASKELL__ < 706
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
mask $ \restore ->
forkIO $ try (restore action) >>= and_then
#endif