{-# LANGUAGE CPP #-}
module Wheel
( Wheel(resolution)
, create
, lenMicros
, insert
, reap
) where
import Entries (Entries)
import qualified Entries as Entries
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar
import Control.Monad (join, when)
import Data.Foldable (for_)
import Data.Primitive.MutVar (MutVar, atomicModifyMutVar', newMutVar)
import Data.Primitive.UnliftedArray (MutableUnliftedArray, UnliftedArray,
freezeUnliftedArray, indexUnliftedArray,
sizeofUnliftedArray,
unsafeNewUnliftedArray, writeUnliftedArray)
import Data.Word (Word64)
import GHC.Prim (RealWorld)
import System.IO.Unsafe (unsafeInterleaveIO)
#if MIN_VERSION_base(4,11,0)
import GHC.Clock (getMonotonicTimeNSec)
#else
import System.Clock (Clock(Monotonic), getTime, toNanoSecs)
#endif
-- | A 'MutVar' whose state token is 'RealWorld', i.e. a mutable cell that is
-- read and written from 'IO' in this module.
type IORef = MutVar RealWorld
-- | A timer wheel: an immutable array of mutable buckets plus the width of
-- the time slice that each bucket covers.
data Wheel = Wheel
  { buckets :: !(UnliftedArray (IORef Entries))
    -- ^ One mutable 'Entries' cell per spoke of the wheel.
  , resolution :: !Word64
    -- ^ Width of a single bucket, in microseconds (times fed to the wheel
    -- come from the microsecond monotonic clock and are divided by this).
  }
-- | Build a wheel with @spokes@ buckets, each covering @resolution@
-- microseconds. Every bucket starts out as 'Entries.empty'.
--
-- Fails with a user 'IOError' when @spokes@ is not positive or @resolution@
-- is zero. Without this check a negative size would be handed to
-- 'unsafeNewUnliftedArray', and a zero size or zero resolution would produce
-- a wheel on which bucket lookup and 'insert' divide by zero.
create ::
     Int    -- ^ Number of spokes (buckets); must be positive.
  -> Word64 -- ^ Width of one bucket in microseconds; must be non-zero.
  -> IO Wheel
create spokes resolution
  | spokes <= 0 =
      ioError (userError "Wheel.create: number of spokes must be positive")
  | resolution == 0 =
      ioError (userError "Wheel.create: resolution must be non-zero")
  | otherwise = do
      -- The freshly allocated array is not initialised to usable values;
      -- every slot is written below before the array is frozen.
      mbuckets :: MutableUnliftedArray RealWorld (IORef Entries) <-
        unsafeNewUnliftedArray spokes

      for_ [0 .. spokes - 1] $ \i ->
        writeUnliftedArray mbuckets i =<< newMutVar Entries.empty

      buckets :: UnliftedArray (IORef Entries) <-
        freezeUnliftedArray mbuckets 0 spokes

      pure Wheel
        { buckets = buckets
        , resolution = resolution
        }
-- | Number of buckets (spokes) the wheel was built with.
numSpokes :: Wheel -> Int
numSpokes = sizeofUnliftedArray . buckets
-- | Time covered by one full rotation of the wheel, in microseconds:
-- the number of spokes multiplied by the width of a single bucket.
lenMicros :: Wheel -> Word64
lenMicros wheel = spokeCount * resolution wheel
  where
    spokeCount :: Word64
    spokeCount = fromIntegral (numSpokes wheel)
-- | The mutable bucket that the given time (in microseconds) falls into.
bucket :: Wheel -> Word64 -> IORef Entries
bucket wheel time = indexUnliftedArray (buckets wheel) slot
  where
    slot :: Int
    slot = index wheel time
-- | Position in the bucket array of the bucket covering @time@
-- (microseconds): the time is divided by the bucket width and the quotient
-- is reduced modulo the number of spokes.
--
-- The reduction is done in 'Word64' and only the final result, which is
-- smaller than the spoke count, is narrowed to 'Int'. Narrowing the quotient
-- first can wrap to a negative number (very large times, or platforms with a
-- 32-bit 'Int') and would then yield a negative array index.
index :: Wheel -> Word64 -> Int
index wheel@Wheel{resolution} time =
  fromIntegral (slot `rem` spokeCount)
  where
    slot :: Word64
    slot = time `div` resolution

    spokeCount :: Word64
    spokeCount = fromIntegral (numSpokes wheel)
-- | Register @action@ under @key@, to expire @delay@ microseconds from now.
--
-- The entry is stored in the bucket selected by the expiry time
-- (current monotonic time plus @delay@), together with a count of
-- @delay \`div\` 'lenMicros' wheel@.
--
-- The result is a cancel action. Its first run attempts to delete @key@ from
-- that bucket and yields 'True' exactly when 'Entries.delete' reports a
-- removal; the outcome is cached, so later runs return the same answer
-- without touching the bucket again.
--
-- NOTE(review): the count is derived from @delay@ alone rather than from the
-- slot distance between "now" and the expiry time - confirm against the
-- semantics of 'Entries.partition' that a timer landing in the bucket that
-- is currently in progress cannot fire a rotation early.
insert ::
     Wheel
  -> Int
  -> IO ()
  -> Word64
  -> IO (IO Bool)
insert wheel key action delay = do
  now <- getMonotonicMicros

  let
    expiry :: Word64
    expiry = now + delay

    rotations :: Word64
    rotations = delay `div` lenMicros wheel

    slotVar :: IORef Entries
    slotVar = bucket wheel expiry

    -- Delete the entry from its bucket, reporting whether it was still there.
    removeEntry :: IO Bool
    removeEntry =
      atomicModifyMutVar' slotVar $ \entries ->
        case Entries.delete key entries of
          Nothing -> (entries, False)
          Just remaining -> (remaining, True)

  atomicModifyMutVar' slotVar $ \entries ->
    (Entries.insert key rotations action entries, ())

  -- Deferred with unsafeInterleaveIO: the MVar is only allocated if the
  -- cancel action is actually run.
  outcomeVar :: MVar (Maybe Bool) <-
    unsafeInterleaveIO (newMVar Nothing)

  pure $
    modifyMVar outcomeVar $ \cached -> do
      outcome <-
        case cached of
          Nothing -> removeEntry
          Just previous -> pure previous
      pure (Just outcome, outcome)
-- | Run the reaper loop; this never returns.
--
-- It first sleeps until the end of the bucket that contains the current
-- time, then repeatedly: sweeps one bucket (keeping the entries that
-- 'Entries.partition' reports as alive and running the expired actions in
-- sequence on the calling thread), sleeps until the next bucket boundary if
-- that boundary has not passed yet, and advances to the following bucket,
-- wrapping around at the number of spokes.
reap :: Wheel -> IO ()
reap wheel@Wheel{buckets, resolution} = do
  start <- getMonotonicMicros

  let
    -- Time left until the bucket containing @start@ is over.
    untilBoundary :: Word64
    untilBoundary = resolution - (start `rem` resolution)

  threadDelay (fromIntegral untilBoundary)

  go (start + untilBoundary + resolution) (index wheel start)
  where
    spokeCount :: Int
    spokeCount = numSpokes wheel

    -- Split a bucket into what stays behind and the action to run now.
    sweep :: Entries -> (Entries, IO ())
    sweep entries
      | Entries.null entries = (entries, pure ())
      | otherwise =
          case Entries.partition entries of
            (expired, alive) -> (alive, sequence_ expired)

    -- @deadline@ is the time at which the bucket after @slot@ ends.
    go :: Word64 -> Int -> IO ()
    go deadline slot = do
      -- The expired actions are returned from the atomic update and run
      -- only afterwards, outside of it.
      fire <- atomicModifyMutVar' (indexUnliftedArray buckets slot) sweep
      fire

      finished <- getMonotonicMicros

      -- Only sleep if sweeping did not already take us past the deadline.
      when (deadline > finished) $
        threadDelay (fromIntegral (deadline - finished))

      go (deadline + resolution) ((slot + 1) `rem` spokeCount)
-- | Current reading of the monotonic clock, in microseconds (the nanosecond
-- reading integer-divided by 1000).
--
-- On base >= 4.11 the reading comes from 'getMonotonicTimeNSec'; otherwise
-- it comes from the @clock@ package's 'Monotonic' clock.
getMonotonicMicros :: IO Word64
getMonotonicMicros = nanosToMicros <$> monotonicNanos
  where
    nanosToMicros :: Word64 -> Word64
    nanosToMicros nanos = nanos `div` 1000

    monotonicNanos :: IO Word64
#if MIN_VERSION_base(4,11,0)
    monotonicNanos = getMonotonicTimeNSec
#else
    monotonicNanos = fromIntegral . toNanoSecs <$> getTime Monotonic
#endif