{-# LANGUAGE LambdaCase        #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

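-- | Throttling for asynchronous item pipelines: items are read with an
-- input callback, buffered in a bounded queue and written with an
-- output callback, with optional rate limiting of either side.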

module Control.Concurrent.Throttle
 ( Measure
 , ThrottleConf
 , newThrottleConf
 , throttleConfSetMeasure
 , throttleConfThrottleConsumer
 , throttleConfThrottleProducer
 , throttleConfSetInterval
 , throttleConfSetMaxThroughput
 , throttleConfSetBufferSize
 , throttleConfSetEmaAlpha
 , throttle
 ) where

import           Control.Concurrent
import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TBMQueue
import           Control.Concurrent.Throttle.Ema
import           Control.Monad
import           Control.Monad.IO.Class
import           System.Clock

-- | Type of a measure function for items of the specified type. The
-- measure function is supplied by the user and defines the notion of
-- item size that is used for throughput computations.
type Measure a = a -> Double

-- | Defines the throttling mode. Consuming (reading items) and
-- producing (writing items) can be throttled independently.
data ThrottleMode = ThrottleMode
  { throttleConsumer :: Bool -- ^ Whether the consumer (reading side) is throttled
  , throttleProducer :: Bool -- ^ Whether the producer (writing side) is throttled
  }

-- | Configuration for throttling.
data ThrottleConf a = ThrottleConf
  { throttleConfMeasure       :: Measure a    -- ^ Measure function for values of type @a@
  , throttleConfMode          :: ThrottleMode -- ^ Throttling mode
  , throttleConfInterval      :: Double       -- ^ Interval in milliseconds
  , throttleConfMaxThroughput :: Double       -- ^ Maximum throughput (total measured size) allowed per interval
  , throttleConfBufferSize    :: Int          -- ^ Capacity of the buffer queue between consumer and producer
  , throttleConfEmaAlpha      :: Double       -- ^ Exponential weight for computing current item size
  }

-- | Type used for internal statistics collection. It is used for
-- approximating the current size of the items being processed, kept
-- separately for the reading and the writing side.
data Stats = Stats
  { statsEmaItemSizeIn  :: Ema    -- ^ Exponentially weighted moving
                                  -- average of the measured size of
                                  -- items that have been read
  , statsEmaItemSizeOut :: Ema    -- ^ Exponentially weighted moving
                                  -- average of the measured size of
                                  -- items that have been written
  } deriving (Show)

-- | Produce a new 'ThrottleConf' holding the default settings. It is
-- the starting point that the @throttleConfSet*@ and
-- @throttleConfThrottle*@ functions refine.
newThrottleConf :: ThrottleConf a
newThrottleConf = defaultThrottleConf

-- | Default 'ThrottleConf': every item has size 1, neither side is
-- throttled, the interval is 1000 milliseconds, the maximum throughput
-- is 100, the buffer holds 1024 items and the exponential weight is
-- 'defaultEmaAlpha'.
defaultThrottleConf :: ThrottleConf a
defaultThrottleConf = ThrottleConf
  { throttleConfMeasure       = \_ -> 1
  , throttleConfMode          = noThrottling
  , throttleConfInterval      = 1000
  , throttleConfMaxThroughput = 100
  , throttleConfBufferSize    = 1024
  , throttleConfEmaAlpha      = defaultEmaAlpha
  }
  where
    -- Throttling is opt-in for both the consumer and the producer.
    noThrottling = ThrottleMode
      { throttleConsumer = False
      , throttleProducer = False
      }

-- | Replace the measure function of a configuration. The measure
-- function assigns every item the size used for throughput
-- computations.
throttleConfSetMeasure :: Measure a -> ThrottleConf a -> ThrottleConf a
throttleConfSetMeasure f cfg = cfg { throttleConfMeasure = f }

-- | Enable throttling of the producer, i.e. of the side that writes
-- items with the output callback. The consumer setting is left
-- unchanged.
throttleConfThrottleProducer :: ThrottleConf a -> ThrottleConf a
throttleConfThrottleProducer conf =
  conf { throttleConfMode = mode { throttleProducer = True } }
  where
    mode = throttleConfMode conf

-- | Enable throttling of the consumer, i.e. of the side that reads
-- items with the input callback. The producer setting is left
-- unchanged.
throttleConfThrottleConsumer :: ThrottleConf a -> ThrottleConf a
throttleConfThrottleConsumer conf =
  conf { throttleConfMode = mode { throttleConsumer = True } }
  where
    mode = throttleConfMode conf

-- | Replace the interval (in milliseconds) of a configuration.
throttleConfSetInterval :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetInterval millis cfg = cfg { throttleConfInterval = millis }

-- | Replace the maximum allowed throughput of a configuration.
throttleConfSetMaxThroughput :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetMaxThroughput limit cfg = cfg { throttleConfMaxThroughput = limit }

-- | Replace the size of the buffer queue of a configuration.
throttleConfSetBufferSize :: Int -> ThrottleConf a -> ThrottleConf a
throttleConfSetBufferSize size cfg = cfg { throttleConfBufferSize = size }

-- | Replace the exponential weight factor that is used for computing
-- the current item size.
throttleConfSetEmaAlpha :: Double -> ThrottleConf a -> ThrottleConf a
throttleConfSetEmaAlpha weight cfg = cfg { throttleConfEmaAlpha = weight }

-- | Default exponential weight factor for computing the current item
-- size. This is the value 'defaultThrottleConf' uses for
-- 'throttleConfEmaAlpha'.
defaultEmaAlpha :: Double
defaultEmaAlpha = 0.5

-- | Asynchronously read items with the given input callback and write
-- them with the given output callback, throttling either side as
-- requested by the configuration.
--
-- Items are handed from the reading thread to the writing thread
-- through a bounded queue. The input callback signals the end of the
-- input by returning 'Nothing'; the output callback is called with
-- 'Nothing' after the last item has been written.
throttle :: ThrottleConf a     -- ^ Throttling configuration
         -> IO (Maybe a)       -- ^ Input callback
         -> (Maybe a -> IO ()) -- ^ Output callback
         -> IO (Async ())      -- ^ Returns an async handler for this
                               -- throttling process
throttle conf readItem writeItem = do
  -- A bounded queue without any capacity can never accept an item,
  -- which would stall the whole pipeline; use at least one slot.
  queueBuffer <- atomically $ newTBMQueue (max 1 (throttleConfBufferSize conf))
  statsTVar   <- newTVarIO Stats { statsEmaItemSizeIn  = newEma alpha 0
                                 , statsEmaItemSizeOut = newEma alpha 0 }
  async $
    withAsync (consumer conf statsTVar queueBuffer readItem ) $ \consumerThread ->
    withAsync (producer conf statsTVar queueBuffer writeItem) $ \producerThread -> do
      -- Linking makes a failure in either worker surface in this
      -- supervising thread.
      link consumerThread
      link producerThread
      void $ waitBoth consumerThread producerThread
  where
    alpha = throttleConfEmaAlpha conf

-- | Unthrottled consumer. Moves items from the input callback into the
-- provided buffer without any artificial delay, and closes the buffer
-- as soon as the input callback returns 'Nothing'.
consumerUnthrottled :: TBMQueue a -> IO (Maybe a) -> IO ()
consumerUnthrottled buffer readItem = loop
  where
    loop = do
      next <- readItem
      case next of
        Nothing   -> atomically (closeTBMQueue buffer)
        Just item -> do
          atomically (writeTBMQueue buffer item)
          loop

-- | Throttled consumer. Repeatedly reads an item with the input
-- callback, enqueues it, updates the statistics about incoming item
-- sizes and then sleeps for the delay derived from them. When the
-- input callback returns 'Nothing' the buffer is closed and the
-- consumer stops.
consumerThrottled :: ThrottleConf a
                  -> TVar Stats
                  -> TBMQueue a
                  -> IO (Maybe a)
                  -> IO ()
consumerThrottled conf statsTVar buffer readItem = loop
  where
    loop = do
      -- The measured duration covers reading the item as well as
      -- enqueueing it and updating the statistics.
      (maybeStats, consumeDuration) <- timeAction readAndEnqueue
      case maybeStats of
        Nothing    -> return ()
        Just stats -> do
          liftIO . threadDelay $ throttleDelayIn stats conf consumeDuration
          loop

    -- Read one item. Yields the updated statistics, or 'Nothing' once
    -- the input is exhausted and the buffer has been closed.
    readAndEnqueue = readItem >>= \case
      Just a -> atomically $ do
        writeTBMQueue buffer a
        -- Strict update, so that no chain of unevaluated updates can
        -- build up inside the shared 'TVar'.
        modifyTVar' statsTVar (updateStatsIn conf a)
        Just <$> readTVar statsTVar
      Nothing -> do
        atomically (closeTBMQueue buffer)
        return Nothing

-- | Producer thread. Dispatches to the throttled producer when the
-- configuration asks for producer throttling and to the unthrottled
-- one otherwise; the statistics are only used in the throttled case.
producer :: ThrottleConf a
         -> TVar Stats
         -> TBMQueue a
         -> (Maybe a -> IO ())
         -> IO ()
producer conf stats
  | throttleProducer (throttleConfMode conf) = producerThrottled conf stats
  | otherwise                                = producerUnthrottled

-- | Consumer thread. Dispatches to the throttled consumer when the
-- configuration asks for consumer throttling and to the unthrottled
-- one otherwise; the statistics are only used in the throttled case.
consumer :: ThrottleConf a
         -> TVar Stats
         -> TBMQueue a
         -> IO (Maybe a)
         -> IO ()
consumer conf stats
  | throttleConsumer (throttleConfMode conf) = consumerThrottled conf stats
  | otherwise                                = consumerUnthrottled

-- | Throttled producer. Repeatedly takes an item from the provided
-- buffer, writes it with the output callback, updates the statistics
-- about outgoing item sizes and then sleeps for the delay derived from
-- them. When reading from the buffer yields 'Nothing', the output
-- callback is called with 'Nothing' and the producer stops.
producerThrottled :: ThrottleConf a
                  -> TVar Stats
                  -> TBMQueue a
                  -> (Maybe a -> IO ())
                  -> IO ()
producerThrottled conf statsTVar buffer writeItem = loop
  where
    loop = do
      -- The measured duration covers taking the item from the buffer
      -- as well as writing it and updating the statistics.
      (maybeStats, produceDuration) <- timeAction dequeueAndWrite
      case maybeStats of
        Nothing    -> return ()
        Just stats -> do
          liftIO . threadDelay $ throttleDelayOut stats conf produceDuration
          loop

    -- Write one item. Yields the updated statistics, or 'Nothing' once
    -- the buffer has no more items to offer.
    dequeueAndWrite = atomically (readTBMQueue buffer) >>= \case
      Just a -> do
        writeItem (Just a)
        atomically $ do
          -- Strict update, so that no chain of unevaluated updates
          -- can build up inside the shared 'TVar'.
          modifyTVar' statsTVar (updateStatsOut conf a)
          Just <$> readTVar statsTVar
      Nothing -> do
        writeItem Nothing
        return Nothing

-- | Unthrottled producer. Passes whatever it reads from the buffer to
-- the output callback without any artificial delay and stops after it
-- has passed on 'Nothing'.
producerUnthrottled :: TBMQueue a
                    -> (Maybe a -> IO ())
                    -> IO ()
producerUnthrottled buffer writeItem = drain
  where
    drain = do
      next <- atomically (readTBMQueue buffer)
      -- 'Nothing' is forwarded as well, so the callback sees the end
      -- of the stream.
      writeItem next
      case next of
        Just _  -> drain
        Nothing -> return ()

-- | Update the statistics for a written item: the item's size, as
-- given by the configured measure function, is fed into the moving
-- average for outgoing items. The average for incoming items is left
-- unchanged.
updateStatsOut :: ThrottleConf a -> a -> Stats -> Stats
updateStatsOut conf item stats =
  stats { statsEmaItemSizeOut = emaUpdate itemSize (statsEmaItemSizeOut stats) }
  where
    itemSize = throttleConfMeasure conf item

-- | Update the statistics for a read item: the item's size, as given
-- by the configured measure function, is fed into the moving average
-- for incoming items. The average for outgoing items is left
-- unchanged.
updateStatsIn :: ThrottleConf a -> a -> Stats -> Stats
updateStatsIn conf item stats =
  stats { statsEmaItemSizeIn = emaUpdate itemSize (statsEmaItemSizeIn stats) }
  where
    itemSize = throttleConfMeasure conf item

-- | Run an IO action and measure how long it takes, using the
-- monotonic clock. Returns the result of the action paired with the
-- elapsed time in milliseconds.
timeAction :: IO a -> IO (a, Double)
timeAction io = do
  started  <- getTime Monotonic
  result   <- io
  finished <- getTime Monotonic
  return (result, elapsedMillis started finished)
  where
    -- The difference in whole seconds and the difference in the
    -- nanosecond parts are converted to milliseconds separately and
    -- then added up.
    elapsedMillis :: TimeSpec -> TimeSpec -> Double
    elapsedMillis from to = wholeSeconds * 10^3 + nanoRemainder / 10^6
      where
        wholeSeconds  = fromIntegral (sec  to - sec  from)
        nanoRemainder = fromIntegral (nsec to - nsec from)

-- | Given a throttle configuration and the current averaged item
-- size, compute the desired time between two items, in the unit of
-- the configured interval (milliseconds). An item size of zero yields
-- no delay at all.
computeDelay :: ThrottleConf a -> Double -> Double
computeDelay conf itemSize
  | itemSize == 0 = 0
  | otherwise     = throttleConfInterval conf / itemsPerInterval
  where
    -- Number of items of the given size that fit into one interval
    -- without exceeding the maximum throughput.
    itemsPerInterval = throttleConfMaxThroughput conf / itemSize

-- | Compute the delay for the reading side, based on the current
-- moving average of the size of incoming items.
throttleDelayIn :: Stats          -- ^ Current throughput statistics
                -> ThrottleConf a -- ^ Throttle configuration
                -> Double         -- ^ Duration of the last read
                -> Int            -- ^ Resulting delay in microseconds
throttleDelayIn Stats { statsEmaItemSizeIn = emaIn } =
  throttleDelay (emaCurrent emaIn)

-- | Compute the delay for the writing side, based on the current
-- moving average of the size of outgoing items.
throttleDelayOut :: Stats          -- ^ Current throughput statistics
                 -> ThrottleConf a -- ^ Throttle configuration
                 -> Double         -- ^ Duration of the last write
                 -> Int            -- ^ Resulting delay in microseconds
throttleDelayOut Stats { statsEmaItemSizeOut = emaOut } =
  throttleDelay (emaCurrent emaOut)

-- | Compute how long to sleep after processing an item so that the
-- configured maximum throughput is not exceeded. The time already
-- spent on the item is deducted from the desired delay; the result is
-- never negative.
throttleDelay :: Double         -- ^ Current item size
              -> ThrottleConf a -- ^ Throttle configuration
              -> Double         -- ^ Duration of the last read or write, in milliseconds
              -> Int            -- ^ Resulting delay in microseconds
throttleDelay itemSize conf duration = max 0 (round remainingMicros)
  where
    -- The desired delay and the measured duration are both in
    -- milliseconds, so subtract first and only then convert the
    -- remainder to the microseconds expected by 'threadDelay'.
    remainingMillis = computeDelay conf itemSize - duration
    remainingMicros = remainingMillis * 1000 :: Double