{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE StrictData #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}

-- | A simple, hashed timer wheel.
module Data.TimerWheel
  ( -- * Timer wheel
    TimerWheel,
    with,
    Config (..),
    register,
    register_,
    recurring,
    recurring_,
  )
where

import Control.Concurrent
import Control.Exception
import Control.Monad (join, void)
import Data.Fixed (E6, Fixed (MkFixed))
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import GHC.Generics (Generic)
import Micros (Micros (Micros))
import qualified Micros
import Supply (Supply)
import qualified Supply
import Wheel (Wheel)
import qualified Wheel

-- | A timer wheel is a vector-of-collections-of timers to fire. It is configured with a /spoke count/ and /resolution/.
-- Timers may be scheduled arbitrarily far in the future. A timeout thread is spawned to step through the timer wheel
-- and fire expired timers at regular intervals.
--
-- * The /spoke count/ determines the size of the timer vector.
--
--     * A __larger spoke count__ will result in __less insert contention__ at each spoke and will require
--       __more memory__ to store the timer wheel.
--
--     * A __smaller spoke count__ will result in __more insert contention__ at each spoke and will require
--       __less memory__ to store the timer wheel.
--
-- * The /resolution/ determines both the duration of time that each spoke corresponds to, and how often the timeout
--   thread wakes. For example, with a resolution of __@1s@__, a timer that expires at __@2.5s@__ will not fire until
--   the timeout thread wakes at __@3s@__.
--
--     * A __larger resolution__ will result in __more insert contention__ at each spoke, __less accurate__ timers, and
--       will require __fewer wakeups__ by the timeout thread.
--
--     * A __smaller resolution__ will result in __less insert contention__ at each spoke, __more accurate__ timers, and
--       will require __more wakeups__ by the timeout thread.
--
-- * The timeout thread has some important properties:
--
--     * There is only one, and it fires expired timers synchronously. If your timer actions execute quickly, 'register'
--       them directly. Otherwise, consider registering an action that enqueues the /real/ action to be performed on a
--       job queue.
--
--     * Synchronous exceptions thrown by enqueued @IO@ actions will bring the thread down, and the exception will be
--       propagated to the thread that created the timer wheel. If you want to catch exceptions and log them, for
--       example, you will have to bake this into the registered actions yourself.
--
-- As an example, below is a depiction of a timer wheel with @6@ timers inserted across @8@ spokes, and a resolution of
-- @.1s@. It depicts a cursor at @.3s@, which indicates where the timeout thread currently is.
--
-- @
--  0       .1      .2      .3      .4      .5      .6      .7
-- ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐
-- │       │ A⁰    │       │ B¹ C⁰ │ D⁰    │       │       │ E² F⁰ │
-- └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘
--                           ↑
-- @
--
-- After @.1s@, the timeout thread will advance to the next spoke and process all of the timers it passed over. In
-- this case, __C__ will fire, and __B__ will be put back with its count decremented to @0@. This is how the timer wheel
-- can schedule a timer to fire arbitrarily far in the future: its count is simply the number of times its delay wraps
-- the entire duration of the timer wheel.
--
-- @
--  0       .1      .2      .3      .4      .5      .6      .7
-- ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐
-- │       │ A⁰    │       │ B⁰    │ D⁰    │       │       │ E² F⁰ │
-- └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘
--                                   ↑
-- @
data TimerWheel = TimerWheel
  { -- | A supply of unique ints.
    wheelSupply :: Supply,
    -- | The array of collections of timers.
    wheelWheel :: Wheel,
    -- | The forked thread that reaps the wheel; it is killed when 'with' returns or throws.
    wheelThread :: ThreadId
  }

-- | Timer wheel config.
--
-- * @spokes@ must be ∈ @(0, maxBound]@
-- * @resolution@ must be ∈ @(0, ∞)@
data Config = Config
  { -- | Spoke count.
    spokes :: Int,
    -- | Resolution, in seconds.
    resolution :: Fixed E6
  }
  deriving stock (Generic, Show)

-- | The timeout thread died.
--
-- Wraps the exception that brought the timeout thread down, so that it can be delivered to the thread that created
-- the timer wheel.
newtype TimerWheelDied
  = TimerWheelDied SomeException
  deriving stock (Show)

-- 'TimerWheelDied' is delivered with 'throwTo', so it is placed in the asynchronous exception hierarchy by going
-- through the @asyncException*@ helpers rather than the default method implementations.
instance Exception TimerWheelDied where
  toException = asyncExceptionToException
  fromException = asyncExceptionFromException

-- | Perform an action with a timer wheel.
--
-- /Throws./
--
--   * Calls 'error' if the config is invalid
--   * Throws the exception the given action throws, if any
--   * Throws the exception the timer wheel thread throws, if any
with :: Config -> (TimerWheel -> IO a) -> IO a
with config action =
  -- Matching on the unit result forces 'validateConfig', so an invalid config calls 'error' before the wheel is
  -- created or any thread is forked.
  case validateConfig config of
    () -> _with config action

-- | Worker for 'with'. It performs no validation of the config itself.
--
-- Creates the wheel and the key supply, forks the thread that reaps the wheel, runs the given action, and kills the
-- reaper thread on the way out (whether the action returned or threw).
_with :: Config -> (TimerWheel -> IO a) -> IO a
_with Config {spokes, resolution} action = do
  wheelWheel <- Wheel.create spokes (Micros.fromFixed resolution)
  wheelSupply <- Supply.new
  -- Remember the creating thread, so that the reaper thread can report its own death to it.
  thread <- myThreadId
  -- Exceptions stay masked while the reaper is forked and the handlers are put in place; only the user-supplied
  -- action runs under 'restore'.
  uninterruptibleMask \restore -> do
    wheelThread <-
      forkIOWithUnmask $ \unmask ->
        unmask (Wheel.reap wheelWheel)
          `catch` \e ->
            case fromException e of
              -- 'ThreadKilled' is what 'cleanup' below delivers: a normal shutdown, nothing to report.
              Just ThreadKilled -> pure ()
              -- Anything else is forwarded to the creating thread, wrapped in 'TimerWheelDied'.
              _ -> throwTo thread (TimerWheelDied e)
    let cleanup = killThread wheelThread
    let handler :: SomeException -> IO void
        handler ex = do
          cleanup
          case fromException ex of
            -- Unwrap, so that the caller sees the exception the reaper thread actually died with.
            Just (TimerWheelDied ex') -> throwIO ex'
            _ -> throwIO ex
    result <- restore (action TimerWheel {wheelSupply, wheelWheel, wheelThread}) `catch` handler
    cleanup
    pure result

-- | Check that a config has a positive spoke count and a positive resolution.
--
-- Returns @()@ for a valid config, and calls 'error' (with the offending config in the message) otherwise.
validateConfig :: Config -> ()
validateConfig config@Config {spokes, resolution}
  | spokes <= 0 || resolution <= 0 = error ("[timer-wheel] invalid config: " ++ show config)
  | otherwise = ()

-- | @register wheel delay action@ registers an action __@action@__ in timer wheel __@wheel@__ to fire after __@delay@__
-- seconds.
--
-- Returns an action that, when called, attempts to cancel the timer, and returns whether or not it was successful
-- (@False@ means the timer has already fired).
--
-- Subsequent calls to the cancel action have no effect, and continue to return whatever the first result was.
register ::
  -- | Timer wheel
  TimerWheel ->
  -- | Delay, in seconds
  Fixed E6 ->
  -- | Action
  IO () ->
  IO (IO Bool)
register wheel (secondsToMicros -> delay) =
  _register wheel delay

-- | Like 'register', but for when you don't intend to cancel the timer.
register_ ::
  -- | Timer wheel
  TimerWheel ->
  -- | Delay, in seconds
  Fixed E6 ->
  -- | Action
  IO () ->
  IO ()
register_ wheel delay action =
  -- Same as 'register', with the cancel action thrown away.
  void (register wheel delay action)

-- | Insert an action into the wheel, to fire after the given delay.
--
-- Draws a fresh key from the wheel's supply and inserts the action under that key. The result is whatever
-- 'Wheel.insert' returns, which 'register' documents as the timer's cancel action.
_register :: TimerWheel -> Micros -> IO () -> IO (IO Bool)
_register wheel delay action = do
  key <- Supply.next (wheelSupply wheel)
  Wheel.insert (wheelWheel wheel) key delay action

-- | Like '_register', but with the delay shortened by one wheel resolution (and clamped at zero).
--
-- Used by recurring timers when they re-register themselves from inside the reaper thread.
_reregister :: TimerWheel -> Micros -> IO () -> IO (IO Bool)
_reregister wheel delay =
  -- The comparison guards the subtraction: a delay shorter than the resolution becomes zero.
  _register wheel (if reso > delay then Micros 0 else delay `Micros.minus` reso)
  where
    reso :: Micros
    reso = Wheel.resolution (wheelWheel wheel)

-- | @recurring wheel delay action@ registers an action __@action@__ in timer wheel __@wheel@__ to fire every
-- __@delay@__ seconds (or every /resolution/ seconds, whichever is larger).
--
-- Returns an action that, when called, cancels the recurring timer.
recurring ::
  -- | Timer wheel
  TimerWheel ->
  -- | Delay, in seconds
  Fixed E6 ->
  -- | Action
  IO () ->
  IO (IO ())
recurring wheel (secondsToMicros -> delay) action = mdo
  let doAction :: IO ()
      doAction = do
        -- Re-register one bucket early, to account for the fact that timers are
        -- expired at the *end* of a bucket.
        --
        -- +---+---+---+---+
        -- { A |   |   |   }
        -- +---+---+---+---+
        --      |
        --      The reaper thread fires 'A' approximately here, so if it's meant
        --      to be repeated every two buckets, and we just re-register it at
        --      this time, three buckets will pass before it's run again. So, we
        --      act as if it's still "one bucket ago" at the moment we re-register
        --      it.
        --
        -- The cancel action of the fresh registration replaces the stored one, so
        -- that cancelling always targets the most recent registration.
        writeIORef cancelRef =<< _reregister wheel delay doAction
        action

  cancel :: IO Bool <-
    _register wheel delay doAction

  -- 'doAction' refers to 'cancelRef' before it is bound here; that is what the
  -- surrounding @mdo@ is for.
  cancelRef :: IORef (IO Bool) <-
    newIORef cancel

  -- Keep running whichever cancel action is current until one of them succeeds.
  pure (untilTrue (join (readIORef cancelRef)))

-- | Like 'recurring', but for when you don't intend to cancel the timer.
recurring_ ::
  -- | Timer wheel
  TimerWheel ->
  -- | Delay, in seconds
  Fixed E6 ->
  -- | Action
  IO () ->
  IO ()
recurring_ wheel (secondsToMicros -> delay) action =
  void (_register wheel delay doAction)
  where
    -- Re-register first (discarding the cancel action, since this timer is never cancelled), then run the action.
    doAction :: IO ()
    doAction = do
      _ <- _reregister wheel delay doAction
      action

-- | Convert a delay in seconds to microseconds.
--
-- A @Fixed E6@ stores its value as an integral count of millionths, which is read off directly. Negative delays are
-- clamped to zero.
secondsToMicros :: Fixed E6 -> Micros
secondsToMicros (MkFixed micros) =
  Micros (fromIntegral (max 0 micros))

-- Repeat an IO action until it returns 'True'.
--
-- The action is run at least once, and is re-run immediately (no delay between attempts) for as long as it keeps
-- returning 'False'.
untilTrue :: IO Bool -> IO ()
untilTrue action = do
  done <- action
  if done then pure () else untilTrue action