-- The code below uses block arguments (@pure do@, trailing lambdas), record
-- puns (@Wheel {buckets, resolution}@) and pattern type signatures
-- (@now :: Timestamp <- ...@). Declaring the extensions here keeps the module
-- self-contained; repeating them is harmless if the build already enables them.
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A timer wheel: a fixed-length vector of mutable buckets of 'Entries'.
--
-- A timestamp is mapped to a bucket by taking its epoch at the wheel's
-- resolution modulo the number of buckets (see 'index'). 'insert' adds an
-- entry to the bucket for @now + delay@, and 'reap' walks the buckets one per
-- resolution tick, running whatever each bucket reports as expired.
module Wheel
  ( Wheel (resolution),
    create,
    lenMicros,
    insert,
    reap,
  )
where

import Control.Concurrent.MVar
import Control.Monad (join, when)
import Data.IORef
import Data.Vector (Vector)
import qualified Data.Vector as Vector
import Entries (Entries)
import qualified Entries as Entries
import Micros (Micros (..))
import qualified Micros
import System.IO.Unsafe (unsafeInterleaveIO)
import Timestamp (Timestamp)
import qualified Timestamp

-- | The wheel itself. Only the 'resolution' field is exported.
data Wheel = Wheel
  { -- | One mutable cell of entries per spoke.
    buckets :: !(Vector (IORef Entries)),
    -- | The interval by which 'reap' advances from one bucket to the next.
    resolution :: !Micros
  }

-- | Create a wheel with the given number of spokes and the given resolution.
-- Every bucket starts out as 'Entries.empty'.
--
-- NOTE(review): the spoke count is not validated here. With zero spokes,
-- 'index' takes a remainder by zero; callers are presumably expected to pass a
-- positive count.
create :: Int -> Micros -> IO Wheel
create spokes resolution = do
  buckets <- Vector.replicateM spokes (newIORef Entries.empty)
  pure Wheel {buckets, resolution}

-- | The number of buckets in the wheel.
numSpokes :: Wheel -> Int
numSpokes wheel =
  Vector.length (buckets wheel)

-- | The wheel's resolution scaled by its number of spokes (via
-- 'Micros.scale'); 'reap' takes this long to visit every bucket once.
lenMicros :: Wheel -> Micros
lenMicros wheel =
  Micros.scale (numSpokes wheel) (resolution wheel)

-- | The bucket that the given timestamp falls into.
bucket :: Wheel -> Timestamp -> IORef Entries
bucket wheel timestamp =
  -- 'index' always yields a value in @[0, numSpokes)@, which is what makes the
  -- unchecked lookup acceptable.
  Vector.unsafeIndex (buckets wheel) (index wheel timestamp)

-- | The position in 'buckets' that the given timestamp maps to: its epoch at
-- the wheel's resolution, modulo the number of spokes.
index :: Wheel -> Timestamp -> Int
index wheel@Wheel {resolution} timestamp =
  -- Reduce modulo the spoke count while still in the epoch's unsigned type and
  -- only then narrow to 'Int'. Narrowing first would wrap an epoch larger than
  -- @maxBound :: Int@ to a negative number, whose (negative) remainder would
  -- then be handed to 'Vector.unsafeIndex'. The remainder is always smaller
  -- than the spoke count, so the final conversion is lossless.
  fromIntegral (Timestamp.epoch resolution timestamp `rem` fromIntegral (numSpokes wheel))

-- | @insert wheel key delay action@ registers @action@ under @key@ in the
-- bucket for @now + delay@ and returns an action that attempts to cancel it.
--
-- The value stored alongside the action is @delay \`Micros.div\` lenMicros
-- wheel@ (presumably the number of full trips around the wheel that must
-- elapse before the entry is due; the exact meaning is defined by
-- "Entries").
--
-- The returned cancel action yields 'True' if the entry was found in its
-- bucket and removed, and 'False' if 'Entries.delete' did not find it. The
-- outcome of the first call is remembered: later calls return the same answer
-- without touching the bucket again.
insert :: Wheel -> Int -> Micros -> IO () -> IO (IO Bool)
insert wheel key delay action = do
  now :: Timestamp <- Timestamp.now
  let bucketRef :: IORef Entries
      bucketRef = bucket wheel (now `Timestamp.plus` delay)
  let insertEntry :: Entries -> Entries
      insertEntry =
        Entries.insert key (unMicros (delay `Micros.div` lenMicros wheel)) action
  atomicModifyIORef' bucketRef (\entries -> (insertEntry entries, ()))
  -- Defer allocating the MVar until the cancel action is first run, so that
  -- timers that are never canceled do not pay for it.
  -- NOTE(review): this relies on 'unsafeInterleaveIO' performing the deferred
  -- action at most once even when the result is demanded from several threads
  -- at the same time - confirm against the base documentation.
  canceledVar :: MVar (Maybe Bool) <- unsafeInterleaveIO (newMVar Nothing)
  pure do
    modifyMVar canceledVar \maybeCanceled -> do
      canceled <- case maybeCanceled of
        -- First cancellation attempt: try to remove the entry from its bucket.
        Nothing ->
          atomicModifyIORef' bucketRef \entries ->
            case Entries.delete key entries of
              Nothing -> (entries, False)
              Just entries' -> (entries', True)
        -- Already attempted: replay the recorded outcome.
        Just canceled -> pure canceled
      pure (Just canceled, canceled)

-- | Run the reaper loop. This never returns.
--
-- It first sleeps until the end of the current resolution-sized window, then
-- repeats forever: atomically split the current bucket with
-- 'Entries.partition', run the expired actions in order on the calling
-- thread, sleep until the next tick if it has not already passed, and move on
-- to the next bucket (wrapping around at the end of the vector).
reap :: Wheel -> IO ()
reap wheel@Wheel {buckets, resolution} = do
  now <- Timestamp.now
  let remainingBucketMicros :: Micros
      remainingBucketMicros =
        resolution `Micros.minus` (now `Timestamp.rem` resolution)
  Micros.sleep remainingBucketMicros
  loop
    (now `Timestamp.plus` remainingBucketMicros `Timestamp.plus` resolution)
    (index wheel now)
  where
    -- @loop nextTime i@ processes bucket @i@; @nextTime@ is when the bucket
    -- after it should be processed.
    loop :: Timestamp -> Int -> IO ()
    loop nextTime i = do
      -- The bucket is swapped atomically; the expired actions run afterwards,
      -- outside of the atomic modification.
      join (atomicModifyIORef' (Vector.unsafeIndex buckets i) expire)
      afterTime <- Timestamp.now
      -- If running the actions overran the tick, skip the sleep entirely.
      when
        (afterTime < nextTime)
        (Micros.sleep (nextTime `Timestamp.minus` afterTime))
      loop (nextTime `Timestamp.plus` resolution) ((i + 1) `rem` numSpokes wheel)

    -- Split a bucket into the entries to keep and one action that runs
    -- everything that expired. An empty bucket is returned untouched.
    expire :: Entries -> (Entries, IO ())
    expire entries
      | Entries.null entries = (entries, pure ())
      | otherwise = (alive, sequence_ expired)
      where
        (expired, alive) = Entries.partition entries