module Data.Metrics.Reservoir.ExponentiallyDecaying (
ExponentiallyDecayingReservoir,
standardReservoir,
reservoir,
clear,
size,
snapshot,
rescale,
update
) where
import Control.Lens
import Control.Lens.TH
import Control.Monad.Primitive
import Control.Monad.ST
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Metrics.Internal
import qualified Data.Map as M
import qualified Data.Metrics.Reservoir as R
import Data.Metrics.Snapshot (Snapshot(..), takeSnapshot)
import Data.Primitive.MutVar
import qualified Data.Vector.Unboxed as V
import Data.Word
import System.Posix.Time
import System.Posix.Types
import System.Random.MWC
baseRescaleThreshold :: Word64
baseRescaleThreshold = 60 * 60
data ExponentiallyDecayingReservoir = ExponentiallyDecayingReservoir
{ exponentiallyDecayingReservoirInnerSize :: !Int
, exponentiallyDecayingReservoirAlpha :: !Double
, exponentiallyDecayingReservoirRescaleThreshold :: !Word64
, exponentiallyDecayingReservoirInnerReservoir :: !(M.Map Double Double)
, exponentiallyDecayingReservoirCount :: !Int
, exponentiallyDecayingReservoirStartTime :: !Word64
, exponentiallyDecayingReservoirNextScaleTime :: !Word64
, exponentiallyDecayingReservoirSeed :: !Seed
} deriving (Show)
makeFields ''ExponentiallyDecayingReservoir
standardReservoir :: NominalDiffTime -> Seed -> R.Reservoir
standardReservoir = reservoir 0.015 1028
reservoir :: Double
-> Int
-> NominalDiffTime
-> Seed -> R.Reservoir
reservoir a r t s = R.Reservoir
{ R.reservoirClear = clear
, R.reservoirSize = size
, R.reservoirSnapshot = snapshot
, R.reservoirUpdate = update
, R.reservoirState = ExponentiallyDecayingReservoir r a baseRescaleThreshold M.empty 0 c c' s
}
where
c = truncate t
c' = c + baseRescaleThreshold
clear :: NominalDiffTime -> ExponentiallyDecayingReservoir -> ExponentiallyDecayingReservoir
clear = go
where
go t c = c & startTime .~ t' & nextScaleTime .~ t'' & count .~ 0 & innerReservoir .~ M.empty
where
t' = truncate t
t'' = t' + c ^. rescaleThreshold
size :: ExponentiallyDecayingReservoir -> Int
size = go
where
go r = min c s
where
c = r ^. count
s = r ^. innerSize
snapshot :: ExponentiallyDecayingReservoir -> Snapshot
snapshot r = runST $ do
let svals = V.fromList $ M.elems $ r ^. innerReservoir
mvals <- V.unsafeThaw svals
takeSnapshot mvals
weight :: Double -> Word64 -> Double
weight alpha t = exp (alpha * fromIntegral t)
rescale :: Word64 -> ExponentiallyDecayingReservoir -> ExponentiallyDecayingReservoir
rescale now c = c & startTime .~ now & nextScaleTime .~ st & count .~ M.size adjustedReservoir & innerReservoir .~ adjustedReservoir
where
potentialScaleTime = now + baseRescaleThreshold
currentScaleTime = c ^. nextScaleTime
st = if potentialScaleTime > currentScaleTime then potentialScaleTime else currentScaleTime
diff = now c ^. startTime
adjustKey x = x * exp (_alpha * fromIntegral diff)
adjustedReservoir = M.mapKeys adjustKey $ c ^. innerReservoir
_alpha = c ^. alpha
update :: Double
-> NominalDiffTime
-> ExponentiallyDecayingReservoir
-> ExponentiallyDecayingReservoir
update v t c = rescaled & seed .~ s' & count .~ newCount & innerReservoir .~ addValue r
where
rescaled = if seconds >= c ^. nextScaleTime
then rescale seconds c
else c
seconds = truncate t
priority = weight (c ^. alpha) (seconds c ^. startTime) / priorityDenom
addValue r = if newCount <= (c ^. innerSize)
then M.insert priority v r
else if firstKey < priority
then M.delete firstKey $ M.insertWith const priority v r
else r
r = c ^. innerReservoir
firstKey = head $ M.keys r
newCount = 1 + c ^. count
(priorityDenom, s') = runST $ do
g <- restore $ c ^. seed
p <- uniform g
s' <- save g
return (p :: Double, s')