{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Instrument.Worker
  ( initWorkerCSV,
    initWorkerCSV',
    initWorkerGraphite,
    initWorkerGraphite',
    work,
    initWorker,
    AggProcess (..),

    -- * Configuring agg processes
    AggProcessConfig (..),
    standardQuantiles,
    noQuantiles,
    quantileMap,
    defAggProcessConfig,

    -- * Exported for testing
    expandDims,
  )
where

-------------------------------------------------------------------------------
import Control.Error
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as B
import Data.CSV.Conduit
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Default
import qualified Data.Map as M
import qualified Data.SafeCopy as SC
import Data.Semigroup as Semigroup
import Data.Serialize
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Vector.Unboxed as V
import Database.Redis as R hiding (decode)
-------------------------------------------------------------------------------
import Instrument.Client
  ( packetsKey,
    stripTimerPrefix,
    timerMetricName,
  )
import qualified Instrument.Measurement as TM
import Instrument.Types
import Instrument.Utils
import Network.Socket as N
import qualified Statistics.Quantile as Q
import Statistics.Sample
import System.IO
import System.Posix

-------------------------------------------------------------------------------

-------------------------------------------------------------------------------

-- | A CSV backend to store aggregation results in a CSV.
--
-- Builds the CSV 'AggProcess' with 'initWorkerCSV'' and hands it to
-- 'initWorker' under the worker name @"CSV Worker"@.
initWorkerCSV ::
  -- | Redis connection
  ConnectInfo ->
  -- | Target file name
  FilePath ->
  -- | Aggregation period / flush interval in seconds
  Int ->
  AggProcessConfig ->
  IO ()
initWorkerCSV conn fp n cfg = do
  csvProcess <- initWorkerCSV' fp cfg
  initWorker "CSV Worker" conn n csvProcess

-------------------------------------------------------------------------------

-- | Create an 'AggProcess' that dumps to CSV. Use this to compose with
-- other 'AggProcess'es.
--
-- The target file is opened in append mode with line buffering. When the
-- file did not exist before the call, a header row is written first; its
-- columns are the keys of 'aggToCSV' applied to the default 'Aggregated'.
initWorkerCSV' ::
  -- | Target file name
  FilePath ->
  AggProcessConfig ->
  IO AggProcess
initWorkerCSV' fp cfg = do
  -- Probe for the file *before* opening it, since opening in 'AppendMode'
  -- would create it and hide the fact that a header is still needed.
  !alreadyThere <- fileExist fp
  !h <- openFile fp AppendMode
  hSetBuffering h LineBuffering
  unless alreadyThere $ do
    let header = M.keys (aggToCSV def)
    T.hPutStrLn h (rowToStr defCSVSettings header)
  return (putAggregateCSV h cfg)

-------------------------------------------------------------------------------

-- | Initialize a Graphite backend.
--
-- Builds the Graphite 'AggProcess' with 'initWorkerGraphite'' and hands it
-- to 'initWorker' under the worker name @"Graphite Worker"@.
initWorkerGraphite ::
  -- | Redis connection
  ConnectInfo ->
  -- | Aggregation period / flush interval in seconds
  Int ->
  -- | Graphite host
  HostName ->
  -- | Graphite port
  Int ->
  AggProcessConfig ->
  IO ()
initWorkerGraphite conn n server port cfg = do
  graphiteProcess <- initWorkerGraphite' server port cfg
  initWorker "Graphite Worker" conn n graphiteProcess

-------------------------------------------------------------------------------

-- | Create an 'AggProcess' that dumps to graphite. Use this to compose
-- with other 'AggProcess'es.
--
-- Resolves the host/port to a stream socket address, connects to it, and
-- wraps the socket in a line-buffered read/write 'Handle' that the
-- returned process writes to.
initWorkerGraphite' ::
  -- | Graphite host
  HostName ->
  -- | Graphite port
  Int ->
  AggProcessConfig ->
  IO AggProcess
initWorkerGraphite' server port cfg = do
  addr <- resolve server (fromIntegral port)
  sock <- open addr
  h <- N.socketToHandle sock ReadWriteMode
  hSetBuffering h LineBuffering
  return $ putAggregateGraphite h cfg
  where
    -- 'N.getAddrInfo' wants the service as a string; the decimal rendering
    -- of the port number serves as that name.
    portNumberToServiceName :: N.PortNumber -> N.ServiceName
    portNumberToServiceName = show

    -- Look up the first stream-socket address for the given host and port.
    -- An empty lookup result fails the pattern bind and thus raises in 'IO'.
    resolve host portNumber = do
      let hints = N.defaultHints {N.addrSocketType = N.Stream}
      addr : _ <-
        N.getAddrInfo
          (Just hints)
          (Just host)
          (Just (portNumberToServiceName portNumber))
      return addr

    -- Create a socket matching the resolved address and connect it.
    open addr = do
      sock <-
        N.socket
          (N.addrFamily addr)
          (N.addrSocketType addr)
          (N.addrProtocol addr)
      N.connect sock (N.addrAddress addr)
      return sock

-------------------------------------------------------------------------------

-- | Generic utility for making worker backends. Will retry
-- indefinitely with exponential backoff.
--
-- Creates the Redis connection pool once, then repeatedly runs 'work'
-- through 'indefinitely', passing it the worker name and the flush interval
-- converted with 'seconds'. The retry/backoff behaviour itself lives in
-- 'indefinitely' and is not visible in this module.
initWorker ::
  -- | Worker name handed to 'indefinitely'
  String ->
  -- | Redis connection
  ConnectInfo ->
  -- | Aggregation period / flush interval in seconds
  Int ->
  -- | What to do with each aggregation result
  AggProcess ->
  IO ()
initWorker wname conn n f = do
  pool <- createInstrumentPool conn
  runForever (work pool n f)
  where
    runForever = indefinitely wname (seconds n)

-------------------------------------------------------------------------------

-- | Extract statistics out of the given sample for this flush period.
--
-- Besides the fixed summary fields, one entry is produced per requested
-- 'Quantile': the key is the quantile's integer value and the value is
-- 'Q.weightedAvg' of that value out of 100 over the sample.
--
-- NOTE(review): 'V.maximum' and 'V.minimum' are partial on an empty
-- vector; this function assumes a non-empty sample -- confirm at callers.
mkStats :: Set.Set Quantile -> Sample -> Stats
mkStats qs s =
  Stats
    { smean = mean s,
      ssum = V.sum s,
      scount = V.length s,
      smax = V.maximum s,
      smin = V.minimum s,
      srange = range s,
      sstdev = stdDev s,
      sskewness = skewness s,
      skurtosis = kurtosis s,
      squantiles = quantiles
    }
  where
    quantiles = M.fromList [mkQ 100 (quantile q) | q <- Set.toList qs]
    -- @mkQ mx i@ pairs @i@ with the @i@-th of @mx@ quantiles of the sample.
    mkQ mx i = (i, Q.weightedAvg i mx s)

-------------------------------------------------------------------------------

-- | Go over all pending stats buffers in redis.
--
-- Reads the cardinality of the 'packetsKey' set once as an estimate (a
-- Redis error counts as zero), then pops at most that many buffer keys
-- from the set and runs 'processSampler' on each. Popping stops early as
-- soon as the set is empty or Redis returns an error.
work :: R.Connection -> Int -> AggProcess -> IO ()
work r n f = runRedis r $ do
  dbg "entered work block"
  estimate <- either (const 0) id <$> scard packetsKey
  runConduit $
    CL.unfoldM nextKey estimate
      .| CL.mapM_ (processSampler n f)
  where
    -- Unfold step: the state is how many keys we still intend to pop.
    nextKey estRemaining
      | estRemaining > 0 = do
          mk <- spop packetsKey
          return $ case mk of
            Right (Just k) -> Just (k, estRemaining - 1)
            _ -> Nothing
      | otherwise = return Nothing

-------------------------------------------------------------------------------
-- | Drain one metric buffer from Redis, aggregate its packets per
-- dimension set (as expanded by 'expandDims') and pass every resulting
-- 'Aggregated' to the process' handler.
--
-- Nothing happens when the buffer yields no packets. The metric name is
-- taken from the first packet. The timestamp is the current time rounded
-- down to a multiple of the flush interval.
processSampler ::
  -- | Flush interval - determines resolution
  Int ->
  -- | What to do with aggregation results
  AggProcess ->
  -- | Redis buffer for this metric
  B.ByteString ->
  Redis ()
processSampler n (AggProcess cfg f) k = do
  packets <- popLAll k
  case packets of
    [] -> return ()
    firstPacket : _ -> do
      let nm = spName firstPacket
          -- with and without timer prefix
          qs = quantilesFn (stripTimerPrefix nm) <> quantilesFn (timerMetricName nm)
          byDims :: M.Map Dimensions [SubmissionPacket]
          byDims = collect packets spDimensions id
          -- The payload kind of the first packet decides how the whole
          -- group is aggregated.
          -- NOTE(review): 'head' assumes every group is non-empty, and
          -- 'unSamples'/'unCounter' assume the group is homogeneous --
          -- confirm against 'collect' and the client side.
          mkAgg xs =
            case spPayload $ head xs of
              Samples _ ->
                AggStats . mkStats qs . V.fromList
                  . concatMap (unSamples . spPayload)
                  $ xs
              Counter _ ->
                AggCount . sum
                  . map (unCounter . spPayload)
                  $ xs
      now <- liftIO TM.getTime
      let -- Snap the timestamp down to the start of the flush interval.
          t :: Double
          t = fromIntegral ((round now `div` n) * n)
          mkDimsAgg (dims, ps) = Aggregated t nm (mkAgg ps) dims
          aggs = map mkDimsAgg (M.toList (expandDims byDims))
      mapM_ f aggs
  where
    quantilesFn = metricQuantiles cfg

-------------------------------------------------------------------------------

-- | Take a map of packets by dimensions and *add* aggregations of the
-- existing dims that isolate each distinct dimension/dimensionvalue
-- pair + one more entry with an empty dimension set that aggregates
-- the whole thing.
-- worked example:
--
-- Given:
-- { {d1=>d1v1,d2=>d2v1} => p1
-- , {d1=>d1v1,d2=>d2v2} => p2
-- }
-- Produces:
-- { {d1=>d1v1,d2=>d2v1} => p1
-- , {d1=>d1v1,d2=>d2v2} => p2
-- , {d1=>d1v1} => p1 + p2
-- , {d2=>d2v1} => p1
-- , {d2=>d2v2} => p2
-- , {} => p1 + p2
-- }
--
-- The 'Eq' constraint is kept for interface compatibility even though the
-- implementation no longer compares packets.
expandDims ::
  forall packets.
  (Monoid packets, Eq packets) =>
  M.Map Dimensions packets ->
  M.Map Dimensions packets
expandDims m =
  -- left-biased so technically if we have anything occupying the aggregated spots, leave them be
  m <> additions <> fullAggregation
  where
    -- One entry per distinct (name, value) pair, combining the packets of
    -- every input entry whose dimensions contain that pair. Built in a
    -- single pass; 'M.fromListWith' hands the combiner the newer value
    -- first, so the arguments are swapped to keep packets in the ascending
    -- key order of the input map.
    additions :: M.Map Dimensions packets
    additions =
      M.fromListWith
        (\newer older -> older <> newer)
        [ (M.singleton dn dv, packets)
          | (dimensionsMap, packets) <- M.toList m,
            (dn, dv) <- M.toList dimensionsMap
        ]
    -- All packets across any combination of dimensions
    fullAggregation :: M.Map Dimensions packets
    fullAggregation = M.singleton mempty (mconcat (M.elems m))

-- | A function that does something with the aggregation results. Can
-- implement multiple backends simply using this. Note that 'Semigroup' and
-- 'Monoid' instances are provided for defaulting and combining agg
-- processes.
data AggProcess = AggProcess
  { -- | Configuration consulted while aggregating (e.g. which quantiles
    -- to compute per metric).
    apConfig :: AggProcessConfig,
    -- | Handler invoked with each aggregation result.
    apProc :: Aggregated -> Redis ()
  }

-- | Combining two processes merges their configurations and runs both
-- handlers on every aggregation result, left one first.
instance Semigroup.Semigroup AggProcess where
  AggProcess cfg1 prc1 <> AggProcess cfg2 prc2 =
    AggProcess (cfg1 <> cfg2) runBoth
    where
      runBoth agg = prc1 agg >> prc2 agg

-- | The empty process has the empty configuration and a handler that
-- ignores its input.
instance Monoid AggProcess where
  mempty = AggProcess mempty (\_ -> pure ())
  mappend = (<>)

-------------------------------------------------------------------------------

-- | General configuration for agg processes. Defaulted with 'def',
-- 'defAggProcessConfig', and 'mempty'. Configurations can be combined
-- with (<>) from Monoid or Semigroup.
data AggProcessConfig = AggProcessConfig
  { -- | What quantiles should we calculate for any given metric, if
    -- any? We offer some common patterns for this in 'quantileMap',
    -- 'standardQuantiles', and 'noQuantiles'.
    metricQuantiles :: MetricName -> Set.Set Quantile
  }

-- | Combines the quantile functions pointwise: for every metric the result
-- is the union of the quantile sets both configurations ask for.
instance Semigroup AggProcessConfig where
  AggProcessConfig f1 <> AggProcessConfig f2 =
    AggProcessConfig (f1 <> f2)

-- | The empty configuration requests no quantiles for any metric.
instance Monoid AggProcessConfig where
  mempty = AggProcessConfig mempty
  mappend = (<>)

-- | Default configuration: every metric gets 'standardQuantiles'.
defAggProcessConfig :: AggProcessConfig
defAggProcessConfig =
  AggProcessConfig {metricQuantiles = standardQuantiles}

-- | 'def' is 'defAggProcessConfig'.
instance Default AggProcessConfig where
  def = defAggProcessConfig

-- | Regardless of metric, produce no quantiles.
noQuantiles :: MetricName -> Set.Set Quantile
noQuantiles _ = mempty

-- | This is usually a good, comprehensive default. Produces quantiles
-- 10,20,30,40,50,60,70,80,90,99. *Note:* for some backends like
-- cloudwatch, each quantile produces an additional metric, so you
-- should probably consider using something more limited than this.
standardQuantiles :: MetricName -> Set.Set Quantile
standardQuantiles _ =
  Set.fromList (map Q [10, 20, 30, 40, 50, 60, 70, 80, 90, 99])

-- | If you have a fixed set of metric names, this is often a
-- convenient way to express quantiles-per-metric.
--
-- Looks the metric up in the map and falls back to the given default set
-- when it has no entry.
quantileMap ::
  M.Map MetricName (Set.Set Quantile) ->
  -- | What to return on miss
  Set.Set Quantile ->
  (MetricName -> Set.Set Quantile)
quantileMap m qdef mn = M.findWithDefault qdef mn m

-------------------------------------------------------------------------------

-- | Store aggregation results in a CSV file.
--
-- Each aggregation result is turned into a row with 'aggToCSV', rendered
-- with the default CSV settings and written as one line to the handle.
putAggregateCSV :: Handle -> AggProcessConfig -> AggProcess
putAggregateCSV h cfg = AggProcess cfg writeRow
  where
    writeRow agg =
      liftIO (T.hPutStrLn h (rowToStr defCSVSettings (aggToCSV agg)))

-- | Path segment naming the payload kind: @"samples"@ for statistics,
-- @"counts"@ for counters.
typePrefix :: AggPayload -> T.Text
typePrefix payload =
  case payload of
    AggStats {} -> "samples"
    AggCount {} -> "counts"

-------------------------------------------------------------------------------

-- | Push data into a Graphite database using the plaintext protocol.
--
-- For every statistic field of the aggregation result and every dimension
-- pair of that result, one line of the form
-- @inst.\<type\>.\<metric\>.\<field\>.\<dimName\>.\<dimVal\> \<value\> \<timestamp\>@
-- is written to the handle. A result without dimensions therefore
-- produces no lines.
putAggregateGraphite :: Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite h cfg = AggProcess cfg $ \agg ->
  let (ss, ts) = mkStatsFields agg
      -- Expand dimensions into one datum per dimension pair as the group
      mkLines (m, val) =
        for (M.toList (aggDimensions agg)) $ \(DimensionName dimName, DimensionValue dimVal) ->
          T.concat
            [ "inst.",
              typePrefix (aggPayload agg),
              ".",
              T.pack (metricName (aggName agg)),
              ".",
              m,
              ".",
              dimName,
              ".",
              dimVal,
              " ",
              val,
              " ",
              ts
            ]
   in liftIO $ mapM_ (mapM_ (T.hPutStrLn h) . mkLines) ss

-------------------------------------------------------------------------------

-- | Pop all items of a redis list.
--
-- Pops in batches of 100 via 'popLMany' and keeps going until a batch
-- comes back empty; the batches are concatenated in pop order.
popLAll :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Redis [a]
popLAll k = do
  batch <- popLMany k 100
  if null batch
    then return batch
    else (batch ++) <$> popLAll k

-------------------------------------------------------------------------------

-- | Pop up to N items from a queue. It will pop from left and preserve order.
--
-- Issues N @LPOP@ commands. Replies that are errors or report an empty
-- list are skipped, and items that fail 'decodeCompress' are dropped.
--
-- Every successfully popped item is kept even when some other reply in the
-- batch is an error: those items have already been removed from Redis, so
-- discarding the whole batch would lose them.
popLMany :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Int -> Redis [a]
popLMany k n = do
  replies <- replicateM n (R.lpop k)
  return $ mapMaybe decodeItem [raw | Right (Just raw) <- replies]
  where
    decodeItem raw = hush (decodeCompress raw)

-------------------------------------------------------------------------------

-- | Debug hook; currently a no-op that ignores its message.
-- Need to pull in a debugging library here.
dbg :: (Monad m) => String -> m ()
dbg _ = return ()

-- ------------------------------------------------------------------------------
-- dbg :: (MonadIO m) => String -> m ()
-- dbg s = debug $ "Instrument.Worker: " ++ s

-------------------------------------------------------------------------------

-- | Expand count aggregation to have the full columns.
--
-- The row is the left-biased union of three maps: the actual fields of
-- the aggregation (plus @"metric"@ and @"timestamp"@), the fields of a
-- default 'AggStats' payload (so that counters still fill every statistics
-- column), and one column per dimension of the aggregation.
aggToCSV :: Aggregated -> M.Map T.Text T.Text
aggToCSV agg@Aggregated {..} = els <> defFields <> dimFields
  where
    els :: MapRow T.Text
    els =
      M.fromList $
        ("metric", T.pack (metricName aggName))
          : ("timestamp", ts)
          : fields
    (fields, ts) = mkStatsFields agg
    -- Columns of a default statistics payload, used only as filler.
    defFields = M.fromList $ fst $ mkStatsFields $ agg {aggPayload = AggStats def}
    dimFields = M.fromList [(k, v) | (DimensionName k, DimensionValue v) <- M.toList aggDimensions]

-------------------------------------------------------------------------------

-- | Get agg results into a form ready to be output.
--
-- Returns the list of (field name, rendered value) pairs for the payload
-- together with the rendered timestamp. Statistics payloads yield the
-- fixed summary fields followed by one @percentile_\<n\>@ field per
-- computed quantile; counter payloads yield a single @"count"@ field.
mkStatsFields :: Aggregated -> ([(T.Text, T.Text)], T.Text)
mkStatsFields Aggregated {..} = (els, ts)
  where
    els =
      case aggPayload of
        AggStats Stats {..} ->
          [ ("mean", formatDecimal 6 False smean),
            ("count", showT scount),
            ("max", formatDecimal 6 False smax),
            ("min", formatDecimal 6 False smin),
            ("srange", formatDecimal 6 False srange),
            ("stdDev", formatDecimal 6 False sstdev),
            ("sum", formatDecimal 6 False ssum),
            ("skewness", formatDecimal 6 False sskewness),
            ("kurtosis", formatDecimal 6 False skurtosis)
          ]
            ++ map mkQ (M.toList squantiles)
        AggCount i ->
          [("count", showT i)]

    mkQ (k, v) = (T.concat ["percentile_", showT k], formatDecimal 6 False v)
    ts = formatInt aggTS