{-# LANGUAGE StrictData #-}
{-# LANGUAGE NoFieldSelectors #-}

module Job.Memory (queue) where

import Control.Concurrent
import Control.Concurrent.Async qualified as As
import Control.Concurrent.STM
import Control.Exception.Safe qualified as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (..), runMaybeT)
import Control.Monad.Trans.Resource.Extra qualified as R
import Control.Monad.Trans.State.Strict (put, runStateT)
import Data.Acquire qualified as A
import Data.Fixed
import Data.Function
import Data.List qualified as List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time qualified as Time
import Data.UUID.V7 (UUID)
import Data.UUID.V7 qualified as UUID7
import GHC.IO.Exception
import GHC.Stack

import Job

--------------------------------------------------------------------------------

-- | Pause between two consecutive keep-alive heartbeats sent while a job is
-- being worked on. The value is handed to @threadDelayMicro@ as is
-- (presumably interpreted as seconds — TODO confirm against that helper).
keepAliveBeat :: Micro
keepAliveBeat = 4

-- | How far into the future a job is rescheduled when it is retried
-- implicitly. The numeric literal is read as seconds by the 'Num' instance
-- of 'Time.NominalDiffTime', so this is two seconds.
retryDelay :: Time.NominalDiffTime
retryDelay = 2

-- | Mutable state shared by every operation of the in-memory queue.
data Env job = Env
   { jobs :: TVar (Map Id (Meta, job))
   -- ^ Jobs currently held by the queue, keyed by 'Id', each paired with
   -- its latest 'Meta'.
   , queued :: TVar (Set (Meta, Id))
   -- ^ Jobs waiting to be handed to a worker. 'connect1' always removes
   -- the minimum element, so the 'Ord' instance of 'Meta' decides priority.
   , worker :: TMVar (UUID, (Id, Meta, job) -> STM ())
   -- ^ Slot for the single worker currently waiting for a job: a key
   -- identifying that worker, and the callback used to deliver the job.
   , active :: TVar Bool
   -- ^ 'True' while the queue is usable; 'acqEnv' sets it to 'False'
   -- on release.
   }

-- | Acquire a fresh 'Env': no jobs, nothing queued, no waiting worker, and
-- marked as active.
--
-- Each component has its own release action: the job map and the queued set
-- are emptied, the worker slot is drained, and @active@ is set to 'False'.
-- (Release actions presumably run in reverse acquisition order, so @active@
-- is cleared first — TODO confirm against 'R.mkAcquire1'.)
acqEnv :: A.Acquire (Env job)
acqEnv = do
   jobs <- R.mkAcquire1 (newTVarIO mempty) \t ->
      atomically $ writeTVar t mempty
   queued <- R.mkAcquire1 (newTVarIO mempty) \t ->
      atomically $ writeTVar t mempty
   worker <- R.mkAcquire1 newEmptyTMVarIO \t ->
      -- Non-blocking: the slot may well be empty at release time.
      atomically $ void $ tryTakeTMVar t
   active <- R.mkAcquire1 (newTVarIO True) \tv ->
      atomically $ writeTVar tv False -- TODO wait until no workers running?
   pure Env{..}

-- | Abort the enclosing transaction unless the queue is still active.
--
-- Does nothing when @env.active@ is 'True'. Otherwise throws, via
-- 'throwSTM', the 'IOError' built by @resourceVanished@ with the label
-- @"Job.Memory.queue"@.
ensureActive :: Env job -> STM ()
ensureActive env =
   readTVar env.active >>= \case
      True -> pure ()
      False -> throwSTM $ resourceVanished "Job.Memory.queue"

-- | Hand the next queued job, if any, to the worker waiting in @env.worker@.
--
-- Returns 'Nothing' when the queue is inactive or nothing is queued;
-- otherwise returns the 'Id' of the job that was handed over. When a job is
-- available but no worker is waiting, 'takeTMVar' makes the whole
-- transaction block until one shows up.
--
-- The given time is recorded as the job's @alive@ timestamp.
--
-- NOTE(review): the job's @wait@ time is never compared against @now@ here;
-- presumably the callers' timers and the 'Ord' instance of 'Meta' take care
-- of scheduling — confirm.
connect1 :: forall job. Env job -> Time.UTCTime -> STM (Maybe Id)
connect1 env now =
   readTVar env.active >>= \case
      False -> pure Nothing
      True ->
         nextJob >>= mapM \x@(i, _, _) -> do
            (_, f) <- takeTMVar env.worker
            f x >> pure i
  where
   -- Remove the minimum element of the queued set and mark the matching
   -- entry of the job map as alive. Throws if the two structures disagree,
   -- which also discards the writes made so far in this transaction.
   nextJob :: STM (Maybe (Id, Meta, job))
   nextJob = do
      q0 <- readTVar env.queued
      forM (Set.minView q0) \((m0@Meta{..}, i), q1) -> do
         writeTVar env.queued q1
         let m = Meta{alive = Just now, ..}
         jobs0 <- readTVar env.jobs
         -- The state carries the job payload found in the map, if any,
         -- so a missing entry can be detected after the update.
         (!jobs1, yj) <- flip runStateT Nothing do
            Map.alterF
               ( mapM \(m1, j) -> do
                  when (m0 /= m1) $ Ex.throwString "m0 /= m1"
                  put (Just j)
                  pure (m, j)
               )
               i
               jobs0
         case yj of
            Just j -> writeTVar env.jobs jobs1 >> pure (i, m, j)
            Nothing -> Ex.throwString "job in queue but not in map"

-- | Run 'connect1' repeatedly until it returns 'Nothing', collecting the
-- 'Id's of all jobs that were handed to a worker.
--
-- The current time is sampled anew before each attempt. Because every new
-- 'Id' is prepended, the returned list has the most recently connected job
-- first.
connectMany :: forall job. Env job -> IO [Id]
connectMany env = fmap ($ []) (go id)
  where
   go :: ([Id] -> [Id]) -> IO ([Id] -> [Id])
   go f = do
      -- We perform each connect1 in a separate transaction
      -- so that we don't hog concurrent access to the Env.
      now <- Time.getCurrentTime
      atomically (connect1 env now) >>= \case
         Just i -> go ((i :) . f)
         Nothing -> pure f

-- | An in-memory 'Queue'.
queue :: forall job. A.Acquire (Queue job)
queue :: forall job. Acquire (Queue job)
queue = do
   Env job
env <- Acquire (Env job)
forall job. Acquire (Env job)
acqEnv
   Acquire (Async Any) -> Acquire ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void do
      -- This background thread performs 'connectMany' every 60 seconds.
      -- Not strictly necessary; it is a safety net in case a timer died.
      IO (Async Any) -> (Async Any -> IO ()) -> Acquire (Async Any)
forall a. IO a -> (a -> IO ()) -> Acquire a
R.mkAcquire1
         ( IO Any -> IO (Async Any)
forall a. IO a -> IO (Async a)
As.async (IO Any -> IO (Async Any)) -> IO Any -> IO (Async Any)
forall a b. (a -> b) -> a -> b
$ IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever do
            Int -> IO ()
threadDelay Int
60_000_000
            IO [Id] -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO [Id] -> IO ()) -> IO [Id] -> IO ()
forall a b. (a -> b) -> a -> b
$ Env job -> IO [Id]
forall job. Env job -> IO [Id]
connectMany Env job
env
         )
         Async Any -> IO ()
forall a. Async a -> IO ()
As.uninterruptibleCancel
   Queue job -> Acquire (Queue job)
forall a. a -> Acquire a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
      Queue
         { $sel:push:Queue :: Nice -> UTCTime -> job -> IO Id
push = \Nice
nice UTCTime
wait job
job -> do
            let m :: Meta
m = Meta{Nice
$sel:nice:Meta :: Nice
nice :: Nice
nice, UTCTime
$sel:wait:Meta :: UTCTime
wait :: UTCTime
wait, $sel:alive:Meta :: Maybe UTCTime
alive = Maybe UTCTime
forall a. Maybe a
Nothing, $sel:try:Meta :: Word32
try = Word32
0}
            Id
i <- IO Id
forall (m :: * -> *). MonadIO m => m Id
newId
            STM () -> IO ()
forall a. STM a -> IO a
atomically do
               Env job -> STM ()
forall job. Env job -> STM ()
ensureActive Env job
env
               TVar (Map Id (Meta, job))
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.jobs ((Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ())
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Id -> (Meta, job) -> Map Id (Meta, job) -> Map Id (Meta, job)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Id
i (Meta
m, job
job)
               TVar (Set (Meta, Id))
-> (Set (Meta, Id) -> Set (Meta, Id)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.queued ((Set (Meta, Id) -> Set (Meta, Id)) -> STM ())
-> (Set (Meta, Id) -> Set (Meta, Id)) -> STM ()
forall a b. (a -> b) -> a -> b
$ (Meta, Id) -> Set (Meta, Id) -> Set (Meta, Id)
forall a. Ord a => a -> Set a -> Set a
Set.insert (Meta
m, Id
i)
            IO (Async ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async ()) -> IO ()) -> IO (Async ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
As.async do
               UTCTime -> IO ()
threadDelayUTCTime UTCTime
wait
               IO [Id] -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO [Id] -> IO ()) -> IO [Id] -> IO ()
forall a b. (a -> b) -> a -> b
$ Env job -> IO [Id]
forall job. Env job -> IO [Id]
connectMany Env job
env
            Id -> IO Id
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Id
i
         , ---------
           $sel:prune:Queue :: forall a. Monoid a => (Id -> Meta -> job -> (Bool, a)) -> IO a
prune = \Id -> Meta -> job -> (Bool, a)
f ->
            STM a -> IO a
forall a. STM a -> IO a
atomically do
               Env job -> STM ()
forall job. Env job -> STM ()
ensureActive Env job
env
               Map Id (Meta, job)
jobs0 <- TVar (Map Id (Meta, job)) -> STM (Map Id (Meta, job))
forall a. TVar a -> STM a
readTVar Env job
env.jobs
               let ([(Id, (Meta, job))]
js, [(Meta, Id)]
qs, a
a) =
                     (([(Id, (Meta, job))], [(Meta, Id)], a)
 -> (Id, (Meta, job)) -> ([(Id, (Meta, job))], [(Meta, Id)], a))
-> ([(Id, (Meta, job))], [(Meta, Id)], a)
-> [(Id, (Meta, job))]
-> ([(Id, (Meta, job))], [(Meta, Id)], a)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl'
                        ( \(![(Id, (Meta, job))]
js0, ![(Meta, Id)]
qs0, !a
al) (Id
i, (Meta
m, job
j)) ->
                           case Id -> Meta -> job -> (Bool, a)
f Id
i Meta
m job
j of
                              (Bool
False, a
ar) -> ([(Id, (Meta, job))]
js0, [(Meta, Id)]
qs0, a
al a -> a -> a
forall a. Semigroup a => a -> a -> a
<> a
ar)
                              (Bool
True, a
ar) ->
                                 ( (Id
i, (Meta
m, job
j)) (Id, (Meta, job)) -> [(Id, (Meta, job))] -> [(Id, (Meta, job))]
forall a. a -> [a] -> [a]
: [(Id, (Meta, job))]
js0
                                 , [(Meta, Id)]
-> (UTCTime -> [(Meta, Id)]) -> Maybe UTCTime -> [(Meta, Id)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((Meta
m, Id
i) (Meta, Id) -> [(Meta, Id)] -> [(Meta, Id)]
forall a. a -> [a] -> [a]
: [(Meta, Id)]
qs0) ([(Meta, Id)] -> UTCTime -> [(Meta, Id)]
forall a b. a -> b -> a
const [(Meta, Id)]
qs0) Meta
m.alive
                                 , a
al a -> a -> a
forall a. Semigroup a => a -> a -> a
<> a
ar
                                 )
                        )
                        ([(Id, (Meta, job))], [(Meta, Id)], a)
forall a. Monoid a => a
mempty
                        ([(Id, (Meta, job))] -> ([(Id, (Meta, job))], [(Meta, Id)], a))
-> [(Id, (Meta, job))] -> ([(Id, (Meta, job))], [(Meta, Id)], a)
forall a b. (a -> b) -> a -> b
$ ((Id, (Meta, job)) -> (Meta, Id))
-> [(Id, (Meta, job))] -> [(Id, (Meta, job))]
forall b a. Ord b => (a -> b) -> [a] -> [a]
List.sortOn (\(Id
i, (Meta
m, job
_)) -> (Meta
m, Id
i))
                        ([(Id, (Meta, job))] -> [(Id, (Meta, job))])
-> [(Id, (Meta, job))] -> [(Id, (Meta, job))]
forall a b. (a -> b) -> a -> b
$ Map Id (Meta, job) -> [(Id, (Meta, job))]
forall k a. Map k a -> [(k, a)]
Map.toList Map Id (Meta, job)
jobs0
               TVar (Map Id (Meta, job)) -> Map Id (Meta, job) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar Env job
env.jobs (Map Id (Meta, job) -> STM ()) -> Map Id (Meta, job) -> STM ()
forall a b. (a -> b) -> a -> b
$! [(Id, (Meta, job))] -> Map Id (Meta, job)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(Id, (Meta, job))]
js
               TVar (Set (Meta, Id)) -> Set (Meta, Id) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar Env job
env.queued (Set (Meta, Id) -> STM ()) -> Set (Meta, Id) -> STM ()
forall a b. (a -> b) -> a -> b
$! [(Meta, Id)] -> Set (Meta, Id)
forall a. Ord a => [a] -> Set a
Set.fromList [(Meta, Id)]
qs
               a -> STM a
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
         , ---------
           $sel:pull:Queue :: Acquire (Maybe (Work job))
pull = MaybeT Acquire (Work job) -> Acquire (Maybe (Work job))
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT do
            IO Bool -> MaybeT Acquire Bool
forall a. IO a -> MaybeT Acquire a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (TVar Bool -> IO Bool
forall a. TVar a -> IO a
readTVarIO Env job
env.active) MaybeT Acquire Bool
-> (Bool -> MaybeT Acquire ()) -> MaybeT Acquire ()
forall a b.
MaybeT Acquire a -> (a -> MaybeT Acquire b) -> MaybeT Acquire b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
               Bool
False -> Acquire (Maybe ()) -> MaybeT Acquire ()
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (Acquire (Maybe ()) -> MaybeT Acquire ())
-> Acquire (Maybe ()) -> MaybeT Acquire ()
forall a b. (a -> b) -> a -> b
$ Maybe () -> Acquire (Maybe ())
forall a. a -> Acquire a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
forall a. Maybe a
Nothing
               Bool
True -> () -> MaybeT Acquire ()
forall a. a -> MaybeT Acquire a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            TVar (Maybe (Maybe (Nice, UTCTime)))
tout :: TVar (Maybe (Maybe (Nice, Time.UTCTime))) <-
               IO (TVar (Maybe (Maybe (Nice, UTCTime))))
-> MaybeT Acquire (TVar (Maybe (Maybe (Nice, UTCTime))))
forall a. IO a -> MaybeT Acquire a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar (Maybe (Maybe (Nice, UTCTime))))
 -> MaybeT Acquire (TVar (Maybe (Maybe (Nice, UTCTime)))))
-> IO (TVar (Maybe (Maybe (Nice, UTCTime))))
-> MaybeT Acquire (TVar (Maybe (Maybe (Nice, UTCTime))))
forall a b. (a -> b) -> a -> b
$ Maybe (Maybe (Nice, UTCTime))
-> IO (TVar (Maybe (Maybe (Nice, UTCTime))))
forall a. a -> IO (TVar a)
newTVarIO Maybe (Maybe (Nice, UTCTime))
forall a. Maybe a
Nothing
            (Id
i :: Id, Meta
meta :: Meta, job
job :: job) <- Acquire (Maybe (Id, Meta, job)) -> MaybeT Acquire (Id, Meta, job)
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT do
               IO (Maybe (Id, Meta, job))
-> (Maybe (Id, Meta, job) -> ReleaseType -> IO ())
-> Acquire (Maybe (Id, Meta, job))
forall a. IO a -> (a -> ReleaseType -> IO ()) -> Acquire a
R.mkAcquireType1
                  ( do
                     UUID
k0 <- IO UUID
forall (m :: * -> *). MonadIO m => m UUID
UUID7.genUUID
                     IO (Maybe (TMVar (Id, Meta, job)))
-> (Maybe (TMVar (Id, Meta, job)) -> IO ())
-> (Maybe (TMVar (Id, Meta, job)) -> IO (Maybe (Id, Meta, job)))
-> IO (Maybe (Id, Meta, job))
forall (m :: * -> *) a b c.
(HasCallStack, MonadMask m) =>
m a -> (a -> m b) -> (a -> m c) -> m c
Ex.bracketOnError
                        ( STM (Maybe (TMVar (Id, Meta, job)))
-> IO (Maybe (TMVar (Id, Meta, job)))
forall a. STM a -> IO a
atomically do
                           TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar Env job
env.active STM Bool
-> (Bool -> STM (Maybe (TMVar (Id, Meta, job))))
-> STM (Maybe (TMVar (Id, Meta, job)))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                              Bool
False -> Maybe (TMVar (Id, Meta, job))
-> STM (Maybe (TMVar (Id, Meta, job)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (TMVar (Id, Meta, job))
forall a. Maybe a
Nothing
                              Bool
True -> do
                                 TMVar (Id, Meta, job)
tj <- STM (TMVar (Id, Meta, job))
forall a. STM (TMVar a)
newEmptyTMVar
                                 -- Blocks until we become next worker
                                 TMVar (UUID, (Id, Meta, job) -> STM ())
-> (UUID, (Id, Meta, job) -> STM ()) -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar Env job
env.worker (UUID
k0, TMVar (Id, Meta, job) -> (Id, Meta, job) -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Id, Meta, job)
tj)
                                 Maybe (TMVar (Id, Meta, job))
-> STM (Maybe (TMVar (Id, Meta, job)))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMVar (Id, Meta, job) -> Maybe (TMVar (Id, Meta, job))
forall a. a -> Maybe a
Just TMVar (Id, Meta, job)
tj)
                        )
                        ( (TMVar (Id, Meta, job) -> IO ())
-> Maybe (TMVar (Id, Meta, job)) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ \TMVar (Id, Meta, job)
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically do
                           TMVar (UUID, (Id, Meta, job) -> STM ())
-> STM (Maybe (UUID, (Id, Meta, job) -> STM ()))
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar Env job
env.worker STM (Maybe (UUID, (Id, Meta, job) -> STM ()))
-> (Maybe (UUID, (Id, Meta, job) -> STM ()) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                              Just (UUID
k1, (Id, Meta, job) -> STM ()
f)
                                 | UUID
k0 UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
/= UUID
k1 ->
                                    TMVar (UUID, (Id, Meta, job) -> STM ())
-> (UUID, (Id, Meta, job) -> STM ()) -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar Env job
env.worker (UUID
k1, (Id, Meta, job) -> STM ()
f)
                              Maybe (UUID, (Id, Meta, job) -> STM ())
_ -> () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                        )
                        ( \case
                           Maybe (TMVar (Id, Meta, job))
Nothing -> Maybe (Id, Meta, job) -> IO (Maybe (Id, Meta, job))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Id, Meta, job)
forall a. Maybe a
Nothing
                           Just TMVar (Id, Meta, job)
tj -> STM (Maybe (Id, Meta, job)) -> IO (Maybe (Id, Meta, job))
forall a. STM a -> IO a
atomically do
                              TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar Env job
env.active STM Bool
-> (Bool -> STM (Maybe (Id, Meta, job)))
-> STM (Maybe (Id, Meta, job))
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                                 Bool
False -> Maybe (Id, Meta, job) -> STM (Maybe (Id, Meta, job))
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Id, Meta, job)
forall a. Maybe a
Nothing
                                 Bool
True ->
                                    -- Blocks until we get job input
                                    (Id, Meta, job) -> Maybe (Id, Meta, job)
forall a. a -> Maybe a
Just ((Id, Meta, job) -> Maybe (Id, Meta, job))
-> STM (Id, Meta, job) -> STM (Maybe (Id, Meta, job))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar (Id, Meta, job) -> STM (Id, Meta, job)
forall a. TMVar a -> STM a
takeTMVar TMVar (Id, Meta, job)
tj
                        )
                  )
                  ( \Maybe (Id, Meta, job)
yx ReleaseType
rt -> Maybe (Id, Meta, job) -> ((Id, Meta, job) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (Id, Meta, job)
yx \(Id
i, Meta
m0, job
job) -> do
                     Maybe Meta
ym1 <-
                        TVar (Maybe (Maybe (Nice, UTCTime)))
-> IO (Maybe (Maybe (Nice, UTCTime)))
forall a. TVar a -> IO a
readTVarIO TVar (Maybe (Maybe (Nice, UTCTime)))
tout IO (Maybe (Maybe (Nice, UTCTime)))
-> (Maybe (Maybe (Nice, UTCTime)) -> IO (Maybe Meta))
-> IO (Maybe Meta)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                           -- no 'retry', and no 'finish',
                           -- and release with exception.
                           Maybe (Maybe (Nice, UTCTime))
Nothing
                              | A.ReleaseExceptionWith SomeException
_ <- ReleaseType
rt -> do
                                 UTCTime
now <- IO UTCTime
Time.getCurrentTime
                                 let alive :: Maybe a
alive = Maybe a
forall a. Maybe a
Nothing
                                     try :: Word32
try = Word32 -> Word32
forall a. Enum a => a -> a
succ Meta
m0.try
                                     nice :: Nice
nice = Meta
m0.nice
                                     wait :: UTCTime
wait = NominalDiffTime -> UTCTime -> UTCTime
Time.addUTCTime NominalDiffTime
retryDelay UTCTime
now
                                 Maybe Meta -> IO (Maybe Meta)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Meta -> IO (Maybe Meta)) -> Maybe Meta -> IO (Maybe Meta)
forall a b. (a -> b) -> a -> b
$ Meta -> Maybe Meta
forall a. a -> Maybe a
Just Meta{Maybe UTCTime
Word32
UTCTime
Nice
forall a. Maybe a
$sel:alive:Meta :: Maybe UTCTime
$sel:nice:Meta :: Nice
$sel:wait:Meta :: UTCTime
$sel:try:Meta :: Word32
alive :: forall a. Maybe a
try :: Word32
nice :: Nice
wait :: UTCTime
..}
                           -- explicit 'retry'.
                           Just (Just (Nice
nice, UTCTime
wait)) -> do
                              let alive :: Maybe a
alive = Maybe a
forall a. Maybe a
Nothing
                                  try :: Word32
try = Word32 -> Word32
forall a. Enum a => a -> a
succ Meta
m0.try
                              Maybe Meta -> IO (Maybe Meta)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Meta -> IO (Maybe Meta)) -> Maybe Meta -> IO (Maybe Meta)
forall a b. (a -> b) -> a -> b
$ Meta -> Maybe Meta
forall a. a -> Maybe a
Just Meta{Maybe UTCTime
Word32
UTCTime
Nice
forall a. Maybe a
$sel:alive:Meta :: Maybe UTCTime
$sel:nice:Meta :: Nice
$sel:wait:Meta :: UTCTime
$sel:try:Meta :: Word32
nice :: Nice
wait :: UTCTime
alive :: forall a. Maybe a
try :: Word32
..}
                           -- explicit 'finish',
                           -- or release without exception.
                           Maybe (Maybe (Nice, UTCTime))
_ -> Maybe Meta -> IO (Maybe Meta)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Meta
forall a. Maybe a
Nothing
                     case Maybe Meta
ym1 of
                        Maybe Meta
Nothing ->
                           STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Id (Meta, job))
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.jobs ((Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ())
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Id -> Map Id (Meta, job) -> Map Id (Meta, job)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Id
i
                        Just !Meta
m1 -> do
                           STM () -> IO ()
forall a. STM a -> IO a
atomically do
                              Env job -> STM ()
forall job. Env job -> STM ()
ensureActive Env job
env
                              TVar (Set (Meta, Id))
-> (Set (Meta, Id) -> Set (Meta, Id)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.queued ((Set (Meta, Id) -> Set (Meta, Id)) -> STM ())
-> (Set (Meta, Id) -> Set (Meta, Id)) -> STM ()
forall a b. (a -> b) -> a -> b
$ (Meta, Id) -> Set (Meta, Id) -> Set (Meta, Id)
forall a. Ord a => a -> Set a -> Set a
Set.insert (Meta
m1, Id
i)
                              TVar (Map Id (Meta, job))
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.jobs ((Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ())
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a b. (a -> b) -> a -> b
$
                                 Id -> (Meta, job) -> Map Id (Meta, job) -> Map Id (Meta, job)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Id
i (Meta
m1, job
job)
                           IO (Async ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async ()) -> IO ()) -> IO (Async ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
As.async do
                              UTCTime -> IO ()
threadDelayUTCTime Meta
m1.wait
                              IO [Id] -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO [Id] -> IO ()) -> IO [Id] -> IO ()
forall a b. (a -> b) -> a -> b
$ Env job -> IO [Id]
forall job. Env job -> IO [Id]
connectMany Env job
env
                  )
            -- While working on this job, send heartbeats
            MaybeT Acquire (Async ()) -> MaybeT Acquire ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (MaybeT Acquire (Async ()) -> MaybeT Acquire ())
-> MaybeT Acquire (Async ()) -> MaybeT Acquire ()
forall a b. (a -> b) -> a -> b
$ Acquire (Async ()) -> MaybeT Acquire (Async ())
forall (m :: * -> *) a. Monad m => m a -> MaybeT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift do
               IO (Async ()) -> (Async () -> IO ()) -> Acquire (Async ())
forall a. IO a -> (a -> IO ()) -> Acquire a
R.mkAcquire1
                  ( IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
As.async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ (IO () -> IO ()) -> IO ()
forall a. (a -> a) -> a
fix \IO ()
again -> do
                     Either SomeException Bool
eactive <- IO Bool -> IO (Either SomeException Bool)
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m) =>
m a -> m (Either SomeException a)
Ex.tryAny do
                        Micro -> IO ()
threadDelayMicro Micro
keepAliveBeat
                        UTCTime
now <- IO UTCTime
Time.getCurrentTime
                        let m1 :: Meta
m1
                              | Meta{Maybe UTCTime
Word32
UTCTime
Nice
$sel:alive:Meta :: Meta -> Maybe UTCTime
$sel:nice:Meta :: Meta -> Nice
$sel:wait:Meta :: Meta -> UTCTime
$sel:try:Meta :: Meta -> Word32
alive :: Maybe UTCTime
nice :: Nice
wait :: UTCTime
try :: Word32
..} <- Meta
meta =
                                 Meta{$sel:alive:Meta :: Maybe UTCTime
alive = UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
now, Word32
UTCTime
Nice
$sel:nice:Meta :: Nice
$sel:wait:Meta :: UTCTime
$sel:try:Meta :: Word32
nice :: Nice
wait :: UTCTime
try :: Word32
..}
                        STM Bool -> IO Bool
forall a. STM a -> IO a
atomically do
                           Bool
a <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar Env job
env.active
                           Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
a do
                              TVar (Map Id (Meta, job))
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' Env job
env.jobs ((Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ())
-> (Map Id (Meta, job) -> Map Id (Meta, job)) -> STM ()
forall a b. (a -> b) -> a -> b
$
                                 Id -> (Meta, job) -> Map Id (Meta, job) -> Map Id (Meta, job)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Id
i (Meta
m1, job
job)
                           Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
a
                     case Either SomeException Bool
eactive of
                        Right Bool
False -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                        Either SomeException Bool
_ -> IO ()
again
                  )
                  Async () -> IO ()
forall a. Async a -> IO ()
As.uninterruptibleCancel
            Work job -> MaybeT Acquire (Work job)
forall a. a -> MaybeT Acquire a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
               Work
                  { $sel:id:Work :: Id
id = Id
i
                  , job
job :: job
$sel:job:Work :: job
job
                  , Meta
meta :: Meta
$sel:meta:Work :: Meta
meta
                  , $sel:retry:Work :: Nice -> UTCTime -> IO ()
retry = \Nice
n UTCTime
w ->
                     STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (Maybe (Nice, UTCTime)))
-> Maybe (Maybe (Nice, UTCTime)) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Maybe (Nice, UTCTime)))
tout (Maybe (Maybe (Nice, UTCTime)) -> STM ())
-> Maybe (Maybe (Nice, UTCTime)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (Nice, UTCTime) -> Maybe (Maybe (Nice, UTCTime))
forall a. a -> Maybe a
Just ((Nice, UTCTime) -> Maybe (Nice, UTCTime)
forall a. a -> Maybe a
Just (Nice
n, UTCTime
w))
                  , $sel:finish:Work :: IO ()
finish = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe (Maybe (Nice, UTCTime)))
-> Maybe (Maybe (Nice, UTCTime)) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe (Maybe (Nice, UTCTime)))
tout (Maybe (Maybe (Nice, UTCTime)) -> STM ())
-> Maybe (Maybe (Nice, UTCTime)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (Nice, UTCTime) -> Maybe (Maybe (Nice, UTCTime))
forall a. a -> Maybe a
Just Maybe (Nice, UTCTime)
forall a. Maybe a
Nothing
                  }
         }

--------------------------------------------------------------------------------

-- | Like 'threadDelay', but waits until a specified 'Time.UTCTime'.
--
-- Blocks the calling thread until the 'TVar' produced by
-- 'registerDelayUTCTime' for @wait@ becomes 'True'.
threadDelayUTCTime :: Time.UTCTime -> IO ()
threadDelayUTCTime wait = do
   t <- registerDelayUTCTime wait
   -- 'check' retries the transaction until the flag has been set.
   atomically $ readTVar t >>= check

-- | Like 'registerDelay', but waits until a specified 'Time.UTCTime'.
--
-- If @wait@ is not after the current time, the returned 'TVar' is created
-- already holding 'True'. Otherwise it starts as 'False' and a background
-- thread writes 'True' to it once the current time has reached @wait@.
--
-- NOTE(review): the background thread is spawned under 'Ex.mask_' and its
-- 'As.Async' handle is discarded, so nothing here waits on or cancels it.
registerDelayUTCTime :: Time.UTCTime -> IO (TVar Bool)
registerDelayUTCTime wait = do
   start <- Time.getCurrentTime
   if wait <= start
      then newTVarIO True
      else do
         -- Timer covering the distance from @start@ to @wait@, rounded up
         -- to a whole number of microseconds.
         tvtmp <-
            registerDelayMicro $
               ceilingPicoToMicro $
                  Time.nominalDiffTimeToSeconds $
                     Time.diffUTCTime wait start
         tvout <- newTVarIO False
         void $ Ex.mask_ $ As.async do
            -- First wait for the relative timer to fire ...
            atomically $ readTVar tvtmp >>= check
            -- ... then confirm against the clock before signalling.
            fix \again -> do
               now <- Time.getCurrentTime
               if wait <= now
                  then atomically $ writeTVar tvout True
                  else do
                     -- Only happens if there were leap seconds involved.
                     -- We just loop inefficiently for a bit.
                     threadDelay 10_000 -- 10 milliseconds
                     again
         pure tvout

-- | Convert a 'Pico' value to 'Micro', rounding up.
--
-- The underlying integer of a 'Pico' counts units of @10^-12@ and that of a
-- 'Micro' counts units of @10^-6@, so the count is divided by @10^6@ and
-- rounded towards positive infinity with 'ceiling'.
ceilingPicoToMicro :: Pico -> Micro
ceilingPicoToMicro (MkFixed p) = MkFixed (ceiling (toRational p / 1_000_000))

-- | Like 'registerDelay', but not limited to @'maxBound' :: 'Int'@.
--
-- For a non-positive @us@ the returned 'TVar' is created already holding
-- 'True'. Otherwise it starts as 'False' and a background thread writes
-- 'True' to it after 'threadDelayMicro' @us@ has returned.
--
-- NOTE(review): the background thread is spawned under 'Ex.mask_' and its
-- 'As.Async' handle is discarded, so nothing here waits on or cancels it.
registerDelayMicro :: Micro -> IO (TVar Bool)
registerDelayMicro us
   | us <= 0 = newTVarIO True
   | otherwise = do
      tv <- newTVarIO False
      void $ Ex.mask_ $ As.async do
         threadDelayMicro us
         atomically $ writeTVar tv True
      pure tv

-- | Like 'threadDelay', but not limited to @'maxBound' :: 'Int'@.
--
-- The argument is scaled by @1_000_000@ before being handed to
-- 'threadDelay', i.e. it is a duration in seconds with microsecond
-- resolution. A non-positive argument returns immediately. Longer delays
-- are served as a sequence of 'threadDelay' calls of at most @stepMax@ each.
threadDelayMicro :: Micro -> IO ()
threadDelayMicro = fix \again us -> when (us > 0) do
   -- Clamp to @stepMax@ so the microsecond count always fits in an 'Int'.
   let d = floor (min us stepMax * 1_000_000)
   threadDelay d
   -- Whatever exceeds this step is handled by the next iteration; the
   -- difference is non-positive once the final step has been taken.
   again (us - stepMax)
  where
   -- The largest single step: @'maxBound' :: 'Int'@ microseconds.
   stepMax :: Micro
   stepMax = MkFixed (fromIntegral (maxBound :: Int))

--------------------------------------------------------------------------------

-- | Build an 'IOError' of type 'ResourceVanished' whose description is @s@.
--
-- Starts from 'userError' @s@, then overrides the error type and sets the
-- location to the rendered call stack with this function's own frame
-- popped, so it points at the caller.
resourceVanished :: (HasCallStack) => String -> IOError
resourceVanished s =
   (userError s)
      { ioe_location = prettyCallStack (popCallStack callStack)
      , ioe_type = ResourceVanished
      }