module Wheel ( Wheel (resolution), create, lenMicros, insert, reap, ) where import Control.Concurrent.MVar import Control.Monad (join, when) import Data.IORef import Data.Vector (Vector) import qualified Data.Vector as Vector import Entries (Entries) import qualified Entries as Entries import Micros (Micros (..)) import qualified Micros import System.IO.Unsafe (unsafeInterleaveIO) import Timestamp (Timestamp) import qualified Timestamp data Wheel = Wheel { buckets :: !(Vector (IORef Entries)), resolution :: !Micros } create :: Int -> Micros -> IO Wheel create spokes resolution = do buckets <- Vector.replicateM spokes (newIORef Entries.empty) pure Wheel {buckets, resolution} numSpokes :: Wheel -> Int numSpokes wheel = Vector.length (buckets wheel) lenMicros :: Wheel -> Micros lenMicros wheel = Micros.scale (numSpokes wheel) (resolution wheel) bucket :: Wheel -> Timestamp -> IORef Entries bucket wheel timestamp = Vector.unsafeIndex (buckets wheel) (index wheel timestamp) index :: Wheel -> Timestamp -> Int index wheel@Wheel {resolution} timestamp = fromIntegral (Timestamp.epoch resolution timestamp) `rem` numSpokes wheel insert :: Wheel -> Int -> Micros -> IO () -> IO (IO Bool) insert wheel key delay action = do now :: Timestamp <- Timestamp.now let bucketRef :: IORef Entries bucketRef = bucket wheel (now `Timestamp.plus` delay) let insertEntry :: Entries -> Entries insertEntry = Entries.insert key (unMicros (delay `Micros.div` lenMicros wheel)) action atomicModifyIORef' bucketRef (\entries -> (insertEntry entries, ())) canceledVar :: MVar (Maybe Bool) <- unsafeInterleaveIO (newMVar Nothing) pure do modifyMVar canceledVar \maybeCanceled -> do canceled <- case maybeCanceled of Nothing -> atomicModifyIORef' bucketRef \entries -> case Entries.delete key entries of Nothing -> (entries, False) Just entries' -> (entries', True) Just canceled -> pure canceled pure (Just canceled, canceled) reap :: Wheel -> IO () reap wheel@Wheel {buckets, resolution} = do now <- Timestamp.now let remainingBucketMicros = resolution `Micros.minus` (now `Timestamp.rem` resolution) Micros.sleep remainingBucketMicros loop (now `Timestamp.plus` remainingBucketMicros `Timestamp.plus` resolution) (index wheel now) where loop :: Timestamp -> Int -> IO () loop nextTime i = do join (atomicModifyIORef' (Vector.unsafeIndex buckets i) expire) afterTime <- Timestamp.now when (afterTime < nextTime) (Micros.sleep (nextTime `Timestamp.minus` afterTime)) loop (nextTime `Timestamp.plus` resolution) ((i + 1) `rem` numSpokes wheel) expire :: Entries -> (Entries, IO ()) expire entries | Entries.null entries = (entries, pure ()) | otherwise = (alive, sequence_ expired) where (expired, alive) = Entries.partition entries