{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Instrument.Worker
( initWorkerCSV,
initWorkerCSV',
initWorkerGraphite,
initWorkerGraphite',
work,
initWorker,
AggProcess (..),
AggProcessConfig (..),
standardQuantiles,
noQuantiles,
quantileMap,
defAggProcessConfig,
expandDims,
)
where
import Control.Error
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as B
import Data.CSV.Conduit
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Default
import qualified Data.Map as M
import qualified Data.SafeCopy as SC
import Data.Semigroup as Semigroup
import Data.Serialize
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Vector.Unboxed as V
import Database.Redis as R hiding (decode)
import Instrument.Client
( packetsKey,
stripTimerPrefix,
timerMetricName,
)
import qualified Instrument.Measurement as TM
import Instrument.Types
import Instrument.Utils
import Network.Socket as N
import qualified Statistics.Quantile as Q
import Statistics.Sample
import System.IO
import System.Posix
initWorkerCSV ::
ConnectInfo ->
FilePath ->
Int ->
AggProcessConfig ->
IO ()
initWorkerCSV :: ConnectInfo -> FilePath -> Int -> AggProcessConfig -> IO ()
initWorkerCSV ConnectInfo
conn FilePath
fp Int
n AggProcessConfig
cfg =
FilePath -> ConnectInfo -> Int -> AggProcess -> IO ()
initWorker FilePath
"CSV Worker" ConnectInfo
conn Int
n forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< FilePath -> AggProcessConfig -> IO AggProcess
initWorkerCSV' FilePath
fp AggProcessConfig
cfg
initWorkerCSV' ::
FilePath ->
AggProcessConfig ->
IO AggProcess
initWorkerCSV' :: FilePath -> AggProcessConfig -> IO AggProcess
initWorkerCSV' FilePath
fp AggProcessConfig
cfg = do
!Bool
res <- FilePath -> IO Bool
fileExist FilePath
fp
!Handle
h <- FilePath -> IOMode -> IO Handle
openFile FilePath
fp IOMode
AppendMode
Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
LineBuffering
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
res forall a b. (a -> b) -> a -> b
$
Handle -> Text -> IO ()
T.hPutStrLn Handle
h forall a b. (a -> b) -> a -> b
$ forall s r. CSV s r => CSVSettings -> r -> s
rowToStr CSVSettings
defCSVSettings forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Map k a -> [k]
M.keys forall a b. (a -> b) -> a -> b
$ Aggregated -> Map Text Text
aggToCSV forall a. Default a => a
def
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Handle -> AggProcessConfig -> AggProcess
putAggregateCSV Handle
h AggProcessConfig
cfg
initWorkerGraphite ::
ConnectInfo ->
Int ->
HostName ->
Int ->
AggProcessConfig ->
IO ()
initWorkerGraphite :: ConnectInfo -> Int -> FilePath -> Int -> AggProcessConfig -> IO ()
initWorkerGraphite ConnectInfo
conn Int
n FilePath
server Int
port AggProcessConfig
cfg =
FilePath -> ConnectInfo -> Int -> AggProcess -> IO ()
initWorker FilePath
"Graphite Worker" ConnectInfo
conn Int
n forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< FilePath -> Int -> AggProcessConfig -> IO AggProcess
initWorkerGraphite' FilePath
server Int
port AggProcessConfig
cfg
initWorkerGraphite' ::
HostName ->
Int ->
AggProcessConfig ->
IO AggProcess
initWorkerGraphite' :: FilePath -> Int -> AggProcessConfig -> IO AggProcess
initWorkerGraphite' FilePath
server Int
port AggProcessConfig
cfg = do
AddrInfo
addr <- FilePath -> PortNumber -> IO AddrInfo
resolve FilePath
server (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
port)
Socket
sock <- AddrInfo -> IO Socket
open AddrInfo
addr
Handle
h <- Socket -> IOMode -> IO Handle
N.socketToHandle Socket
sock IOMode
ReadWriteMode
Handle -> BufferMode -> IO ()
hSetBuffering Handle
h BufferMode
LineBuffering
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite Handle
h AggProcessConfig
cfg
where
portNumberToServiceName :: N.PortNumber -> N.ServiceName
portNumberToServiceName :: PortNumber -> FilePath
portNumberToServiceName = forall a. Show a => a -> FilePath
show
resolve :: FilePath -> PortNumber -> IO AddrInfo
resolve FilePath
host PortNumber
portNumber = do
let hints :: AddrInfo
hints = AddrInfo
N.defaultHints {addrSocketType :: SocketType
N.addrSocketType = SocketType
N.Stream}
AddrInfo
addr : [AddrInfo]
_ <-
Maybe AddrInfo -> Maybe FilePath -> Maybe FilePath -> IO [AddrInfo]
N.getAddrInfo
(forall a. a -> Maybe a
Just AddrInfo
hints)
(forall a. a -> Maybe a
Just FilePath
host)
(forall a. a -> Maybe a
Just (PortNumber -> FilePath
portNumberToServiceName PortNumber
portNumber))
forall (m :: * -> *) a. Monad m => a -> m a
return AddrInfo
addr
open :: AddrInfo -> IO Socket
open AddrInfo
addr = do
Socket
sock <-
Family -> SocketType -> ProtocolNumber -> IO Socket
N.socket
(AddrInfo -> Family
N.addrFamily AddrInfo
addr)
(AddrInfo -> SocketType
N.addrSocketType AddrInfo
addr)
(AddrInfo -> ProtocolNumber
N.addrProtocol AddrInfo
addr)
Socket -> SockAddr -> IO ()
N.connect Socket
sock (AddrInfo -> SockAddr
N.addrAddress AddrInfo
addr)
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sock
initWorker :: String -> ConnectInfo -> Int -> AggProcess -> IO ()
initWorker :: FilePath -> ConnectInfo -> Int -> AggProcess -> IO ()
initWorker FilePath
wname ConnectInfo
conn Int
n AggProcess
f = do
Connection
p <- ConnectInfo -> IO Connection
createInstrumentPool ConnectInfo
conn
IO () -> IO ()
indefinitely' forall a b. (a -> b) -> a -> b
$ Connection -> Int -> AggProcess -> IO ()
work Connection
p Int
n AggProcess
f
where
indefinitely' :: IO () -> IO ()
indefinitely' = FilePath -> Int -> IO () -> IO ()
indefinitely FilePath
wname (Int -> Int
seconds Int
n)
mkStats :: Set.Set Quantile -> Sample -> Stats
mkStats :: Set Quantile -> Sample -> Stats
mkStats Set Quantile
qs Sample
s =
Stats
{ smean :: Double
smean = forall (v :: * -> *). Vector v Double => v Double -> Double
mean Sample
s,
ssum :: Double
ssum = forall a. (Unbox a, Num a) => Vector a -> a
V.sum Sample
s,
scount :: Int
scount = forall a. Unbox a => Vector a -> Int
V.length Sample
s,
smax :: Double
smax = forall a. (Unbox a, Ord a) => Vector a -> a
V.maximum Sample
s,
smin :: Double
smin = forall a. (Unbox a, Ord a) => Vector a -> a
V.minimum Sample
s,
srange :: Double
srange = forall (v :: * -> *). Vector v Double => v Double -> Double
range Sample
s,
sstdev :: Double
sstdev = forall (v :: * -> *). Vector v Double => v Double -> Double
stdDev Sample
s,
sskewness :: Double
sskewness = forall (v :: * -> *). Vector v Double => v Double -> Double
skewness Sample
s,
skurtosis :: Double
skurtosis = forall (v :: * -> *). Vector v Double => v Double -> Double
kurtosis Sample
s,
squantiles :: Map Int Double
squantiles = Map Int Double
quantiles
}
where
quantiles :: Map Int Double
quantiles = forall k a. Ord k => [(k, a)] -> Map k a
M.fromList (Int -> Int -> (Int, Double)
mkQ Int
100 forall b c a. (b -> c) -> (a -> b) -> a -> c
. Quantile -> Int
quantile forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Set a -> [a]
Set.toList Set Quantile
qs)
mkQ :: Int -> Int -> (Int, Double)
mkQ Int
mx Int
i = (Int
i, forall (v :: * -> *).
Vector v Double =>
Int -> Int -> v Double -> Double
Q.weightedAvg Int
i Int
mx Sample
s)
work :: R.Connection -> Int -> AggProcess -> IO ()
work :: Connection -> Int -> AggProcess -> IO ()
work Connection
r Int
n AggProcess
f = forall a. Connection -> Redis a -> IO a
runRedis Connection
r forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *). Monad m => FilePath -> m ()
dbg FilePath
"entered work block"
Integer
estimate <- forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const Integer
0) forall a. a -> a
id forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> m (f Integer)
scard ByteString
packetsKey
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) b a i.
Monad m =>
(b -> m (Maybe (a, b))) -> b -> ConduitT i a m ()
CL.unfoldM forall {b} {m :: * -> *} {a}.
(Ord b, Num b, RedisCtx m (Either a)) =>
b -> m (Maybe (ByteString, b))
nextKey Integer
estimate
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
CL.mapM_ (Int -> AggProcess -> ByteString -> Redis ()
processSampler Int
n AggProcess
f)
where
nextKey :: b -> m (Maybe (ByteString, b))
nextKey b
estRemaining
| b
estRemaining forall a. Ord a => a -> a -> Bool
> b
0 = do
Either a (Maybe ByteString)
mk <- forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> m (f (Maybe ByteString))
spop ByteString
packetsKey
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Either a (Maybe ByteString)
mk of
Right (Just ByteString
k) -> forall a. a -> Maybe a
Just (ByteString
k, b
estRemaining forall a. Num a => a -> a -> a
- b
1)
Either a (Maybe ByteString)
_ -> forall a. Maybe a
Nothing
| Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
processSampler ::
Int ->
AggProcess ->
B.ByteString ->
Redis ()
processSampler :: Int -> AggProcess -> ByteString -> Redis ()
processSampler Int
n (AggProcess AggProcessConfig
cfg Aggregated -> Redis ()
f) ByteString
k = do
[SubmissionPacket]
packets <- forall a. (Serialize a, SafeCopy a) => ByteString -> Redis [a]
popLAll ByteString
k
case [SubmissionPacket]
packets of
[] -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
[SubmissionPacket]
_ -> do
let nm :: MetricName
nm = SubmissionPacket -> MetricName
spName forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [a] -> a
head forall a b. (a -> b) -> a -> b
$ [SubmissionPacket]
packets
qs :: Set Quantile
qs = MetricName -> Set Quantile
quantilesFn (MetricName -> MetricName
stripTimerPrefix MetricName
nm) forall a. Semigroup a => a -> a -> a
<> MetricName -> Set Quantile
quantilesFn (MetricName -> MetricName
timerMetricName MetricName
nm)
byDims :: M.Map Dimensions [SubmissionPacket]
byDims :: Map Dimensions [SubmissionPacket]
byDims = forall b a c. Ord b => [a] -> (a -> b) -> (a -> c) -> Map b [c]
collect [SubmissionPacket]
packets SubmissionPacket -> Dimensions
spDimensions forall a. a -> a
id
mkAgg :: [SubmissionPacket] -> AggPayload
mkAgg [SubmissionPacket]
xs =
case SubmissionPacket -> Payload
spPayload forall a b. (a -> b) -> a -> b
$ forall a. [a] -> a
head [SubmissionPacket]
xs of
Samples [Double]
_ ->
Stats -> AggPayload
AggStats forall b c a. (b -> c) -> (a -> b) -> a -> c
. Set Quantile -> Sample -> Stats
mkStats Set Quantile
qs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Unbox a => [a] -> Vector a
V.fromList
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (Payload -> [Double]
unSamples forall b c a. (b -> c) -> (a -> b) -> a -> c
. SubmissionPacket -> Payload
spPayload)
forall a b. (a -> b) -> a -> b
$ [SubmissionPacket]
xs
Counter Integer
_ ->
Integer -> AggPayload
AggCount forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a -> b) -> [a] -> [b]
map (Payload -> Integer
unCounter forall b c a. (b -> c) -> (a -> b) -> a -> c
. SubmissionPacket -> Payload
spPayload)
forall a b. (a -> b) -> a -> b
$ [SubmissionPacket]
xs
Double
t <- (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Num a => a -> a -> a
* Int
n) forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a. Integral a => a -> a -> a
`div` Int
n) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (RealFrac a, Integral b) => a -> b
round) forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
`liftM` forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Double
TM.getTime
let aggs :: [Aggregated]
aggs = forall a b. (a -> b) -> [a] -> [b]
map (Dimensions, [SubmissionPacket]) -> Aggregated
mkDimsAgg forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> [(k, a)]
M.toList forall a b. (a -> b) -> a -> b
$ forall packets.
(Monoid packets, Eq packets) =>
Map Dimensions packets -> Map Dimensions packets
expandDims forall a b. (a -> b) -> a -> b
$ Map Dimensions [SubmissionPacket]
byDims
mkDimsAgg :: (Dimensions, [SubmissionPacket]) -> Aggregated
mkDimsAgg (Dimensions
dims, [SubmissionPacket]
ps) = Double -> MetricName -> AggPayload -> Dimensions -> Aggregated
Aggregated Double
t MetricName
nm ([SubmissionPacket] -> AggPayload
mkAgg [SubmissionPacket]
ps) Dimensions
dims
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Aggregated -> Redis ()
f [Aggregated]
aggs
forall (m :: * -> *) a. Monad m => a -> m a
return ()
where
quantilesFn :: MetricName -> Set Quantile
quantilesFn = AggProcessConfig -> MetricName -> Set Quantile
metricQuantiles AggProcessConfig
cfg
expandDims ::
forall packets.
(Monoid packets, Eq packets) =>
M.Map Dimensions packets ->
M.Map Dimensions packets
expandDims :: forall packets.
(Monoid packets, Eq packets) =>
Map Dimensions packets -> Map Dimensions packets
expandDims Map Dimensions packets
m =
Map Dimensions packets
m forall a. Semigroup a => a -> a -> a
<> Map Dimensions packets
additions forall a. Semigroup a => a -> a -> a
<> Map Dimensions packets
fullAggregation
where
distinctPairs :: Set.Set (DimensionName, DimensionValue)
distinctPairs :: Set (DimensionName, DimensionValue)
distinctPairs = forall a. Ord a => [a] -> Set a
Set.fromList (forall a. Monoid a => [a] -> a
mconcat (forall k a. Map k a -> [(k, a)]
M.toList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall k a. Map k a -> [k]
M.keys Map Dimensions packets
m))
additions :: Map Dimensions packets
additions = forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (DimensionName, DimensionValue) -> Map Dimensions packets
mkIsolatedMap Set (DimensionName, DimensionValue)
distinctPairs
mkIsolatedMap :: (DimensionName, DimensionValue) -> M.Map Dimensions packets
mkIsolatedMap :: (DimensionName, DimensionValue) -> Map Dimensions packets
mkIsolatedMap (DimensionName, DimensionValue)
dPair =
let matches :: [packets]
matches = forall a b. (a, b) -> b
snd forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. (a -> Bool) -> [a] -> [a]
filter ((forall a. Eq a => a -> a -> Bool
== (DimensionName, DimensionValue)
dPair) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> a
fst) [((DimensionName, DimensionValue), packets)]
mFlat
in if [packets]
matches forall a. Eq a => a -> a -> Bool
== forall a. Monoid a => a
mempty
then forall a. Monoid a => a
mempty
else forall k a. k -> a -> Map k a
M.singleton (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry forall k a. k -> a -> Map k a
M.singleton (DimensionName, DimensionValue)
dPair) (forall a. Monoid a => [a] -> a
mconcat [packets]
matches)
mFlat :: [((DimensionName, DimensionValue), packets)]
mFlat :: [((DimensionName, DimensionValue), packets)]
mFlat =
[ ((DimensionName
dn, DimensionValue
dv), packets
packets)
| (Dimensions
dimensionsMap, packets
packets) <- forall k a. Map k a -> [(k, a)]
M.toList Map Dimensions packets
m,
(DimensionName
dn, DimensionValue
dv) <- forall k a. Map k a -> [(k, a)]
M.toList Dimensions
dimensionsMap
]
fullAggregation :: Map Dimensions packets
fullAggregation = forall k a. k -> a -> Map k a
M.singleton forall a. Monoid a => a
mempty (forall a. Monoid a => [a] -> a
mconcat (forall k a. Map k a -> [a]
M.elems Map Dimensions packets
m))
data AggProcess = AggProcess
{ AggProcess -> AggProcessConfig
apConfig :: AggProcessConfig,
AggProcess -> Aggregated -> Redis ()
apProc :: Aggregated -> Redis ()
}
instance Semigroup.Semigroup AggProcess where
(AggProcess AggProcessConfig
cfg1 Aggregated -> Redis ()
prc1) <> :: AggProcess -> AggProcess -> AggProcess
<> (AggProcess AggProcessConfig
cfg2 Aggregated -> Redis ()
prc2) =
AggProcessConfig -> (Aggregated -> Redis ()) -> AggProcess
AggProcess (AggProcessConfig
cfg1 forall a. Semigroup a => a -> a -> a
<> AggProcessConfig
cfg2) (\Aggregated
agg -> Aggregated -> Redis ()
prc1 Aggregated
agg forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Aggregated -> Redis ()
prc2 Aggregated
agg)
instance Monoid AggProcess where
mempty :: AggProcess
mempty = AggProcessConfig -> (Aggregated -> Redis ()) -> AggProcess
AggProcess forall a. Monoid a => a
mempty (forall a b. a -> b -> a
const (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()))
mappend :: AggProcess -> AggProcess -> AggProcess
mappend = forall a. Semigroup a => a -> a -> a
(<>)
data AggProcessConfig = AggProcessConfig
{
AggProcessConfig -> MetricName -> Set Quantile
metricQuantiles :: MetricName -> Set.Set Quantile
}
instance Semigroup AggProcessConfig where
AggProcessConfig MetricName -> Set Quantile
f1 <> :: AggProcessConfig -> AggProcessConfig -> AggProcessConfig
<> AggProcessConfig MetricName -> Set Quantile
f2 =
let f3 :: MetricName -> Set Quantile
f3 = MetricName -> Set Quantile
f1 forall a. Semigroup a => a -> a -> a
<> MetricName -> Set Quantile
f2
in (MetricName -> Set Quantile) -> AggProcessConfig
AggProcessConfig MetricName -> Set Quantile
f3
instance Monoid AggProcessConfig where
mempty :: AggProcessConfig
mempty = (MetricName -> Set Quantile) -> AggProcessConfig
AggProcessConfig forall a. Monoid a => a
mempty
mappend :: AggProcessConfig -> AggProcessConfig -> AggProcessConfig
mappend = forall a. Semigroup a => a -> a -> a
(<>)
defAggProcessConfig :: AggProcessConfig
defAggProcessConfig :: AggProcessConfig
defAggProcessConfig = (MetricName -> Set Quantile) -> AggProcessConfig
AggProcessConfig MetricName -> Set Quantile
standardQuantiles
instance Default AggProcessConfig where
def :: AggProcessConfig
def = AggProcessConfig
defAggProcessConfig
noQuantiles :: MetricName -> Set.Set Quantile
noQuantiles :: MetricName -> Set Quantile
noQuantiles = forall a b. a -> b -> a
const forall a. Monoid a => a
mempty
standardQuantiles :: MetricName -> Set.Set Quantile
standardQuantiles :: MetricName -> Set Quantile
standardQuantiles MetricName
_ =
forall a. Ord a => [a] -> Set a
Set.fromList [Int -> Quantile
Q Int
10, Int -> Quantile
Q Int
20, Int -> Quantile
Q Int
30, Int -> Quantile
Q Int
40, Int -> Quantile
Q Int
50, Int -> Quantile
Q Int
60, Int -> Quantile
Q Int
70, Int -> Quantile
Q Int
80, Int -> Quantile
Q Int
90, Int -> Quantile
Q Int
99]
quantileMap ::
M.Map MetricName (Set.Set Quantile) ->
Set.Set Quantile ->
(MetricName -> Set.Set Quantile)
quantileMap :: Map MetricName (Set Quantile)
-> Set Quantile -> MetricName -> Set Quantile
quantileMap Map MetricName (Set Quantile)
m Set Quantile
qdef MetricName
mn = forall a. a -> Maybe a -> a
fromMaybe Set Quantile
qdef (forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup MetricName
mn Map MetricName (Set Quantile)
m)
putAggregateCSV :: Handle -> AggProcessConfig -> AggProcess
putAggregateCSV :: Handle -> AggProcessConfig -> AggProcess
putAggregateCSV Handle
h AggProcessConfig
cfg = AggProcessConfig -> (Aggregated -> Redis ()) -> AggProcess
AggProcess AggProcessConfig
cfg forall a b. (a -> b) -> a -> b
$ \Aggregated
agg ->
let d :: Text
d = forall s r. CSV s r => CSVSettings -> r -> s
rowToStr CSVSettings
defCSVSettings forall a b. (a -> b) -> a -> b
$ Aggregated -> Map Text Text
aggToCSV Aggregated
agg
in forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Handle -> Text -> IO ()
T.hPutStrLn Handle
h Text
d
typePrefix :: AggPayload -> T.Text
typePrefix :: AggPayload -> Text
typePrefix AggStats {} = Text
"samples"
typePrefix AggCount {} = Text
"counts"
putAggregateGraphite :: Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite :: Handle -> AggProcessConfig -> AggProcess
putAggregateGraphite Handle
h AggProcessConfig
cfg = AggProcessConfig -> (Aggregated -> Redis ()) -> AggProcess
AggProcess AggProcessConfig
cfg forall a b. (a -> b) -> a -> b
$ \Aggregated
agg ->
let ([(Text, Text)]
ss, Text
ts) = Aggregated -> ([(Text, Text)], Text)
mkStatsFields Aggregated
agg
mkLines :: (Text, Text) -> [Text]
mkLines (Text
m, Text
val) = forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
for (forall k a. Map k a -> [(k, a)]
M.toList (Aggregated -> Dimensions
aggDimensions Aggregated
agg)) forall a b. (a -> b) -> a -> b
$ \(DimensionName Text
dimName, DimensionValue Text
dimVal) ->
[Text] -> Text
T.concat
[ Text
"inst.",
AggPayload -> Text
typePrefix (Aggregated -> AggPayload
aggPayload Aggregated
agg),
Text
".",
FilePath -> Text
T.pack (MetricName -> FilePath
metricName (Aggregated -> MetricName
aggName Aggregated
agg)),
Text
".",
Text
m,
Text
".",
Text
dimName,
Text
".",
Text
dimVal,
Text
" ",
Text
val,
Text
" ",
Text
ts
]
in forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Handle -> Text -> IO ()
T.hPutStrLn Handle
h) forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text, Text) -> [Text]
mkLines) [(Text, Text)]
ss
popLAll :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Redis [a]
popLAll :: forall a. (Serialize a, SafeCopy a) => ByteString -> Redis [a]
popLAll ByteString
k = do
[a]
res <- forall a.
(Serialize a, SafeCopy a) =>
ByteString -> Int -> Redis [a]
popLMany ByteString
k Int
100
case [a]
res of
[] -> forall (m :: * -> *) a. Monad m => a -> m a
return [a]
res
[a]
_ -> ([a]
res forall a. [a] -> [a] -> [a]
++) forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
`liftM` forall a. (Serialize a, SafeCopy a) => ByteString -> Redis [a]
popLAll ByteString
k
popLMany :: (Serialize a, SC.SafeCopy a) => B.ByteString -> Int -> Redis [a]
popLMany :: forall a.
(Serialize a, SafeCopy a) =>
ByteString -> Int -> Redis [a]
popLMany ByteString
k Int
n = do
[Either Reply (Maybe ByteString)]
res <- forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
n Redis (Either Reply (Maybe ByteString))
pop
case forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence [Either Reply (Maybe ByteString)]
res of
Left Reply
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return []
Right [Maybe ByteString]
xs -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe forall {b}. (SafeCopy b, Serialize b) => ByteString -> Maybe b
conv forall a b. (a -> b) -> a -> b
$ forall a. [Maybe a] -> [a]
catMaybes [Maybe ByteString]
xs
where
pop :: Redis (Either Reply (Maybe ByteString))
pop = forall (m :: * -> *) (f :: * -> *).
RedisCtx m f =>
ByteString -> m (f (Maybe ByteString))
R.lpop ByteString
k
conv :: ByteString -> Maybe b
conv ByteString
x = forall a b. Either a b -> Maybe b
hush forall a b. (a -> b) -> a -> b
$ forall a.
(SafeCopy a, Serialize a) =>
ByteString -> Either FilePath a
decodeCompress ByteString
x
dbg :: (Monad m) => String -> m ()
dbg :: forall (m :: * -> *). Monad m => FilePath -> m ()
dbg FilePath
_ = forall (m :: * -> *) a. Monad m => a -> m a
return ()
aggToCSV :: Aggregated -> M.Map T.Text T.Text
aggToCSV :: Aggregated -> Map Text Text
aggToCSV agg :: Aggregated
agg@Aggregated {Double
Dimensions
MetricName
AggPayload
aggTS :: Aggregated -> Double
aggDimensions :: Dimensions
aggPayload :: AggPayload
aggName :: MetricName
aggTS :: Double
aggName :: Aggregated -> MetricName
aggPayload :: Aggregated -> AggPayload
aggDimensions :: Aggregated -> Dimensions
..} = Map Text Text
els forall a. Semigroup a => a -> a -> a
<> Map Text Text
defFields forall a. Semigroup a => a -> a -> a
<> Map Text Text
dimFields
where
els :: MapRow T.Text
els :: Map Text Text
els =
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList forall a b. (a -> b) -> a -> b
$
(Text
"metric", FilePath -> Text
T.pack (MetricName -> FilePath
metricName MetricName
aggName)) forall a. a -> [a] -> [a]
:
(Text
"timestamp", Text
ts) forall a. a -> [a] -> [a]
:
[(Text, Text)]
fields
([(Text, Text)]
fields, Text
ts) = Aggregated -> ([(Text, Text)], Text)
mkStatsFields Aggregated
agg
defFields :: Map Text Text
defFields = forall k a. Ord k => [(k, a)] -> Map k a
M.fromList forall a b. (a -> b) -> a -> b
$ forall a b. (a, b) -> a
fst forall a b. (a -> b) -> a -> b
$ Aggregated -> ([(Text, Text)], Text)
mkStatsFields forall a b. (a -> b) -> a -> b
$ Aggregated
agg {aggPayload :: AggPayload
aggPayload = (Stats -> AggPayload
AggStats forall a. Default a => a
def)}
dimFields :: Map Text Text
dimFields = forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [(Text
k, Text
v) | (DimensionName Text
k, DimensionValue Text
v) <- forall k a. Map k a -> [(k, a)]
M.toList Dimensions
aggDimensions]
mkStatsFields :: Aggregated -> ([(T.Text, T.Text)], T.Text)
mkStatsFields :: Aggregated -> ([(Text, Text)], Text)
mkStatsFields Aggregated {Double
Dimensions
MetricName
AggPayload
aggDimensions :: Dimensions
aggPayload :: AggPayload
aggName :: MetricName
aggTS :: Double
aggTS :: Aggregated -> Double
aggName :: Aggregated -> MetricName
aggPayload :: Aggregated -> AggPayload
aggDimensions :: Aggregated -> Dimensions
..} = ([(Text, Text)]
els, Text
ts)
where
els :: [(Text, Text)]
els =
case AggPayload
aggPayload of
AggStats Stats {Double
Int
Map Int Double
squantiles :: Map Int Double
skurtosis :: Double
sskewness :: Double
sstdev :: Double
srange :: Double
smin :: Double
smax :: Double
scount :: Int
ssum :: Double
smean :: Double
squantiles :: Stats -> Map Int Double
skurtosis :: Stats -> Double
sskewness :: Stats -> Double
sstdev :: Stats -> Double
srange :: Stats -> Double
smin :: Stats -> Double
smax :: Stats -> Double
scount :: Stats -> Int
ssum :: Stats -> Double
smean :: Stats -> Double
..} ->
[ (Text
"mean", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
smean),
(Text
"count", forall a. Show a => a -> Text
showT Int
scount),
(Text
"max", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
smax),
(Text
"min", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
smin),
(Text
"srange", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
srange),
(Text
"stdDev", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
sstdev),
(Text
"sum", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
ssum),
(Text
"skewness", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
sskewness),
(Text
"kurtosis", forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False Double
skurtosis)
]
forall a. [a] -> [a] -> [a]
++ (forall a b. (a -> b) -> [a] -> [b]
map forall {a} {a}. (Show a, RealFloat a) => (a, a) -> (Text, Text)
mkQ forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> [(k, a)]
M.toList Map Int Double
squantiles)
AggCount Integer
i ->
[(Text
"count", forall a. Show a => a -> Text
showT Integer
i)]
mkQ :: (a, a) -> (Text, Text)
mkQ (a
k, a
v) = ([Text] -> Text
T.concat [Text
"percentile_", forall a. Show a => a -> Text
showT a
k], forall a. RealFloat a => Int -> Bool -> a -> Text
formatDecimal Int
6 Bool
False a
v)
ts :: Text
ts = forall a. RealFrac a => a -> Text
formatInt Double
aggTS