{-# language CPP #-}
{-# language LambdaCase #-}
{-# language MagicHash #-}
{-# language NamedFieldPuns #-}
{-# language RecursiveDo #-}
{-# language ScopedTypeVariables #-}
{-# language UnboxedTuples #-}
{-# language ViewPatterns #-}
{-# options_ghc -funbox-strict-fields #-}
module Data.TimerWheel
(
TimerWheel
, new
, register
, register_
, recurring
) where
import Entries (Entries)
import Supply (Supply)
import qualified Entries as Entries
import qualified Supply
import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Fixed (E6, Fixed(MkFixed))
import Data.Foldable
import Data.IORef
import Data.Primitive.MutVar
import Data.Primitive.UnliftedArray
import Data.Word (Word64)
import GHC.Base (IO(IO), mkWeak#)
#if MIN_VERSION_base(4,11,0)
import GHC.Clock (getMonotonicTimeNSec)
#else
import System.Clock (Clock(Monotonic), getTime, toNanoSecs)
#endif
import GHC.Prim (RealWorld)
import GHC.Weak (Weak(Weak), deRefWeak)
import qualified GHC.Event as GHC
-- | A timer wheel: a fixed-size array of slots, each holding a mutable
-- collection of pending timers.
data TimerWheel = TimerWheel
  { wheelResolution :: !Word64
    -- ^ Width of one slot in nanoseconds ('new' stores its microsecond
    -- resolution multiplied by 1000 here).
  , wheelSupply :: !Supply
    -- ^ Source of entry ids; 'register' draws one id per timer.
  , wheelEntries :: !(UnliftedArray (MutVar RealWorld Entries))
    -- ^ One mutable 'Entries' collection per slot.
  }
-- | @new slots resolution@ creates a timer wheel with @slots@ slots, each
-- @resolution@ wide, and forks the reaper thread that services it.
--
-- The raw value inside the 'Fixed' 'E6' argument is used as a number of
-- microseconds: it is handed to the reaper as its sleep interval and stored
-- (multiplied by 1000) as the wheel resolution in nanoseconds.
--
-- Throws 'ErrorCall' if @slots@ or @resolution@ is not positive. Without this
-- check a zero slot count or a zero resolution would only surface later as a
-- divide-by-zero when a slot index is computed.
new :: Int -> Fixed E6 -> IO TimerWheel
new slots (MkFixed (fromInteger -> resolution)) = do
  when (slots <= 0)
    (throwIO (ErrorCall "Data.TimerWheel.new: number of slots must be positive"))
  when (resolution <= 0)
    (throwIO (ErrorCall "Data.TimerWheel.new: resolution must be positive"))

  -- Allocate the slot array and give every slot its own empty collection
  -- before freezing it.
  wheel :: UnliftedArray (MutVar RealWorld Entries) <- do
    mutableWheel :: MutableUnliftedArray RealWorld (MutVar RealWorld Entries) <-
      unsafeNewUnliftedArray slots
    for_ [0 .. slots - 1] $ \i ->
      writeUnliftedArray mutableWheel i =<< newMutVar Entries.empty
    freezeUnliftedArray mutableWheel 0 slots

  supply :: Supply <-
    Supply.new

  -- The reaper is only given a weak pointer to the slot array (keyed on the
  -- underlying primitive array, with a no-op finalizer), so the forked thread
  -- is not what keeps the array reachable.
  weakWheel :: Weak (UnliftedArray (MutVar RealWorld Entries)) <-
    case wheel of
      UnliftedArray wheel# ->
        IO $ \s ->
          case mkWeak# wheel# wheel (\t -> (# t, () #)) s of
            (# s', w #) ->
              (# s', Weak w #)

  (void . forkIO)
    (reaper resolution (sizeofUnliftedArray wheel) weakWheel)

  pure TimerWheel
    { wheelResolution = fromIntegral resolution * 1000
    , wheelSupply = supply
    , wheelEntries = wheel
    }
-- | Body of the reaper thread.
--
-- Repeatedly sleeps for one resolution, then sweeps every slot from the
-- previous cursor position up to, but not including, the slot the clock is
-- now in. For each swept slot, the actions returned as expired by
-- 'Entries.squam' are run on this thread and the remaining entries are
-- written back.
--
-- The loop ends once the weak pointer to the slot array can no longer be
-- dereferenced.
reaper
  :: Int -- ^ Resolution in microseconds (the sleep interval)
  -> Int -- ^ Number of slots
  -> Weak (UnliftedArray (MutVar RealWorld Entries))
  -> IO ()
reaper resolution len weakWheel = do
  manager :: GHC.TimerManager <-
    GHC.getSystemTimerManager
  -- Start the cursor at the slot the clock is in right now. Starting at slot
  -- 0 would make the first sweep cover slots @0 .. j-1@, which the clock has
  -- not passed since the wheel was created, and so run or age timers that
  -- were registered into those slots for a later revolution.
  start :: Word64 <-
    getMonotonicTime
  loop manager (slotAt start)
  where
    -- Slot index for a monotonic clock reading in nanoseconds.
    slotAt :: Word64 -> Int
    slotAt now =
      fromIntegral (now `div` (fromIntegral resolution * 1000)) `mod` len

    loop :: GHC.TimerManager -> Int -> IO ()
    loop manager i = do
      -- Sleep for one resolution via the timer manager; if the wait is
      -- interrupted by an exception, unregister the pending timeout.
      waitVar :: MVar () <-
        newEmptyMVar
      mask_ $ do
        key :: GHC.TimeoutKey <-
          GHC.registerTimeout manager resolution (putMVar waitVar ())
        takeMVar waitVar `onException` GHC.unregisterTimeout manager key

      now :: Word64 <-
        getMonotonicTime

      deRefWeak weakWheel >>= \case
        Nothing ->
          pure ()
        Just wheel -> do
          let
            j :: Int
            j =
              slotAt now

            -- Slots to sweep: from the cursor up to but excluding the
            -- current slot, wrapping around the end of the array.
            pending :: [Int]
            pending
              | j >= i = [i .. j - 1]
              | otherwise = [i .. len - 1] ++ [0 .. j - 1]

          for_ pending (sweep wheel)
          loop manager j

    -- Atomically replace one slot's entries with the survivors, then run the
    -- expired actions outside the atomic update.
    -- NOTE(review): presumably 'Entries.squam' also ages the surviving
    -- entries; confirm against the Entries module.
    sweep :: UnliftedArray (MutVar RealWorld Entries) -> Int -> IO ()
    sweep wheel k =
      join
        (atomicModifyMutVar' (indexUnliftedArray wheel k)
          (\entries ->
            if Entries.null entries
              then
                (entries, pure ())
              else
                case Entries.squam entries of
                  (expired, alive) ->
                    (alive, sequence_ expired)))
-- | @register delay action wheel@ schedules @action@ to be run by the wheel's
-- reaper thread after roughly @delay@ (the raw 'Fixed' 'E6' value is treated
-- as microseconds and converted to nanoseconds).
--
-- Returns a cancel action. Running it yields 'True' if the entry was still in
-- its slot and has been removed, and 'False' if 'Entries.delete' did not find
-- it.
register :: Fixed E6 -> IO () -> TimerWheel -> IO (IO Bool)
register (MkFixed ((*1000) . fromIntegral -> delay)) action wheel = do
  newEntryId :: Int <-
    Supply.next (wheelSupply wheel)

  -- Read the clock once here (rather than going through 'entriesIn') so the
  -- slot and the revolution count below are derived from the same instant.
  now :: Word64 <-
    getMonotonicTime

  let
    tickNow :: Word64
    tickNow =
      now `div` wheelResolution wheel

    tickDue :: Word64
    tickDue =
      (now + delay) `div` wheelResolution wheel

    entriesVar :: MutVar RealWorld Entries
    entriesVar =
      indexUnliftedArray (wheelEntries wheel)
        (fromIntegral tickDue `rem` sizeofUnliftedArray (wheelEntries wheel))

    -- Number of whole revolutions between the current tick and the due tick.
    -- Deriving this from the delay alone under-counts by one when the delay
    -- is just short of a whole number of revolutions and the clock's
    -- sub-tick remainder carries the due tick onto the current slot; the
    -- timer would then be picked up almost a full revolution early.
    -- NOTE(review): assumes the reaper expires entries whose count is 0 and
    -- decrements the others on each sweep; confirm against the Entries module.
    entryCount :: Word64
    entryCount =
      (tickDue - tickNow) `div` slotCount

  atomicModifyMutVar' entriesVar
    (\entries ->
      (Entries.insert newEntryId entryCount action entries, ()))

  pure $
    atomicModifyMutVar' entriesVar
      (\entries ->
        case Entries.delete newEntryId entries of
          Nothing ->
            (entries, False)
          Just entries' ->
            (entries', True))
  where
    slotCount :: Word64
    slotCount =
      fromIntegral (sizeofUnliftedArray (wheelEntries wheel))
-- | Same as 'register', for callers that never need to cancel: the cancel
-- action is thrown away.
register_ :: Fixed E6 -> IO () -> TimerWheel -> IO ()
register_ delay action wheel =
  () <$ register delay action wheel
-- | @recurring delay action wheel@ registers @action@ so that, each time it
-- fires, it runs and then registers itself again with the same @delay@.
--
-- The returned action cancels the recurring timer. It keeps running the most
-- recently stored cancel action until one of them reports 'True'.
recurring :: Fixed E6 -> IO () -> TimerWheel -> IO (IO ())
recurring delay action wheel = mdo
  -- The two bindings are mutually recursive: the registered action needs the
  -- reference, and the reference is initialised with the first cancel action.
  firstCancel <-
    register delay (fireAndRearm latestCancel) wheel
  latestCancel <-
    newIORef firstCancel
  pure (untilTrue (readIORef latestCancel >>= id))
  where
    -- Run the user action, register the next occurrence, and remember how to
    -- cancel it.
    fireAndRearm :: IORef (IO Bool) -> IO ()
    fireAndRearm ref = do
      action
      register delay (fireAndRearm ref) wheel >>= writeIORef ref
-- | @entriesIn delay wheel@ reads the monotonic clock and returns the slot
-- that the instant @delay@ nanoseconds from now falls into.
entriesIn :: Word64 -> TimerWheel -> IO (MutVar RealWorld Entries)
entriesIn delay wheel =
  slotFor <$> getMonotonicTime
  where
    slots :: UnliftedArray (MutVar RealWorld Entries)
    slots =
      wheelEntries wheel

    -- Convert the due time to a tick number, then wrap it onto the array.
    slotFor :: Word64 -> MutVar RealWorld Entries
    slotFor now =
      let
        tick :: Word64
        tick =
          (now + delay) `div` wheelResolution wheel
      in
        indexUnliftedArray slots
          (fromIntegral tick `rem` sizeofUnliftedArray slots)
-- | Run the given action repeatedly, stopping right after the first run that
-- returns 'True'.
untilTrue :: IO Bool -> IO ()
untilTrue action =
  loop
  where
    loop :: IO ()
    loop = do
      done <- action
      unless done loop
-- | Current reading of the monotonic clock, in nanoseconds. Uses
-- "GHC.Clock" when base provides it (4.11 and later) and the @clock@
-- package's 'getTime' otherwise.
getMonotonicTime :: IO Word64
#if MIN_VERSION_base(4,11,0)
getMonotonicTime =
  getMonotonicTimeNSec
#else
getMonotonicTime =
  fmap (fromIntegral . toNanoSecs) (getTime Monotonic)
#endif