{-# LANGUAGE OverloadedStrings #-}

-- | This module lets you periodically flush metrics to a influxdb
-- backend. Example usage:
--
-- > import qualified Database.Influxdb as Influxdb
-- > main = do
-- >   store <- newStore
-- >   forkInfluxdb (defaultInfluxdbOptions (Influxdb.writeParams "database")) store
--
-- You probably want to include some of the predefined metrics defined
-- in the @ekg-core@ package, by calling e.g. the 'EKG.registerGcMetrics'
-- function defined in that pacakge.
--
-- NOTE: This package has been modelled after the @ekg-carbon@ package, and
--       is almost identical. The author is indebted to those fine folks who
--       wrote the @ekg-carbon@ package.

module System.Remote.Monitoring.Influxdb
  ( InfluxdbOptions(..)
  , defaultInfluxdbOptions
  , forkInfluxdb
  , forkInfluxdbRestart
  ) where

import Data.Monoid ((<>))
import Data.Time.Clock as Time (getCurrentTime, diffUTCTime, UTCTime)
import qualified Data.Text as T
import qualified System.Metrics as EKG
import qualified System.Metrics.Distribution as Stats
import qualified Data.Map as M
import qualified Data.Vector as V
import qualified Database.InfluxDB as Influxdb
import Control.Exception (SomeException, try, bracket)
import Control.Concurrent (ThreadId, forkIO, myThreadId, threadDelay, throwTo)
import Control.Monad (forever)
import qualified Data.HashMap.Strict as HashMap
import Data.String (fromString)

--------------------------------------------------------------------------------
-- | Options to control how to connect to the Influxdb server and how often to
-- flush metrics. The flush interval should match the shortest retention rate of
-- the matching retention periods, or you risk over-riding previous samples.
data InfluxdbOptions = InfluxdbOptions
  { -- | The write parameters for Influxdb.
    writeParams :: !Influxdb.WriteParams
    -- | The amount of time between sampling EKG metrics and pushing to Influxdb.
  , flushInterval :: !Int
    -- | Prefix to add to all matric names.
  , prefix :: !T.Text
    -- | Suffix to add to all metric names. This is particularly
    -- useful for sending per host stats by settings this value to:
    -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@
    -- from the @Network.BSD@ module in the network package.
  , suffix :: !T.Text
  }

--------------------------------------------------------------------------------
-- | Defaults
--
-- * @flushInterval@ = @1000@
--
-- * Empty 'prefix' and 'suffix'.
defaultInfluxdbOptions :: Influxdb.WriteParams -> InfluxdbOptions
defaultInfluxdbOptions params = InfluxdbOptions
    { writeParams   = params
    , flushInterval = 1000
    , prefix        = ""
    , suffix        = ""
    }

--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Influxdb. If the thread flushing statistics throws an exception (for example,
-- the network connection is lost), this exception will be thrown up to the thread
-- that called 'forkInfluxdb'. For more control, see 'forkInfluxdbRestart'.
forkInfluxdb :: InfluxdbOptions -> EKG.Store -> IO ThreadId
forkInfluxdb opts store =
  do parent <- myThreadId
     forkInfluxdbRestart opts
                         store
                         (\e _ -> throwTo parent e)

--------------------------------------------------------------------------------
-- | Create a thread that periodically flushes the metrics in 'EKG.Store' to
-- Influxdb. If the thread flushing statistics throws an exception (for example,
-- the network connection is lost), the callback function will be invoked with the
-- exception that was thrown, and an 'IO' computation to restart the handler.
--
-- For example, you can use 'forkInfluxdbRestart' to log failures and restart
-- logging:
--
-- > forkInfluxdbRestart defaultInfluxdbOptions
-- >                     store
-- >                     (\ex restart -> do hPutStrLn stderr ("ekg-influxdb: " ++ show ex)
-- >                                        restart)
forkInfluxdbRestart :: InfluxdbOptions
                    -> EKG.Store
                    -> (SomeException -> IO () -> IO ())
                    -> IO ThreadId
forkInfluxdbRestart opts store exceptionHandler = forkIO go
  where
    go = do
        terminated <-
          try $ bracket
          (return (writeParams opts))
          (\_ -> return ())
          (loop store opts)
        case terminated of
          Left exception -> exceptionHandler exception go
          Right _ -> go

--------------------------------------------------------------------------------
loop :: EKG.Store -> InfluxdbOptions -> Influxdb.WriteParams -> IO ()
loop store opts params = forever $ do
  start <- getCurrentTime
  sample <- EKG.sampleAll store
  flushSample sample params opts
  end <- getCurrentTime
  let diff :: Int
      diff = truncate (diffUTCTime end start * 1000)
  threadDelay (flushInterval opts * 1000 - diff)

flushSample :: EKG.Sample -> Influxdb.WriteParams -> InfluxdbOptions -> IO ()
flushSample sample params opts = do
  t <- getCurrentTime
  sendMetrics params
    (V.map renamed
      (HashMap.foldlWithKey' (\ms k v -> metrics k v t <> ms)
        V.empty
        sample))

  where
  renamed (Metric n v t) =
    let p = if T.null (prefix opts) then "" else prefix opts <> "."
        s = if T.null (suffix opts) then "" else "." <> suffix opts
    in Metric (p <> n <> s) v t

  metrics n (EKG.Counter i) t = V.singleton (Metric ("counter." <> n) (fromIntegral i) t)
  metrics n (EKG.Gauge i) t = V.singleton (Metric ("gauge." <> n) (fromIntegral i) t)
  metrics _ (EKG.Label {}) _ = V.empty
  metrics n (EKG.Distribution stats) t =
    let f n' v = Metric ("dist." <> n <> "." <> n') v t
    in V.fromList [ f "variance" (Stats.variance stats)
                  , f "count" (fromIntegral $ Stats.count stats)
                  , f "sum" (Stats.sum stats)
                  , f "min" (Stats.min stats)
                  , f "max" (Stats.max stats)
                  ]

data Metric = Metric
  { path  :: !T.Text
  , value :: !Double
  , timestamp :: !UTCTime
  } deriving (Show)

sendMetrics :: Influxdb.WriteParams -> V.Vector Metric -> IO ()
sendMetrics params metrics = V.forM_ metrics $ \metric -> do
  let tags   = M.empty
      values = M.singleton "value" (Influxdb.FieldFloat (value metric))
    in Influxdb.write params $ Influxdb.Line (fromString . T.unpack $ path metric) tags values (Just (timestamp metric))