module Wheel
  ( Wheel (resolution),
    create,
    lenMicros,
    insert,
    reap,
  )
where

import Control.Concurrent.MVar
import Control.Monad (join, when)
import Data.IORef
import Data.Vector (Vector)
import qualified Data.Vector as Vector
import Entries (Entries)
import qualified Entries as Entries
import Micros (Micros (..))
import qualified Micros
import System.IO.Unsafe (unsafeInterleaveIO)
import Timestamp (Timestamp)
import qualified Timestamp

-- | A timer wheel: a fixed vector of mutable buckets together with the amount
-- of time that a single bucket covers.
data Wheel = Wheel
  { -- | One mutable 'Entries' collection per spoke of the wheel.
    buckets :: !(Vector (IORef Entries)),
    -- | The span of time covered by one bucket; the reaper advances by one
    -- bucket every @resolution@.
    resolution :: !Micros
  }

-- | Create a wheel with the given number of spokes and the given per-bucket
-- resolution. Every bucket starts out as 'Entries.empty'.
--
-- NOTE(review): @spokes@ is not validated here. A wheel with zero spokes
-- would make 'index' (and the reaper loop) take a remainder by zero.
create :: Int -> Micros -> IO Wheel
create spokes resolution = do
  buckets <- Vector.replicateM spokes (newIORef Entries.empty)
  pure Wheel {buckets, resolution}

-- | The number of buckets (spokes) in the wheel.
numSpokes :: Wheel -> Int
numSpokes wheel =
  Vector.length (buckets wheel)

-- | The total span of time covered by one full rotation of the wheel: the
-- per-bucket 'resolution' scaled by the number of spokes.
lenMicros :: Wheel -> Micros
lenMicros wheel =
  Micros.scale (numSpokes wheel) (resolution wheel)

-- | The bucket that the given timestamp falls into.
--
-- Uses 'Vector.unsafeIndex' (no bounds check); this relies on 'index'
-- returning a value in @[0, numSpokes wheel)@.
bucket :: Wheel -> Timestamp -> IORef Entries
bucket wheel timestamp =
  Vector.unsafeIndex (buckets wheel) (index wheel timestamp)

-- | The position in the bucket vector that the given timestamp maps to: the
-- timestamp's epoch at the wheel's resolution, taken modulo the number of
-- spokes.
--
-- The epoch is converted to 'Int' before the remainder is taken.
index :: Wheel -> Timestamp -> Int
index wheel@Wheel {resolution} timestamp =
  fromIntegral (Timestamp.epoch resolution timestamp) `rem` numSpokes wheel

-- | Register @action@ under @key@ to fire after @delay@, and return an action
-- that attempts to cancel it.
--
-- The entry is placed in the bucket selected by @now + delay@. The count
-- stored alongside it is @delay@ divided by the length of one full wheel
-- rotation (presumably the number of whole rotations the entry must sit out
-- before it is due; see "Entries" to confirm).
--
-- The returned cancel action yields 'True' if the entry was still present in
-- its bucket and was removed, and 'False' otherwise. The first outcome is
-- remembered, so calling it again returns the same answer without touching
-- the bucket a second time.
insert :: Wheel -> Int -> Micros -> IO () -> IO (IO Bool)
insert wheel key delay action = do
  now :: Timestamp <-
    Timestamp.now

  let bucketRef :: IORef Entries
      bucketRef =
        bucket wheel (now `Timestamp.plus` delay)

  let insertEntry :: Entries -> Entries
      insertEntry =
        Entries.insert key (unMicros (delay `Micros.div` lenMicros wheel)) action

  atomicModifyIORef' bucketRef (\entries -> (insertEntry entries, ()))

  -- Allocate the MVar lazily: it only comes into existence if the cancel
  -- action is actually run, so timers that are never canceled do not pay for
  -- it.
  canceledVar :: MVar (Maybe Bool) <-
    unsafeInterleaveIO (newMVar Nothing)

  pure do
    modifyMVar canceledVar \maybeCanceled -> do
      canceled <-
        case maybeCanceled of
          -- First cancellation attempt: try to remove the entry.
          Nothing ->
            atomicModifyIORef' bucketRef \entries ->
              case Entries.delete key entries of
                Nothing -> (entries, False)
                Just entries' -> (entries', True)
          -- Already attempted: reuse the recorded outcome.
          Just canceled -> pure canceled
      pure (Just canceled, canceled)

-- | Run the reaper loop for a wheel. This never returns.
--
-- It first sleeps for the remainder of the current bucket's time slice. It
-- then visits the buckets in order, wrapping around, starting with the one
-- that was current when 'reap' was called. For each bucket it atomically
-- removes the expired entries and runs their actions on the calling thread,
-- then sleeps until the next bucket boundary if that has not already passed.
reap :: Wheel -> IO ()
reap wheel@Wheel {buckets, resolution} = do
  now <- Timestamp.now
  let remainingBucketMicros =
        resolution `Micros.minus` (now `Timestamp.rem` resolution)
  Micros.sleep remainingBucketMicros
  loop
    (now `Timestamp.plus` remainingBucketMicros `Timestamp.plus` resolution)
    (index wheel now)
  where
    -- Process bucket @i@, then wait until @nextTime@ (if it is still in the
    -- future) before moving on to the following bucket.
    loop :: Timestamp -> Int -> IO ()
    loop nextTime i = do
      -- The atomic update returns the combined expired actions, which 'join'
      -- then runs outside of the update itself.
      join (atomicModifyIORef' (Vector.unsafeIndex buckets i) expire)
      afterTime <- Timestamp.now
      when (afterTime < nextTime) (Micros.sleep (nextTime `Timestamp.minus` afterTime))
      loop (nextTime `Timestamp.plus` resolution) ((i + 1) `rem` numSpokes wheel)

    -- Split a bucket into the entries that stay and the actions to run now.
    -- An empty bucket is returned unchanged with a no-op action.
    expire :: Entries -> (Entries, IO ())
    expire entries
      | Entries.null entries = (entries, pure ())
      | otherwise = (alive, sequence_ expired)
      where
        (expired, alive) = Entries.partition entries