{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | This library lets you push metric samples to a broadcast channel.
-- Consumers can then persist the metric samples as they wish.
-- ekg-push is heavily based on the ekg-statsd package, which
-- can be found at: https://github.com/tibbe/ekg-statsd
--
-- Example usage:
--
-- > main = do
-- >     store <- newStore
-- >     push <- forkPush defaultPushOptions store
-- >     ch <- subscribe push
-- >     sample <- consume ch
-- >     putStrLn $ show sample
--
-- You probably want to include some of the predefined metrics defined
-- in the ekg-core package, by calling e.g. the 'registerGcStats'
-- function defined in that package.
module System.Remote.Monitoring.Push
    (
      Push
    , PushChan
    , PushOptions(..)
    , pushThreadId
    , forkPush
    , defaultPushOptions
    , subscribe
    , consume
    ) where

import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan, dupChan)
import qualified Data.HashMap.Strict as M
import Data.Int (Int64)
import qualified Data.Text as T
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified System.Metrics as Metrics

#if __GLASGOW_HASKELL__ >= 706
import Control.Concurrent (forkFinally)
#else
import Control.Concurrent (forkIO)
import Control.Exception (SomeException, mask, try)
import Prelude hiding (catch)
#endif

-- | The channel type used throughout this module: a 'Chan' carrying
-- metric samples.
type PushChanType = Chan Metrics.Sample

-- | A handle that can be used to control the push sync thread.
-- Created by 'forkPush'.
data Push = Push
    { threadId :: {-# UNPACK #-} !ThreadId
      -- ^ Thread running the sampling loop.
    , mainCh   :: PushChanType
      -- ^ Channel the sampling loop writes to; 'subscribe' duplicates
      -- it for every consumer.
      --
      -- NOTE(review): nothing in this module reads from this channel
      -- directly, only from its duplicates -- confirm that samples
      -- written here are not retained for the lifetime of the handle.
    }

-- | A new 'PushChan' is created on every call to 'subscribe'.
-- It wraps a 'dupChan' of the handle's main channel.
data PushChan = PushChan
    { ch :: PushChanType
      -- ^ The subscriber's own duplicate of the main channel.
    }

-- | The thread ID of the push sync thread. You can stop the sync by
-- killing this thread (i.e. by throwing it an asynchronous
-- exception).
pushThreadId :: Push -> ThreadId
pushThreadId = threadId

-- | Options controlling how often samples are pushed to the broadcast
-- channel and how metric names are decorated before they are pushed.
data PushOptions = PushOptions
    { -- | Data push interval, in ms.
      flushInterval :: !Int

      -- | Print debug output to stderr.
      --
      -- NOTE(review): this flag is not read by any code visible in
      -- this module -- confirm whether it is still needed.
    , debug :: !Bool

      -- | Prefix to add to all metric names.
    , prefix :: !T.Text

      -- | Suffix to add to all metric names. This is particularly
      -- useful for sending per host stats by setting this value to:
      -- @takeWhile (/= \'.\') \<$\> getHostName@, using @getHostName@
      -- from the @Network.BSD@ module in the network package.
    , suffix :: !T.Text
    }

-- | Defaults:
--
-- * @flushInterval@ = @1000@
--
-- * @debug@ = @False@
--
-- * @prefix@ = @\"\"@
--
-- * @suffix@ = @\"\"@
defaultPushOptions :: PushOptions
defaultPushOptions = PushOptions
    { flushInterval = 1000
    , debug         = False
    , prefix        = ""
    , suffix        = ""
    }

-- | Create a thread that periodically samples the metric store and
-- writes the changes since the previous sample to the broadcast
-- channel.
--
-- If the sync thread terminates with an exception, that exception is
-- re-thrown to the thread that called 'forkPush'. This includes the
-- asynchronous exception used to kill the sync thread.
forkPush :: PushOptions  -- ^ Options
         -> Metrics.Store  -- ^ Metric store
         -> IO Push        -- ^ Push sync handle
forkPush opts store = do
    me <- myThreadId
    ch <- newChan
    tid <- forkFinally (loop ch store emptySample opts) $ \ r ->
        case r of
            -- Propagate any failure of the sync thread to its creator.
            Left e  -> throwTo me e
            Right _ -> return ()
    return $ Push tid ch
  where
    -- The first iteration diffs against an empty sample, so every
    -- metric present in the store is reported once at start-up.
    emptySample = M.empty

-- | The body of the sync thread: sample, diff against the previous
-- sample, publish the diff, sleep for the rest of the flush interval,
-- and repeat forever.
loop :: PushChanType    -- ^ ekg-push clients subscribe to this channel
     -> Metrics.Store   -- ^ Metric Store
     -> Metrics.Sample  -- ^ Last sampled metrics
     -> PushOptions     -- ^ Options
     -> IO ()
loop ch store lastSample opts = do
    start <- time
    sample <- Metrics.sampleAll store
    let !diff = diffSamples opts lastSample sample
    writeChan ch diff  -- Write the Metrics.Sample to our broadcast channel.
    end <- time
    -- 'time' reads the wall clock, which may jump in either direction,
    -- and an iteration may take longer than the flush interval. Clamp
    -- both the elapsed time and the resulting delay so that
    -- 'threadDelay' is never handed a negative value and never sleeps
    -- longer than one flush interval.
    let interval = flushInterval opts * 1000  -- ms -> microseconds
        elapsed  = max 0 (fromIntegral (end - start)) :: Int
    threadDelay (max 0 (interval - elapsed))
    loop ch store sample opts

-- | Subscribe to the push broadcast channel.
subscribe :: Push -> IO PushChan
subscribe Push{ mainCh = source } = PushChan `fmap` dupChan source

-- | Block until the next 'Metrics.Sample' is available on a subscribed
-- channel and return it.
consume :: PushChan -> IO Metrics.Sample
consume PushChan{ ch = source } = readChan source

-- | Microseconds since epoch.
time :: IO Int64
time = fmap toMicros getPOSIXTime
  where
    toMicros t = round ((realToFrac t :: Double) * 1000000.0)

-- | Compute what changed between two samples.
--
-- Every metric in the current sample that is either absent from the
-- previous sample or reported as changed by 'diffMetric' ends up in the
-- result, keyed by its name wrapped in the configured prefix and
-- suffix. Metrics that only exist in the previous sample are ignored.
diffSamples :: PushOptions -> Metrics.Sample -> Metrics.Sample -> Metrics.Sample
diffSamples opts prev curr = M.foldlWithKey' step M.empty curr
  where
    step acc name new =
        case M.lookup name prev of
            Nothing  -> emit new  -- first time we see this metric
            Just old -> maybe acc emit (diffMetric old new)
      where
        emit val = M.insert decorated val acc
        decorated = T.append (prefix opts) (T.append name (suffix opts))

    -- Counters report the increment since the previous sample; gauges
    -- and labels report their new value. Anything else -- unchanged
    -- values, distributions, or a metric whose kind changed between
    -- samples -- yields 'Nothing' and is left out of the diff.
    diffMetric :: Metrics.Value -> Metrics.Value -> Maybe Metrics.Value
    diffMetric old new = case (old, new) of
        (Metrics.Counter a, Metrics.Counter b)
            | a /= b -> Just $! Metrics.Counter (b - a)
        (Metrics.Gauge a, Metrics.Gauge b)
            | a /= b -> Just (Metrics.Gauge b)
        (Metrics.Label a, Metrics.Label b)
            | a /= b -> Just (Metrics.Label b)
        _ -> Nothing

------------------------------------------------------------------------
-- Backwards compatibility shims

#if __GLASGOW_HASKELL__ < 706
-- | Fork a thread and run the given handler with the action's outcome
-- when it finishes, whether it returned or threw.
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action finalizer =
    mask $ \restore ->
        forkIO (try (restore action) >>= finalizer)
#endif