{-# LANGUAGE CPP                 #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell     #-}
{-# LANGUAGE TupleSections       #-}
-- | This module lets you periodically flush metrics to elasticsearch. Example
-- usage:
--
-- > main = do
-- >     store <- newStore
-- >     forkElasticSearch defaultESOptions store
--
-- You probably want to include some of the predefined metrics defined
-- in the ekg-core package, by calling e.g. the 'registerGcMetrics'
-- function defined in that package.
module System.Remote.Monitoring.ElasticSearch
    (
      -- * The elasticsearch syncer
      ElasticSearch
    , elasticSearchThreadId
    , forkElasticSearch
      -- * ElasticSearch options
    , ESOptions(..)
    , defaultESOptions
    ) where

import           Control.Concurrent    (ThreadId, forkIO, threadDelay)
import           Control.Exception     (catch)
import           Control.Lens
import           Control.Monad         (forever, void)
import qualified Data.HashMap.Strict   as M
import           Data.Int              (Int64)
import           Data.Monoid           ((<>))
import           Data.Text             (Text)
import qualified Data.Text             as T
import           Data.Text.Lens
import           Data.Time.Clock       (getCurrentTime)
import           Data.Time.Clock.POSIX (getPOSIXTime)
import           Data.Time.Format      (defaultTimeLocale, formatTime)
import           Network.HostName      (getHostName)
import           Network.HTTP.Client   (HttpException)
import           Network.Wreq          as Wreq
import qualified System.Metrics        as Metrics

import           System.Metrics.Json

--------------------------------------------------------------------------------
-- | A handle that can be used to control the elasticsearch sync thread.
-- Created by 'forkElasticSearch'.
newtype ElasticSearch = ElasticSearch { threadId :: ThreadId } -- ^ The forked sync thread; see 'elasticSearchThreadId'.

-- | The thread ID of the elasticsearch sync thread. You can stop the sync by
-- killing this thread (i.e. by throwing it an asynchronous
-- exception.)
elasticSearchThreadId :: ElasticSearch -> ThreadId
elasticSearchThreadId (ElasticSearch tid) = tid

--------------------------------------------------------------------------------
-- | Options to control how to connect to the elasticsearch server and how
-- often to flush metrics. The flush interval should be shorter than
-- the flush interval elasticsearch itself uses to flush data to its
-- backends.
data ESOptions = ESOptions
    { -- | Server hostname or IP address of the elasticsearch node.
      _host          :: !Text

      -- | Server port (elasticsearch's HTTP port; 9200 in 'defaultESOptions').
    , _port          :: !Int

      -- | Handler invoked with any 'HttpException' raised while flushing.
    , _onError       :: !(HttpException -> IO ())

      -- | The elasticsearch index to insert into
    , _indexBase     :: !Text

      -- | Append "-YYYY.MM.DD" onto index? (see 'mkIndex')
    , _indexByDate   :: !Bool

      -- | What to put in the @beat.name@ field
    , _beatName      :: !Text

      -- | Data push interval, in ms.
    , _flushInterval :: !Int

      -- | Print debug output to stderr.
      -- NOTE(review): not consulted anywhere in this module — confirm intent.
    , _debug         :: !Bool

      -- | Prefix to add to all metric names.
    , _prefix        :: !Text

      -- | Suffix to add to all metric names. This is particularly
      -- useful for sending per host stats by settings this value to:
      -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@
      -- from the @Network.BSD@ module in the network package.
    , _suffix        :: !Text
      -- | Extra tags to add to events
    , _tags          :: ![Text]
    }

-- Template Haskell: generates a lens per field ('host', 'port', …) plus the
-- @HasESOptions@ class, so options can be read with @opts ^. field@ below.
makeClassy ''ESOptions

-- | Defaults:
--
-- * @host@ = @\"127.0.0.1\"@
--
-- * @port@ = @9200@
--
-- * @onError@ = @print@
--
-- * @indexBase@ = @\"metricbeat\"@
--
-- * @indexByDate@ = @True@
--
-- * @beatName@ = @\"ekg\"@
--
-- * @flushInterval@ = @1000@
--
-- * @debug@ = @False@
--
-- * @prefix@ = @\"\"@
--
-- * @suffix@ = @\"\"@
--
-- * @tags@ = @[]@
defaultESOptions :: ESOptions
defaultESOptions = ESOptions
  { _host = "127.0.0.1"      -- local node
  , _port = 9200             -- elasticsearch's default HTTP port
  , _onError = print         -- just report HTTP exceptions on stdout
  , _indexBase = "metricbeat"
  , _indexByDate = True      -- daily indices: metricbeat-YYYY.MM.DD
  , _beatName = "ekg"
  , _flushInterval = 1000    -- milliseconds
  , _debug = False
  , _prefix = ""             -- metric names sent unchanged by default
  , _suffix = ""
  , _tags = []
  }

--------------------------------------------------------------------------------
-- | Create a thread that flushes the metrics in the store to elasticsearch.
forkElasticSearch :: ESOptions -- ^ Options
           -> Metrics.Store    -- ^ Metric store
           -> IO ElasticSearch -- ^ ElasticSearch sync handle
forkElasticSearch opts store = do
    tid <- forkIO (loop store opts)
    return (ElasticSearch tid)

-- | Flush the store to elasticsearch every '_flushInterval' milliseconds,
-- shortening each delay by the time the flush itself took.
--
-- Note: the body relies solely on 'forever' for repetition. The original
-- version additionally tail-called @loop store opts@ inside the 'forever'
-- block, which nests a fresh loop per iteration and retains every outer
-- 'forever''s never-reached continuation (a space leak). The delay is also
-- clamped at zero so a flush that outruns the interval cannot produce a
-- negative 'threadDelay' argument.
loop :: Metrics.Store   -- ^ Metric store
     -> ESOptions   -- ^ Options
     -> IO ()
loop store opts = forever $ do
    start <- time
    flushSample store opts
    end <- time
    let elapsedUs = fromIntegral (end - start)
    threadDelay (max 0 ((opts ^. flushInterval) * 1000 - elapsedUs))

-- | Microseconds since epoch.
time :: IO Int64
time = do
    posix <- getPOSIXTime
    -- Go via Double (as the original did) before rounding to microseconds.
    return (round (realToFrac posix * 1000000.0 :: Double))


--------------------------------------------------------------------------------
-- | Construct the bulk-insert URL to send metrics to from '_host' and '_port'.
elasticURL :: ESOptions -> String
elasticURL eo =
  concat ["http://", T.unpack (eo ^. host), ":", show (eo ^. port), "/_bulk"]

-- | Construct the index to send to.
--
-- If '_indexByDate' is @True@ this will be '_indexBase'-YYYY.MM.DD, otherwise
-- it will just be '_indexBase'.
mkIndex :: ESOptions -> IO CreateBulk
mkIndex eo
  | eo ^. indexByDate = do
      today <- getCurrentTime
      let day = T.pack (formatTime defaultTimeLocale "%Y.%m.%d" today)
      return (CreateBulk (eo ^. indexBase <> "-" <> day))
  | otherwise = return (CreateBulk (eo ^. indexBase))

--------------------------------------------------------------------------------
-- | Generate a 'BeatEvent' for each metric in the 'Metrics.Store'.
--
-- @took@ measures how long sampling (plus the hostname lookup) took, in ms.
sampleBeatEvents :: Metrics.Store -> ESOptions -> IO [BeatEvent]
sampleBeatEvents store eo = do
  now <- getPOSIXTime
  sample <- Metrics.sampleAll store
  hostName <- T.pack <$> getHostName
  finish <- getPOSIXTime
  let took = floor $ (finish - now) * 1000
      theBeat = Beat hostName (eo ^. beatName) "0.1"
      -- Honour the documented '_prefix'/'_suffix' options, which were
      -- previously ignored. Defaults are @""@, so default behaviour is
      -- unchanged.
      decorate k = (eo ^. prefix) <> k <> (eo ^. suffix)
      mkBeatEvt evts k v =
        BeatEvent theBeat now (eo ^. tags) took (M.singleton (decorate k) v) : evts
  return $ M.foldlWithKey' mkBeatEvt [] sample

--------------------------------------------------------------------------------
-- | Build a 'BulkRequest' from the current sample and POST it to the
-- elasticsearch bulk endpoint; any 'HttpException' is handed to '_onError'.
flushSample :: Metrics.Store -> ESOptions -> IO ()
flushSample store eo = do
  idx <- mkIndex eo
  evts <- sampleBeatEvents store eo
  let payload = BulkRequest (map (idx, ) evts)
  void (Wreq.post (elasticURL eo) payload) `catch` _onError eo