{-# LANGUAGE OverloadedStrings #-}
module System.Remote.Monitoring.Influxdb
( InfluxdbOptions(..)
, defaultInfluxdbOptions
, forkInfluxdb
, forkInfluxdbRestart
) where
import Data.Monoid ((<>))
import Data.Time.Clock as Time (getCurrentTime, diffUTCTime, UTCTime)
import qualified Data.Text as T
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
import qualified Data.Map as M
import qualified Data.Vector as V
import qualified Database.InfluxDB as Influxdb
import Control.Exception (SomeException, try, bracket)
import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import qualified Data.HashMap.Strict as HashMap
import Data.String (fromString)
-- | Settings controlling how samples from an EKG store are reported to
-- InfluxDB.
data InfluxdbOptions = InfluxdbOptions
  {
    -- | Write parameters handed unchanged to the InfluxDB client on every
    -- write.
    writeParams :: !Influxdb.WriteParams
    -- | Target time between two flushes, in milliseconds (the reporting
    -- loop multiplies it by 1000 before passing it to 'threadDelay').
  , flushInterval :: !Int
    -- | Text prepended to every metric name. When non-empty it is followed
    -- by a @.@ separator; when empty nothing is prepended.
  , prefix :: !T.Text
    -- | Text appended to every metric name. When non-empty it is preceded
    -- by a @.@ separator; when empty nothing is appended.
  , suffix :: !T.Text
  }
-- | Build an 'InfluxdbOptions' from the given write parameters, using a
-- flush interval of 1000 milliseconds and no name prefix or suffix.
defaultInfluxdbOptions :: Influxdb.WriteParams -> InfluxdbOptions
defaultInfluxdbOptions params =
  InfluxdbOptions
    { prefix = T.empty
    , suffix = T.empty
    , flushInterval = oneSecondInMillis
    , writeParams = params
    }
  where
    oneSecondInMillis = 1000
-- | Start a background thread that periodically reports the contents of
-- the store to InfluxDB.
--
-- Any exception that terminates the reporting loop is re-thrown to the
-- thread that called 'forkInfluxdb'; the loop is not restarted. Use
-- 'forkInfluxdbRestart' to supply a different policy.
forkInfluxdb :: InfluxdbOptions -> EKG.Store -> IO ThreadId
forkInfluxdb opts store = myThreadId >>= spawn
  where
    -- The restart action offered to the handler is deliberately ignored.
    spawn caller =
      forkInfluxdbRestart opts store (\err _restart -> throwTo caller err)
-- | Start a background thread that periodically reports the contents of
-- the store to InfluxDB, with a caller-supplied failure policy.
--
-- When the reporting loop is terminated by an exception, the handler is
-- invoked in the reporting thread with that exception and an action that
-- runs the reporting loop again. The handler may run that action to
-- restart reporting, or return to let the thread finish.
forkInfluxdbRestart :: InfluxdbOptions
                    -> EKG.Store
                    -> (SomeException -> IO () -> IO ())
                    -> IO ThreadId
forkInfluxdbRestart opts store exceptionHandler = forkIO supervised
  where
    supervised :: IO ()
    supervised = do
      outcome <- try reportForever
      either
        (\err -> exceptionHandler err supervised)
        -- The loop is not expected to return; if it does, start it again.
        (const supervised)
        outcome

    reportForever :: IO ()
    reportForever =
      bracket
        (return (writeParams opts))
        (const (return ()))
        (loop store opts)
-- | Sample the store and flush the sample to InfluxDB, forever.
--
-- After each flush the thread sleeps for whatever remains of
-- 'flushInterval' (milliseconds) once the time spent sampling and sending
-- has been subtracted, so flushes start roughly one interval apart.
loop :: EKG.Store -> InfluxdbOptions -> Influxdb.WriteParams -> IO ()
loop store opts params = forever $ do
  start <- getCurrentTime
  sample <- EKG.sampleAll store
  flushSample sample params opts
  end <- getCurrentTime
  let -- 'diffUTCTime' yields seconds and 'threadDelay' expects microseconds,
      -- so both quantities are converted to microseconds before subtracting.
      elapsedMicros :: Int
      elapsedMicros = truncate (diffUTCTime end start * 1000000)
      intervalMicros :: Int
      intervalMicros = flushInterval opts * 1000
  -- Clamp at zero: a flush slower than the interval must not produce a
  -- negative delay.
  threadDelay (max 0 (intervalMicros - elapsedMicros))
-- | Convert one EKG sample into 'Metric's stamped with the current time,
-- apply the configured prefix and suffix to every metric name, and send
-- the result to InfluxDB.
--
-- Counters and gauges become a single metric named @counter.\<name\>@ or
-- @gauge.\<name\>@. Distributions become five metrics named
-- @dist.\<name\>.\<stat\>@ for variance, count, sum, min and max. Labels
-- are skipped.
flushSample :: EKG.Sample -> Influxdb.WriteParams -> InfluxdbOptions -> IO ()
flushSample sample params opts = do
  t <- getCurrentTime
  sendMetrics params (V.map renamed (collect t))
  where
    -- Gather the per-key vectors in a list and concatenate once, instead of
    -- re-copying a growing vector at every key. Consing keeps the same
    -- element order as prepending each key's metrics to the accumulator.
    collect t =
      V.concat
        (HashMap.foldlWithKey' (\acc k v -> metrics k v t : acc) [] sample)

    -- The separators depend only on the options, so compute them once
    -- rather than once per metric.
    namePrefix = if T.null (prefix opts) then "" else prefix opts <> "."
    nameSuffix = if T.null (suffix opts) then "" else "." <> suffix opts

    renamed (Metric n v t) = Metric (namePrefix <> n <> nameSuffix) v t

    metrics n (EKG.Counter i) t =
      V.singleton (Metric ("counter." <> n) (fromIntegral i) t)
    metrics n (EKG.Gauge i) t =
      V.singleton (Metric ("gauge." <> n) (fromIntegral i) t)
    metrics _ (EKG.Label {}) _ = V.empty
    metrics n (EKG.Distribution stats) t =
      let f n' v = Metric ("dist." <> n <> "." <> n') v t
      in V.fromList [ f "variance" (Stats.variance stats)
                    , f "count" (fromIntegral $ Stats.count stats)
                    , f "sum" (Stats.sum stats)
                    , f "min" (Stats.min stats)
                    , f "max" (Stats.max stats)
                    ]
-- | A single numeric data point ready to be written to InfluxDB.
data Metric = Metric
  { -- | Full metric name; used as the InfluxDB measurement name.
    path :: !T.Text
    -- | Numeric reading; written as a float field named @value@.
  , value :: !Double
    -- | Time attached to the data point when it is written.
  , timestamp :: !UTCTime
  } deriving (Show)
-- | Write the given metrics to InfluxDB using the supplied write
-- parameters.
--
-- Each metric becomes one line: the measurement is the metric's 'path',
-- there are no tags, the only field is @value@ (a float), and the line
-- carries the metric's 'timestamp'. All lines are sent in one batch rather
-- than one write per metric; an empty vector sends nothing.
sendMetrics :: Influxdb.WriteParams -> V.Vector Metric -> IO ()
sendMetrics params metrics
  | V.null metrics = return ()
  | otherwise = Influxdb.writeBatch params (V.map toLine metrics)
  where
    toLine metric =
      Influxdb.Line
        (fromString . T.unpack $ path metric)
        M.empty
        (M.singleton "value" (Influxdb.FieldFloat (value metric)))
        (Just (timestamp metric))