module Control.Concurrent.Throttle
( Measure
, ThrottleConf
, newThrottleConf
, throttleConfSetMeasure
, throttleConfThrottleConsumer
, throttleConfThrottleProducer
, throttleConfSetInterval
, throttleConfSetMaxThroughput
, throttleConfSetBufferSize
, throttleConfSetEmaAlpha
, throttle
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.Throttle.Ema
import Control.Monad
import Control.Monad.IO.Class
import System.Clock
-- | A function assigning a size, expressed as a 'Double', to an item.
-- The throttled loops feed these sizes into their moving averages.
type Measure a = a -> Double
-- | Selects which side(s) of the pipeline run their throttled loop.
data ThrottleMode = ThrottleMode
  { throttleConsumer :: Bool
    -- ^ When 'True', 'consumer' dispatches to 'consumerThrottled'.
  , throttleProducer :: Bool
    -- ^ When 'True', 'producer' dispatches to 'producerThrottled'.
  }
-- | Configuration for 'throttle'. Build one from 'newThrottleConf' and
-- adjust it with the @throttleConfSet*@ / @throttleConfThrottle*@ helpers.
data ThrottleConf a = ThrottleConf
  { throttleConfMeasure :: Measure a
    -- ^ Size function applied to every item that passes a throttled loop.
  , throttleConfMode :: ThrottleMode
    -- ^ Which sides (consumer and/or producer) are throttled.
  , throttleConfInterval :: Double
    -- ^ Length of the accounting interval used by 'computeDelay'.
    -- NOTE(review): appears to be in milliseconds, since the derived delay
    -- is scaled by 10^3 before reaching 'threadDelay' -- confirm.
  , throttleConfMaxThroughput :: Double
    -- ^ Total measured size allowed per interval (see 'computeDelay').
  , throttleConfBufferSize :: Int
    -- ^ Capacity passed to 'newTBMQueue' for the intermediate queue.
  , throttleConfEmaAlpha :: Double
    -- ^ Alpha parameter handed to 'newEma' for both moving averages.
  }
-- | Running statistics shared between the two loops through a 'TVar'.
data Stats = Stats
  { statsEmaItemSizeIn :: Ema
    -- ^ Moving average of item sizes seen by the throttled consumer.
  , statsEmaItemSizeOut :: Ema
    -- ^ Moving average of item sizes seen by the throttled producer.
  } deriving (Show)
-- | The starting configuration; identical to 'defaultThrottleConf'.
newThrottleConf :: ThrottleConf a
newThrottleConf = defaultThrottleConf
-- | Default configuration: every item measures as 1, neither side is
-- throttled, interval 1000, maximum throughput 100, buffer of 1024 items
-- and 'defaultEmaAlpha' as the moving-average alpha.
defaultThrottleConf :: ThrottleConf a
defaultThrottleConf =
  ThrottleConf
    { throttleConfMeasure       = const 1
    , throttleConfMode          = unthrottled
    , throttleConfInterval      = 1000
    , throttleConfMaxThroughput = 100
    , throttleConfBufferSize    = 1024
    , throttleConfEmaAlpha      = defaultEmaAlpha
    }
  where
    -- Throttling is opt-in on both sides.
    unthrottled =
      ThrottleMode
        { throttleConsumer = False
        , throttleProducer = False
        }
-- | Replace the function used to measure the size of each item.
throttleConfSetMeasure :: Measure a -> ThrottleConf a -> ThrottleConf a
throttleConfSetMeasure sizeOf cfg =
  cfg { throttleConfMeasure = sizeOf }
-- | Switch on throttling for the producer side. The consumer flag of the
-- existing mode is left unchanged.
throttleConfThrottleProducer :: ThrottleConf a -> ThrottleConf a
throttleConfThrottleProducer cfg@ThrottleConf { throttleConfMode = mode } =
  cfg { throttleConfMode = mode { throttleProducer = True } }
-- | Switch on throttling for the consumer side. The producer flag of the
-- existing mode is left unchanged.
throttleConfThrottleConsumer :: ThrottleConf a -> ThrottleConf a
throttleConfThrottleConsumer cfg@ThrottleConf { throttleConfMode = mode } =
  cfg { throttleConfMode = mode { throttleConsumer = True } }
-- | Set the accounting interval used when deriving per-item delays.
throttleConfSetInterval :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetInterval newInterval cfg =
  cfg { throttleConfInterval = newInterval }
-- | Set the maximum total measured size allowed per interval.
throttleConfSetMaxThroughput :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetMaxThroughput limit cfg =
  cfg { throttleConfMaxThroughput = limit }
-- | Set the capacity of the queue that sits between the two loops.
throttleConfSetBufferSize :: Int -> ThrottleConf a -> ThrottleConf a
throttleConfSetBufferSize capacity cfg =
  cfg { throttleConfBufferSize = capacity }
-- | Set the alpha parameter used for both item-size moving averages.
throttleConfSetEmaAlpha :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetEmaAlpha newAlpha cfg =
  cfg { throttleConfEmaAlpha = newAlpha }
-- | Alpha used for the moving averages unless overridden with
-- 'throttleConfSetEmaAlpha'.
defaultEmaAlpha :: Double
defaultEmaAlpha = 0.5
-- | Start a throttled pipeline between a reading and a writing callback.
--
-- A bounded, closable queue of 'throttleConfBufferSize' slots and a shared
-- 'Stats' variable are created. A supervising 'Async' then runs two
-- workers: the consumer pulls items from the first callback into the queue
-- and the producer drains the queue into the second callback. The
-- supervisor links to both workers and waits for both to finish.
--
-- The returned 'Async' completes when both workers are done.
throttle :: ThrottleConf a
         -> IO (Maybe a)
         -> (Maybe a -> IO ())
         -> IO (Async ())
throttle conf@ThrottleConf { throttleConfBufferSize = capacity
                           , throttleConfEmaAlpha   = alpha
                           } readItem writeItem = do
  queueBuffer <- atomically (newTBMQueue capacity)
  statsTVar   <- newTVarIO initialStats
  let runConsumer = consumer conf statsTVar queueBuffer readItem
      runProducer = producer conf statsTVar queueBuffer writeItem
  async $
    withAsync runConsumer $ \consumerThread ->
      withAsync runProducer $ \producerThread -> do
        -- Propagate a failure in either worker to the supervisor.
        mapM_ link [consumerThread, producerThread]
        void (waitBoth consumerThread producerThread)
  where
    -- Both averages start from the same initial value of 0.
    initialStats =
      Stats
        { statsEmaItemSizeIn  = newEma alpha 0
        , statsEmaItemSizeOut = newEma alpha 0
        }
-- | Consumer loop without rate limiting: copy every item obtained from the
-- read action into the queue, and close the queue as soon as the read
-- action yields 'Nothing'.
consumerUnthrottled :: TBMQueue a -> IO (Maybe a) -> IO ()
consumerUnthrottled buffer readItem = loop
  where
    loop = readItem >>= maybe finish enqueue

    -- End of input: close the queue and stop.
    finish = atomically (closeTBMQueue buffer)

    -- Hand the item to the queue, then keep reading.
    enqueue item = atomically (writeTBMQueue buffer item) >> loop
-- | Consumer loop with rate limiting.
--
-- Each round reads one item, enqueues it and folds its size into the
-- incoming moving average, all timed with 'timeAction'. The loop then
-- sleeps for the delay computed by 'throttleDelayIn' before the next round.
-- When the read action yields 'Nothing' the queue is closed and the loop
-- ends.
consumerThrottled :: ThrottleConf a
                  -> TVar Stats
                  -> TBMQueue a
                  -> IO (Maybe a)
                  -> IO ()
consumerThrottled conf statsTVar buffer readItem = go
  where
    go = do
      (maybeStats, consumeDuration) <- timeAction readAndEnqueue
      case maybeStats of
        Just stats -> do
          threadDelay (throttleDelayIn stats conf consumeDuration)
          go
        Nothing -> return ()

    -- Returns the updated statistics, or 'Nothing' once input is exhausted.
    readAndEnqueue = do
      next <- readItem
      case next of
        Just a -> atomically $ do
          writeTBMQueue buffer a
          -- Strict update so no chain of thunks accumulates in the TVar.
          modifyTVar' statsTVar (updateStatsIn conf a)
          Just <$> readTVar statsTVar
        Nothing -> atomically $ do
          closeTBMQueue buffer
          return Nothing
-- | Pick the producer loop according to the configured mode: the throttled
-- variant when 'throttleProducer' is set, the plain one otherwise.
producer :: ThrottleConf a
         -> TVar Stats
         -> TBMQueue a
         -> (Maybe a -> IO ())
         -> IO ()
producer conf@ThrottleConf { throttleConfMode = mode } stats
  | throttleProducer mode = producerThrottled conf stats
  | otherwise             = producerUnthrottled
-- | Pick the consumer loop according to the configured mode: the throttled
-- variant when 'throttleConsumer' is set, the plain one otherwise.
consumer :: ThrottleConf a
         -> TVar Stats
         -> TBMQueue a
         -> IO (Maybe a)
         -> IO ()
consumer conf@ThrottleConf { throttleConfMode = mode } stats
  | throttleConsumer mode = consumerThrottled conf stats
  | otherwise             = consumerUnthrottled
-- | Producer loop with rate limiting.
--
-- Each round takes one element from the queue, passes it to the write
-- callback and folds its size into the outgoing moving average, all timed
-- with 'timeAction'. The loop then sleeps for the delay computed by
-- 'throttleDelayOut' before the next round. When the queue is closed and
-- drained, the callback receives 'Nothing' and the loop ends.
producerThrottled :: ThrottleConf a
                  -> TVar Stats
                  -> TBMQueue a
                  -> (Maybe a -> IO ())
                  -> IO ()
producerThrottled conf statsTVar buffer writeItem = go
  where
    go = do
      (maybeStats, produceDuration) <- timeAction dequeueAndWrite
      case maybeStats of
        Just stats -> do
          threadDelay (throttleDelayOut stats conf produceDuration)
          go
        Nothing -> return ()

    -- Returns the updated statistics, or 'Nothing' once the queue is done.
    dequeueAndWrite = do
      next <- atomically (readTBMQueue buffer)
      -- The callback sees every item and also the final 'Nothing'.
      writeItem next
      case next of
        Just a -> atomically $ do
          -- Strict update so no chain of thunks accumulates in the TVar.
          modifyTVar' statsTVar (updateStatsOut conf a)
          Just <$> readTVar statsTVar
        Nothing -> return Nothing
-- | Producer loop without rate limiting: forward every queue element to the
-- write callback. Once the queue reports 'Nothing', that 'Nothing' is
-- forwarded as well and the loop stops.
producerUnthrottled :: TBMQueue a
                    -> (Maybe a -> IO ())
                    -> IO ()
producerUnthrottled buffer writeItem = drain
  where
    drain = do
      next <- atomically (readTBMQueue buffer)
      writeItem next
      -- Continue only while the queue still yields items.
      forM_ next (const drain)
-- | Fold the measured size of an outgoing item into the outgoing moving
-- average; the incoming average is left untouched.
updateStatsOut :: ThrottleConf a -> a -> Stats -> Stats
updateStatsOut ThrottleConf { throttleConfMeasure = measure }
               item
               stats@Stats { statsEmaItemSizeOut = emaOut } =
  stats { statsEmaItemSizeOut = emaUpdate (measure item) emaOut }
-- | Fold the measured size of an incoming item into the incoming moving
-- average; the outgoing average is left untouched.
updateStatsIn :: ThrottleConf a -> a -> Stats -> Stats
updateStatsIn ThrottleConf { throttleConfMeasure = measure }
              item
              stats@Stats { statsEmaItemSizeIn = emaIn } =
  stats { statsEmaItemSizeIn = emaUpdate (measure item) emaIn }
-- | Run an action and measure how long it took.
--
-- Returns the action's result together with the elapsed time in
-- milliseconds, taken from two readings of the 'Monotonic' clock.
timeAction :: IO a -> IO (a, Double)
timeAction io = do
  t0 <- getTime Monotonic
  a  <- io
  t1 <- getTime Monotonic
  return (a, t1 `timeSpecDiff` t0)
  where
    -- Difference @ts1 - ts0@ in milliseconds: whole seconds are scaled up
    -- by 10^3 and nanoseconds scaled down by 10^6.
    timeSpecDiff :: TimeSpec -> TimeSpec -> Double
    timeSpecDiff ts1 ts0 =
      fromIntegral (sec ts1 - sec ts0) * 1e3
        + fromIntegral (nsec ts1 - nsec ts0) / 1e6
-- | Time budget for a single item of the given size: the configured
-- interval divided by the number of such items that fit into the maximum
-- throughput. The result is in the same unit as 'throttleConfInterval'.
-- An item size of 0 yields a delay of 0, avoiding a division by zero.
computeDelay :: ThrottleConf a -> Double -> Double
computeDelay conf itemSize
  | itemSize == 0 = 0
  | otherwise     = interval / itemsPerInterval
  where
    ThrottleConf { throttleConfInterval      = interval
                 , throttleConfMaxThroughput = maxThroughput
                 } = conf
    itemsPerInterval = maxThroughput / itemSize
-- | Delay for the consumer side, derived from the current value of the
-- incoming item-size moving average. The remaining arguments (the
-- configuration and the time already spent) go straight to 'throttleDelay'.
throttleDelayIn :: Stats
                -> ThrottleConf a
                -> Double
                -> Int
throttleDelayIn Stats { statsEmaItemSizeIn = emaIn } =
  throttleDelay (emaCurrent emaIn)
-- | Delay for the producer side, derived from the current value of the
-- outgoing item-size moving average. The remaining arguments (the
-- configuration and the time already spent) go straight to 'throttleDelay'.
throttleDelayOut :: Stats
                 -> ThrottleConf a
                 -> Double
                 -> Int
throttleDelayOut Stats { statsEmaItemSizeOut = emaOut } =
  throttleDelay (emaCurrent emaOut)
-- | Number of microseconds to sleep after handling one item.
--
-- Arguments, in order: the (averaged) item size, the configuration, and the
-- time already spent handling the item, in milliseconds as reported by
-- 'timeAction'.
--
-- The per-item budget from 'computeDelay' and the elapsed time are both in
-- milliseconds, so the elapsed time is subtracted first and only the
-- remainder is scaled by 10^3 to the microseconds 'threadDelay' expects.
-- If handling already took longer than the budget the result is 0 rather
-- than a negative delay.
throttleDelay :: Double
              -> ThrottleConf a
              -> Double
              -> Int
throttleDelay itemSize conf duration =
  round . max 0 . (1e3 *) . subtract duration $ computeDelay conf itemSize