{-# LANGUAGE StrictData #-}
{-# LANGUAGE NoFieldSelectors #-}
module Job.Memory (queue) where
import Control.Concurrent
import Control.Concurrent.Async qualified as As
import Control.Concurrent.STM
import Control.Exception.Safe qualified as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (..), runMaybeT)
import Control.Monad.Trans.Resource.Extra qualified as R
import Control.Monad.Trans.State.Strict (put, runStateT)
import Data.Acquire qualified as A
import Data.Fixed
import Data.Function
import Data.List qualified as List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time qualified as Time
import Data.UUID.V7 (UUID)
import Data.UUID.V7 qualified as UUID7
import GHC.IO.Exception
import GHC.Stack
import Job
-- | Pause between two consecutive keep-alive updates written for a job
-- that is currently held by a worker.
--
-- NOTE(review): presumably expressed in seconds, like the 'Micro' values
-- produced by 'ceilingPicoToMicro' — confirm against 'threadDelayMicro'.
keepAliveBeat :: Micro
keepAliveBeat = 4
-- | How far into the future (2 seconds) a job is rescheduled when its
-- worker is released with an exception without having called @retry@ or
-- @finish@.
retryDelay :: Time.NominalDiffTime
retryDelay = 2
-- | Mutable state shared by every operation of one in-memory queue.
data Env job = Env
   { jobs :: TVar (Map Id (Meta, job))
   -- ^ Every job currently known to the queue, keyed by 'Id', together
   -- with its latest 'Meta'. Jobs held by a worker stay in this map.
   , queued :: TVar (Set (Meta, Id))
   -- ^ Jobs waiting to be handed to a worker, ordered by @('Meta', 'Id')@.
   , worker :: TMVar (UUID, (Id, Meta, job) -> STM ())
   -- ^ Slot for at most one waiting worker: a unique tag identifying it,
   -- and the callback used to deliver a job to it.
   , active :: TVar Bool
   -- ^ 'True' from acquisition until the queue is released.
   }
-- | Acquire a fresh, empty 'Env'.
--
-- On release (in reverse order of acquisition) the @active@ flag is set to
-- 'False', any registered worker is dropped from the @worker@ slot, and the
-- @queued@ set and @jobs@ map are emptied.
acqEnv :: A.Acquire (Env job)
acqEnv = do
   jobs <- resetOnRelease mempty mempty
   queued <- resetOnRelease mempty mempty
   worker <- R.mkAcquire1 newEmptyTMVarIO (atomically . void . tryTakeTMVar)
   active <- resetOnRelease True False
   pure Env{jobs, queued, worker, active}
  where
   -- A 'TVar' holding @initial@ while acquired, overwritten with @final@
   -- when released.
   resetOnRelease :: a -> a -> A.Acquire (TVar a)
   resetOnRelease initial final =
      R.mkAcquire1 (newTVarIO initial) (atomically . flip writeTVar final)
-- | Succeed only while the queue is active.
--
-- Throws the 'IOError' built by @resourceVanished "Job.Memory.queue"@ from
-- within 'STM' once @env.active@ is 'False'.
ensureActive :: Env job -> STM ()
ensureActive env = do
   isActive <- readTVar env.active
   unless isActive $ throwSTM $ resourceVanished "Job.Memory.queue"
-- | Hand the next eligible queued job to the waiting worker, if any.
--
-- Returns 'Nothing' when the queue is inactive, or when no queued job has
-- reached its @wait@ time as of @now@. Otherwise the job is removed from
-- @env.queued@, its 'Meta' in @env.jobs@ is updated with
-- @alive = 'Just' now@, the job is delivered through the callback stored in
-- @env.worker@, and its 'Id' is returned.
--
-- The transaction retries (blocks) while an eligible job exists but no
-- worker is registered in @env.worker@.
--
-- Throws (via 'Ex.throwString') if the 'Meta' stored in @env.queued@
-- differs from the one in @env.jobs@, or if a queued 'Id' is missing from
-- @env.jobs@.
connect1 :: forall job. Env job -> Time.UTCTime -> STM (Maybe Id)
connect1 env now =
   readTVar env.active >>= \case
      False -> pure Nothing
      True ->
         nextJob >>= mapM \x@(i, _, _) -> do
            (_, deliver) <- takeTMVar env.worker
            deliver x >> pure i
  where
   -- A queued job is eligible only once its scheduled time has arrived.
   -- Without this check, a job pushed for the future could be delivered
   -- early by any unrelated 'connectMany' run.
   ready :: (Meta, Id) -> Bool
   ready (m, _) = m.wait <= now

   nextJob :: STM (Maybe (Id, Meta, job))
   nextJob = do
      q0 <- readTVar env.queued
      -- First eligible entry in queue order. This is a linear scan over
      -- the not-yet-eligible entries that sort before it.
      forM (List.find ready (Set.toAscList q0)) \entry@(m0@Meta{..}, i) -> do
         writeTVar env.queued $! Set.delete entry q0
         let m = Meta{alive = Just now, ..}
         jobs0 <- readTVar env.jobs
         -- Replace the stored 'Meta' with @m@ while capturing the payload
         -- in the 'StateT' state. The state stays 'Nothing' if @i@ is
         -- absent from the map.
         (!jobs1, yj) <- flip runStateT Nothing do
            Map.alterF
               ( mapM \(m1, j) -> do
                  when (m0 /= m1) $ Ex.throwString "m0 /= m1"
                  put (Just j)
                  pure (m, j)
               )
               i
               jobs0
         case yj of
            Just j -> writeTVar env.jobs jobs1 >> pure (i, m, j)
            Nothing -> Ex.throwString "job in queue but not in map"
-- | Run 'connect1' repeatedly, each time with a fresh current time, until
-- it returns 'Nothing'.
--
-- Returns the 'Id's of the connected jobs in the order in which they were
-- connected. Because 'connect1' blocks while a job is eligible but no
-- worker is registered, this call may block as well.
connectMany :: forall job. Env job -> IO [Id]
connectMany env = go id
  where
   -- @acc@ is a difference list; appending on the right keeps the 'Id's
   -- in connection order.
   go :: ([Id] -> [Id]) -> IO [Id]
   go acc = do
      now <- Time.getCurrentTime
      atomically (connect1 env now) >>= \case
         Just i -> go (acc . (i :))
         Nothing -> pure (acc [])
-- | Acquire an in-memory 'Queue'.
--
-- While the queue is acquired, a background thread calls 'connectMany'
-- every 60 seconds. It is cancelled when the queue is released.
--
-- * @push@ stores a new job (not alive, @try = 0@), then spawns a thread
--   that waits until the requested time and calls 'connectMany'. It throws
--   through 'ensureActive' if the queue is no longer active.
--
-- * @prune@ applies the given function to every stored job in ascending
--   @('Meta', 'Id')@ order. Jobs for which it returns 'True' are kept, the
--   others are dropped, and the second components are combined with '<>'.
--   The queued set is rebuilt from the kept jobs whose @alive@ is
--   'Nothing'.
--
-- * @pull@ yields 'Nothing' if the queue is inactive. Otherwise it
--   registers a worker and blocks until a job is delivered. While the
--   'Work' is held, a keep-alive thread refreshes the job's @alive@
--   timestamp. On release the job is re-queued if @retry@ was called, or
--   if the release was caused by an exception and neither @retry@ nor
--   @finish@ was called; in every other case it is deleted.
queue :: forall job. A.Acquire (Queue job)
queue = do
   env <- acqEnv
   -- Periodic sweep connecting queued jobs to waiting workers.
   _ <-
      R.mkAcquire1
         (As.async $ forever $ threadDelay 60_000_000 >> void (connectMany env))
         As.uninterruptibleCancel
   pure
      Queue
         { push = pushJob env
         , prune = pruneJobs env
         , pull = pullWork env
         }
  where
   -- Store a brand-new job and schedule a connection attempt at @wait@.
   pushJob :: Env job -> Nice -> Time.UTCTime -> job -> IO Id
   pushJob env nice wait payload = do
      let fresh = Meta{nice, wait, alive = Nothing, try = 0}
      i <- newId
      atomically do
         ensureActive env
         modifyTVar' env.jobs $ Map.insert i (fresh, payload)
         modifyTVar' env.queued $ Set.insert (fresh, i)
      void $ As.async $ threadDelayUTCTime wait >> void (connectMany env)
      pure i

   -- Filter the stored jobs with @f@, rebuilding both the map and the set.
   pruneJobs
      :: forall a
       . (Monoid a)
      => Env job
      -> (Id -> Meta -> job -> (Bool, a))
      -> IO a
   pruneJobs env f = atomically do
      ensureActive env
      known <- readTVar env.jobs
      let order (i, (m, _)) = (m, i)
          step (!kept, !waiting, !acc) entry@(i, (m, j)) =
             case f i m j of
                (True, out) ->
                   ( entry : kept
                   , -- Only jobs not held by a worker belong in the queue.
                     case m.alive of
                        Nothing -> (m, i) : waiting
                        Just _ -> waiting
                   , acc <> out
                   )
                (False, out) -> (kept, waiting, acc <> out)
          (js, qs, a) =
             List.foldl'
                step
                ([], [], mempty)
                (List.sortOn order (Map.toList known))
      writeTVar env.jobs $! Map.fromList js
      writeTVar env.queued $! Set.fromList qs
      pure a

   -- Acquire one unit of work, or 'Nothing' if the queue is inactive.
   pullWork :: Env job -> A.Acquire (Maybe (Work job))
   pullWork env = runMaybeT do
      guard =<< liftIO (readTVarIO env.active)
      -- 'Nothing': neither @retry@ nor @finish@ was called yet.
      -- 'Just' 'Nothing': @finish@ was called.
      -- 'Just' ('Just' _): @retry@ was called with these settings.
      outcome :: TVar (Maybe (Maybe (Nice, Time.UTCTime))) <-
         liftIO $ newTVarIO Nothing
      (i :: Id, meta :: Meta, job :: job) <-
         MaybeT $ R.mkAcquireType1 (awaitJob env) (settle env outcome)
      _ <-
         lift $
            R.mkAcquire1
               (As.async (keepAlive env i meta job))
               As.uninterruptibleCancel
      pure
         Work
            { id = i
            , job
            , meta
            , retry = \n w ->
               atomically $ writeTVar outcome $ Just (Just (n, w))
            , finish = atomically $ writeTVar outcome $ Just Nothing
            }

   -- Register as the waiting worker, then block until a job is delivered
   -- or the queue becomes inactive.
   awaitJob :: Env job -> IO (Maybe (Id, Meta, job))
   awaitJob env = do
      me <- UUID7.genUUID
      let register :: STM (Maybe (TMVar (Id, Meta, job)))
          register = do
             open <- readTVar env.active
             if open
                then do
                   slot <- newEmptyTMVar
                   putTMVar env.worker (me, putTMVar slot)
                   pure (Just slot)
                else pure Nothing
          -- Empty the worker slot, unless it is taken by someone else.
          unregister :: STM ()
          unregister =
             tryTakeTMVar env.worker >>= \case
                Just other@(them, _)
                   | them /= me -> putTMVar env.worker other
                _ -> pure ()
          receive :: TMVar (Id, Meta, job) -> STM (Maybe (Id, Meta, job))
          receive slot = do
             open <- readTVar env.active
             if open
                then Just <$> takeTMVar slot
                else pure Nothing
      Ex.bracketOnError
         (atomically register)
         (mapM_ \_ -> atomically unregister)
         (maybe (pure Nothing) (atomically . receive))

   -- Release handler for a pulled job: re-queue it or delete it.
   settle
      :: Env job
      -> TVar (Maybe (Maybe (Nice, Time.UTCTime)))
      -> Maybe (Id, Meta, job)
      -> A.ReleaseType
      -> IO ()
   settle _ _ Nothing _ = pure ()
   settle env outcome (Just (i, m0, payload)) rt = do
      let rescheduled :: Nice -> Time.UTCTime -> Meta
          rescheduled nice wait =
             Meta{alive = Nothing, try = succ m0.try, nice, wait}
      next <-
         readTVarIO outcome >>= \case
            Nothing
               | A.ReleaseExceptionWith _ <- rt -> do
                  now <- Time.getCurrentTime
                  pure $ Just $ rescheduled m0.nice (Time.addUTCTime retryDelay now)
            Just (Just (nice, wait)) ->
               pure $ Just $ rescheduled nice wait
            _ -> pure Nothing
      case next of
         Nothing ->
            atomically $ modifyTVar' env.jobs $ Map.delete i
         Just !m1 -> do
            atomically do
               ensureActive env
               modifyTVar' env.queued $ Set.insert (m1, i)
               modifyTVar' env.jobs $ Map.insert i (m1, payload)
            void $ As.async $ threadDelayUTCTime m1.wait >> void (connectMany env)

   -- Refresh the job's @alive@ timestamp every 'keepAliveBeat' until the
   -- queue is observed to be inactive. Synchronous exceptions raised by a
   -- beat are caught by 'Ex.tryAny' and the loop carries on.
   keepAlive :: Env job -> Id -> Meta -> job -> IO ()
   keepAlive env i meta payload = loop
     where
      loop :: IO ()
      loop =
         Ex.tryAny beat >>= \case
            Right False -> pure ()
            _ -> loop

      beat :: IO Bool
      beat = do
         threadDelayMicro keepAliveBeat
         now <- Time.getCurrentTime
         let refreshed = case meta of
                Meta{..} -> Meta{alive = Just now, ..}
         atomically do
            open <- readTVar env.active
            when open $
               modifyTVar' env.jobs $
                  Map.insert i (refreshed, payload)
            pure open
-- | Block the calling thread until the 'TVar' obtained from
-- 'registerDelayUTCTime' for the given time becomes 'True'.
threadDelayUTCTime :: Time.UTCTime -> IO ()
threadDelayUTCTime wait =
   registerDelayUTCTime wait >>= \reached ->
      atomically (readTVar reached >>= check)
-- | Return a 'TVar' that becomes 'True' once the current time, as reported
-- by 'Time.getCurrentTime', has reached @wait@.
--
-- If @wait@ is not in the future the 'TVar' starts out 'True'. Otherwise a
-- background thread waits on a timer for the remaining duration, then
-- re-checks the clock every 10 milliseconds until @wait@ has been reached
-- before setting the 'TVar'.
registerDelayUTCTime :: Time.UTCTime -> IO (TVar Bool)
registerDelayUTCTime wait = do
   start <- Time.getCurrentTime
   case compare wait start of
      GT -> do
         let remaining =
                ceilingPicoToMicro
                   (Time.nominalDiffTimeToSeconds (Time.diffUTCTime wait start))
         timer <- registerDelayMicro remaining
         done <- newTVarIO False
         -- NOTE(review): the clock re-check presumably guards against the
         -- timer firing before the wall clock reaches @wait@ — confirm.
         let settle :: IO ()
             settle = do
                now <- Time.getCurrentTime
                if now < wait
                   then threadDelay 10_000 >> settle
                   else atomically (writeTVar done True)
         _ <- Ex.mask_ $ As.async do
            atomically $ check =<< readTVar timer
            settle
         pure done
      _ -> newTVarIO True
-- | Convert a 'Pico' value to 'Micro', rounding towards positive infinity
-- whenever the input is not an exact multiple of one microsecond.
ceilingPicoToMicro :: Pico -> Micro
ceilingPicoToMicro (MkFixed picos) = MkFixed micros
  where
    -- Integer ceiling division: ceiling (a / b) == negate (negate a `div` b).
    micros :: Integer
    micros = negate (negate picos `div` 1_000_000)
-- | Return a 'TVar' that holds 'False' until the given delay has elapsed and
-- 'True' afterwards.
--
-- A delay of zero or less yields a variable that is already 'True' without
-- starting any thread. Otherwise a background thread sleeps via
-- 'threadDelayMicro' and then writes 'True' to the variable.
registerDelayMicro :: Micro -> IO (TVar Bool)
registerDelayMicro us
  | us <= 0 = newTVarIO True
  | otherwise = do
      -- Already in 'IO', so no 'liftIO' is needed here.
      tv <- newTVarIO False
      -- The worker is forked under 'Ex.mask_'; its handle is discarded.
      void . Ex.mask_ . As.async $ do
        threadDelayMicro us
        atomically (writeTVar tv True)
      pure tv
-- | Suspend the calling thread for the given duration in seconds, at
-- microsecond resolution. Returns immediately for a duration of zero or less.
--
-- 'threadDelay' takes its argument as an 'Int' number of microseconds, so the
-- duration is slept in slices of at most @maxBound :: Int@ microseconds each.
threadDelayMicro :: Micro -> IO ()
threadDelayMicro remaining
  | remaining <= 0 = pure ()
  | otherwise = do
      threadDelay (wholeMicros (min remaining sliceMax))
      threadDelayMicro (remaining - sliceMax)
  where
    -- Longest single slice: @maxBound :: Int@ microseconds.
    sliceMax :: Micro
    sliceMax = MkFixed (toInteger (maxBound :: Int))

    -- The underlying integer of a 'Micro' is its count of microseconds; the
    -- argument never exceeds 'sliceMax', so it fits in an 'Int'.
    wholeMicros :: Micro -> Int
    wholeMicros (MkFixed n) = fromInteger n
-- | Build an 'IOError' of type 'ResourceVanished' carrying the given message.
--
-- The error starts out as @'userError' msg@; its type is then replaced and
-- its location is set to the rendered call stack with the topmost entry
-- dropped.
resourceVanished :: (HasCallStack) => String -> IOError
resourceVanished msg =
  let plain = userError msg
   in plain
        { ioe_type = ResourceVanished
        , ioe_location = prettyCallStack (popCallStack callStack)
        }