{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE UndecidableInstances       #-}

-- Job scheduler of the periodic server: keeps the in-memory task list,
-- the per-function statistics, the distributed lock table and the
-- result wait list, on top of a 'Persist' backend.
module Periodic.Server.Scheduler
  ( SchedT
  , runSchedT
  , SchedEnv
  , initSchedEnv
  , startSchedT
  , pushJob
  , pushGrab
  , failJob
  , doneJob
  , schedLaterJob
  , acquireLock
  , releaseLock
  , addFunc
  , removeFunc
  , broadcastFunc
  , dropFunc
  , removeJob
  , dumpJob
  , status
  , shutdown
  , keepalive
  , setConfigInt
  , getConfigInt
  , prepareWait
  , lookupPrevResult
  , waitResult
  , canRun
  ) where

import           Control.Monad                (forever, mzero, unless, void,
                                               when)
import           Control.Monad.Reader.Class   (MonadReader (ask), asks)
import           Control.Monad.Trans.Class    (MonadTrans, lift)
import           Control.Monad.Trans.Maybe    (runMaybeT)
import           Control.Monad.Trans.Reader   (ReaderT (..), runReaderT)
import           Data.ByteString              (ByteString)
import           Data.Foldable                (forM_)
import           Data.HashPSQ                 (HashPSQ)
import qualified Data.HashPSQ                 as PSQ
import           Data.Int                     (Int64)
import qualified Data.List                    as L (delete)
import           Data.Maybe                   (fromJust, fromMaybe, isJust)
import           Metro.Class                  (Transport)
import           Metro.IOHashMap              (IOHashMap, newIOHashMap)
import qualified Metro.IOHashMap              as FL
import qualified Metro.Lock                   as L (Lock, new, with)
import           Metro.Session                (runSessionT1, send,
                                               sessionState)
import           Metro.Utils                  (getEpochTime)
import           Periodic.IOList              (IOList)
import qualified Periodic.IOList              as IL
import           Periodic.Server.FuncStat
import           Periodic.Server.GrabQueue
import           Periodic.Server.Persist      (Persist, State (..))
import qualified Periodic.Server.Persist      as P
import           Periodic.Server.Types        (CSEnv)
import           Periodic.Types               (packetRES)
import           Periodic.Types.Internal      (LockName)
import           Periodic.Types.Job
import           Periodic.Types.ServerCommand (ServerCommand (JobAssign))
import           System.Log.Logger            (errorM, infoM)
import           UnliftIO
import           UnliftIO.Concurrent          (threadDelay)

-- | Commands consumed by the channel loop ('runChanJob').
data Action
  = Add Job
  -- ^ (re)schedule the job
  | Remove Job
  -- ^ cancel the in-memory task of the job, if any
  | Cancel
  -- ^ cancel every in-memory task
  | PollJob
  -- ^ poll pending jobs from the persist backend
  | TryPoll JobHandle
  -- ^ drop the finished task and maybe trigger a new poll

-- | One entry of the result cache.
data WaitItem = WaitItem
  { itemTs    :: Int64
  -- ^ last update time; entries older than the expiration are purged
  , itemValue :: Maybe ByteString
  -- ^ 'Nothing' while the result is not known yet
  , itemWait  :: Int
  -- ^ number of callers registered through 'prepareWait'
  }

-- Cache runJob result
--   expiredAt, Nothing       retrySTM
--   expiredAt, Just bs       return bs
type WaitList = IOHashMap JobHandle WaitItem

-- | Distributed lock state for one lock name.
data LockInfo = LockInfo
  { acquired :: [JobHandle]
  -- ^ handles currently holding the lock
  , locked   :: [JobHandle]
  -- ^ handles waiting for the lock, oldest first
  , maxCount :: Int
  -- ^ largest holder count requested so far
  }

type LockList = IOHashMap LockName LockInfo

-- | Shared scheduler state.
data SchedEnv db tp = SchedEnv
  { sPollInterval   :: TVar Int
  -- ^ interval (seconds) of the main poll loop
  , sRevertInterval :: TVar Int
  -- ^ interval (seconds) of the loop reverting the running queue
  , sTaskTimeout    :: TVar Int
  -- ^ default task timeout, used when the job has none of its own
  , sMaxBatchSize   :: TVar Int
  -- ^ max poll batch size
  , sKeepalive      :: TVar Int
  -- ^ client or worker keepalive
  , sExpiration     :: TVar Int
  -- ^ run job cache expiration
  , sAutoPoll       :: TVar Bool
  -- ^ auto poll job when job done or failed
  , sPolled         :: TVar Bool
  -- ^ auto poll lock: False while a poll is in progress
  , sCleanup        :: IO ()
  -- ^ action run once by 'shutdown'
  , sFuncStatList   :: FuncStatList
  , sLocker         :: L.Lock
  , sGrabQueue      :: GrabQueue tp
  , sAlive          :: TVar Bool
  -- ^ sched state, when false sched is exited
  , sChanList       :: TVar [Action]
  -- ^ pending actions for the channel loop
  , sWaitList       :: WaitList
  , sLockList       :: LockList
  , sPersist        :: db
  }

-- | Scheduler monad: a reader over 'SchedEnv'.
newtype SchedT db tp m a = SchedT {unSchedT :: ReaderT (SchedEnv db tp) m a}
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadTrans
    , MonadIO
    , MonadReader (SchedEnv db tp)
    )

instance MonadUnliftIO m => MonadUnliftIO (SchedT db tp m) where
  withRunInIO inner = SchedT $
    ReaderT $ \r ->
      withRunInIO $ \run ->
        inner (run . runSchedT r)

-- | Running tasks keyed by job handle: (scheduled time, worker thread).
type TaskList = IOHashMap JobHandle (Int64, Async ())

-- | Run a scheduler action against the given environment.
runSchedT :: SchedEnv db tp -> SchedT db tp m a -> m a
runSchedT schedEnv = flip runReaderT schedEnv . unSchedT

-- | Build a fresh environment with default settings and open the
-- persist backend.  The second argument is the cleanup action executed
-- by 'shutdown'.
initSchedEnv
  :: (MonadUnliftIO m, Persist db)
  => P.PersistConfig db -> m () -> m (SchedEnv db tp)
initSchedEnv config sC = do
  sFuncStatList   <- newIOHashMap
  sWaitList       <- newIOHashMap
  sLockList       <- newIOHashMap
  sLocker         <- L.new
  sGrabQueue      <- newGrabQueue
  sAlive          <- newTVarIO True
  sChanList       <- newTVarIO []
  sPollInterval   <- newTVarIO 300
  sRevertInterval <- newTVarIO 300
  sTaskTimeout    <- newTVarIO 600
  sMaxBatchSize   <- newTVarIO 250
  sKeepalive      <- newTVarIO 300
  sExpiration     <- newTVarIO 300
  sAutoPoll       <- newTVarIO False
  sPolled         <- newTVarIO False
  sCleanup        <- toIO sC
  sPersist        <- liftIO $ P.newPersist config
  pure SchedEnv{..}

-- | Load the persisted settings, then start the background loops.
startSchedT :: (MonadUnliftIO m, Persist db, Transport tp) => SchedT db tp m ()
startSchedT = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler started"
  SchedEnv{..} <- ask

  -- Load the stored configuration before any loop starts: every loop
  -- reads its interval right away, so loading afterwards would let the
  -- first sleep run with the built-in default.
  loadInt "poll-interval" sPollInterval
  loadInt "revert-interval" sRevertInterval
  loadInt "timeout" sTaskTimeout
  loadInt "keepalive" sKeepalive
  loadInt "max-batch-size" sMaxBatchSize
  loadInt "expiration" sExpiration

  runTask_ sRevertInterval revertRunningQueue
  taskList <- newIOHashMap
  runTask_ sPollInterval $ pushChanList PollJob
  runTask 0 $ runChanJob taskList
  runTask 100 purgeExpired
  runTask 60 revertLockingQueue

-- | Overwrite @ref@ with the persisted value of @name@, if there is one.
loadInt :: (MonadIO m, Persist db) => String -> TVar Int -> SchedT db tp m ()
loadInt name ref = do
  stored <- liftIO . flip P.configGet name =<< asks sPersist
  mapM_ (atomically . writeTVar ref) stored

-- | Persist @v@ under @name@ and update the in-memory @ref@.
saveInt :: (MonadIO m, Persist db) => String -> Int -> TVar Int -> SchedT db tp m ()
saveInt name v ref = do
  p <- asks sPersist
  liftIO $ P.configSet p name v
  atomically $ writeTVar ref v

-- | Map a configuration key to the variable holding its value.
lookupConfigRef :: String -> SchedEnv db tp -> Maybe (TVar Int)
lookupConfigRef key SchedEnv{..} = case key of
  "poll-interval"   -> Just sPollInterval
  "revert-interval" -> Just sRevertInterval
  "timeout"         -> Just sTaskTimeout
  "keepalive"       -> Just sKeepalive
  "max-batch-size"  -> Just sMaxBatchSize
  "expiration"      -> Just sExpiration
  _                 -> Nothing

-- | Set and persist a configuration value; unknown keys are ignored.
setConfigInt :: (MonadIO m, Persist db) => String -> Int -> SchedT db tp m ()
setConfigInt key val = asks (lookupConfigRef key) >>= mapM_ (saveInt key val)

-- | Read a configuration value; unknown keys yield 0.
getConfigInt :: (MonadIO m, Persist db) => String -> SchedT db tp m Int
getConfigInt key = asks (lookupConfigRef key) >>= maybe (pure 0) readTVarIO

-- | The variable holding the keepalive setting.
keepalive :: Monad m => SchedT db tp m (TVar Int)
keepalive = asks sKeepalive

-- | Like 'runTask_' with a fixed interval in seconds.
runTask :: (MonadUnliftIO m) => Int -> SchedT db tp m () -> SchedT db tp m ()
runTask d m = flip runTask_ m =<< newTVarIO d

-- | Fork a loop that sleeps for the current value of the interval
-- (seconds, skipped when not positive) and then runs the action, until
-- 'sAlive' becomes False.
runTask_ :: (MonadUnliftIO m) => TVar Int -> SchedT db tp m () -> SchedT db tp m ()
runTask_ d m = void . async $ do
  SchedEnv{..} <- ask
  void . runMaybeT . forever $ do
    interval <- readTVarIO d
    when (interval > 0) $ threadDelay $ interval * 1000 * 1000
    alive <- readTVarIO sAlive
    if alive then lift m else mzero

-- | Take every queued action and run it.  Blocks while the queue is
-- empty and the scheduler is alive.
runChanJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
runChanJob taskList = do
  cl <- asks sChanList
  al <- asks sAlive
  acts <- atomically $ do
    queued <- readTVar cl
    if null queued
      then do
        st <- readTVar al
        if st then retrySTM else pure []
      else do
        writeTVar cl []
        pure queued

  -- NOTE(review): 'pushChanList' prepends, so a batch is processed
  -- newest-first.  Confirm that this ordering is intended.
  mapM_ (doChanJob taskList) acts

  where
    doChanJob
      :: (MonadUnliftIO m, Persist db, Transport tp)
      => TaskList -> Action -> SchedT db tp m ()
    doChanJob tl (Add job)    = reSchedJob tl job
    doChanJob tl (Remove job) = findTask tl job >>= mapM_ cancel
    doChanJob tl Cancel       = mapM_ (cancel . snd) =<< FL.elems tl
    doChanJob tl PollJob      = pollJob tl
    doChanJob tl (TryPoll jh) = removeTaskAndTryPoll tl jh

-- | Current poll interval converted to the requested numeric type.
pollInterval :: (MonadIO m, Num a) => SchedT db tp m a
pollInterval = fmap fromIntegral . readTVarIO =<< asks sPollInterval

-- | Forget a finished task; when auto polling is on, no poll is running
-- and the task list is below the batch size, request a new poll.
removeTaskAndTryPoll :: MonadIO m => TaskList -> JobHandle -> SchedT db tp m ()
removeTaskAndTryPoll taskList jh = do
  FL.delete taskList jh
  polled <- asks sPolled
  isPolled <- readTVarIO polled
  autoPoll <- readTVarIO =<< asks sAutoPoll
  when (isPolled && autoPoll) $ do
    maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
    size <- FL.size taskList
    when (size < maxBatchSize) $ do
      atomically $ writeTVar polled False
      pushChanList PollJob

-- | Tidy up the task list, then poll pending jobs of every function
-- that has at least one worker.
pollJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
pollJob taskList = do
  polled <- asks sPolled
  atomically $ writeTVar polled False
  mapM_ checkPoll =<< FL.toList taskList
  stList <- asks sFuncStatList
  funcList <- foldr foldFunc [] <$> FL.toList stList
  pollJob_ taskList funcList
  atomically $ writeTVar polled True

  where
    -- keep only functions with at least one worker
    foldFunc :: (FuncName, FuncStat) -> [FuncName] -> [FuncName]
    foldFunc (_, FuncStat{sWorker=0}) acc = acc
    foldFunc (fn, _) acc                  = fn:acc

    -- drop finished tasks, cancel running ones that lost their workers
    checkPoll
      :: (MonadIO m)
      => (JobHandle, (Int64, Async ())) -> SchedT db tp m ()
    checkPoll (jh, (_, w)) = do
      r <- poll w
      case r of
        Just (Right ()) -> FL.delete taskList jh
        Just (Left e)   -> do
          FL.delete taskList jh
          liftIO $ errorM "Periodic.Server.Scheduler" ("Poll error: " ++ show e)
        Nothing -> do
          r0 <- canRun fn
          unless r0 $ cancel w

      where (fn, _) = unHandle jh

-- | Fetch pending jobs due before the next poll (plus 100 seconds of
-- slack) for the given functions and hand each one to 'checkJob'.
pollJob_
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> [FuncName] -> SchedT db tp m ()
pollJob_ _ [] = pure ()
pollJob_ taskList funcList = do
  now <- getEpochTime
  next <- (+ (100 + now)) <$> pollInterval
  handles <- FL.keys taskList
  let check job = notElem (getHandle job) handles && (getSchedAt job < next)

  maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
  p <- asks sPersist
  jobs <- liftIO $
    P.foldrPending p next funcList (foldFunc (maxBatchSize * 2) check now) PSQ.empty

  mapM_ (checkJob taskList) jobs

  autoPoll <- asks sAutoPoll
  atomically $ writeTVar autoPoll (length jobs > maxBatchSize)

  where
    -- Collect accepted jobs into a queue bounded to @s@ entries.  The
    -- priority is @now - schedAt@, so trimming the minimum drops the
    -- job scheduled furthest in the future.
    foldFunc
      :: Int -> (Job -> Bool) -> Int64 -> Job
      -> HashPSQ JobHandle Int64 Job
      -> HashPSQ JobHandle Int64 Job
    foldFunc s f now job acc
      | f job = trimPSQ $ PSQ.insert (getHandle job) (now - getSchedAt job) job acc
      | otherwise = acc
      where
        trimPSQ :: HashPSQ JobHandle Int64 Job -> HashPSQ JobHandle Int64 Job
        trimPSQ q
          | PSQ.size q > s = trimPSQ $ PSQ.deleteMin q
          | otherwise = q

-- | Schedule a polled job unless it is already running, or cancel its
-- existing task when its function can no longer run.
checkJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m ()
checkJob tl job = do
  w <- findTask tl job
  case w of
    Nothing -> do
      p <- asks sPersist
      isProc <- liftIO $ P.member p Running fn jn
      unless isProc $ reSchedJob tl job
    Just w0 -> do
      r <- canRun fn
      unless r $ cancel w0
  where
    fn = getFuncName job
    jn = getName job

-- | Queue an action for the channel loop.
pushChanList :: MonadIO m => Action -> SchedT db tp m ()
pushChanList act = do
  cl <- asks sChanList
  atomically $ do
    l <- readTVar cl
    writeTVar cl (act:l)

-- | Store a job as pending and ask the channel loop to schedule it.
-- Nothing happens when the same job is currently running.
pushJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
pushJob job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("pushJob: " ++ show (getHandle job))
  p <- asks sPersist
  isRunning <- liftIO $ P.member p Running fn jn
  unless isRunning $ do
    job' <- fixedSchedAt job
    liftIO $ P.insert p Pending fn jn job'
    pushChanList (Add job')

  where
    fn = getFuncName job
    jn = getName job

-- | Move a schedule time lying in the past up to the current time.
fixedSchedAt :: MonadIO m => Job -> SchedT db tp m Job
fixedSchedAt job = do
  now <- getEpochTime
  if getSchedAt job < now
    then return $ setSchedAt now job
    else return job

-- | Cancel any existing task of the job and start a new one when the
-- job is due within the next poll window, its function can run and the
-- task list has room for it.
reSchedJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m ()
reSchedJob taskList job = do
  w <- findTask taskList job
  forM_ w cancel

  interval <- (+100) <$> pollInterval
  next <- (+ interval) <$> getEpochTime
  when (getSchedAt job < next) $ do
    r <- canRun $ getFuncName job
    c <- check taskList
    when (r && c) $ do
      w' <- schedJob taskList job
      FL.insert taskList (getHandle job) (getSchedAt job, w')
  where
    -- When the list holds twice the batch size, make room by evicting
    -- the latest task, unless this job would run even later than it.
    check :: (MonadIO m) => TaskList -> SchedT db tp m Bool
    check tl = do
      maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
      size <- FL.size tl
      if size < maxBatchSize * 2
        then return True
        else do
          lastTask <- findLastTask tl
          case lastTask of
            Nothing -> return True
            Just (sc, jh, w)
              | sc < getSchedAt job -> return False
              | otherwise -> do
                  cancel w
                  FL.delete taskList jh
                  return True

-- | The worker thread of the job, if it has a task.
findTask :: MonadIO m => TaskList -> Job -> SchedT db tp m (Maybe (Async ()))
findTask taskList job = fmap snd <$> FL.lookup taskList (getHandle job)

-- | The task with the greatest scheduled time, if any.
findLastTask
  :: MonadIO m
  => TaskList -> SchedT db tp m (Maybe (Int64, JobHandle, Async ()))
findLastTask tl = atomically $ FL.foldrWithKeySTM tl f Nothing
  where
    f :: JobHandle
      -> (Int64, a)
      -> Maybe (Int64, JobHandle, a)
      -> Maybe (Int64, JobHandle, a)
    f jh (sc, t) Nothing = Just (sc, jh, t)
    f jh (sc, t) (Just (sc1, jh1, t1))
      | sc > sc1 = Just (sc, jh, t)
      | otherwise = Just (sc1, jh1, t1)

-- | Whether the function is registered with at least one worker.
canRun :: MonadIO m => FuncName -> SchedT db tp m Bool
canRun fn = asks sFuncStatList >>= flip canRun_ fn

canRun_ :: MonadIO m => FuncStatList -> FuncName -> m Bool
canRun_ stList fn = do
  st0 <- FL.lookup stList fn
  case st0 of
    Nothing                  -> pure False
    Just FuncStat{sWorker=0} -> pure False
    Just _                   -> pure True

-- | Fork 'schedJob_' for the job.
schedJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m (Async ())
schedJob taskList = async . schedJob_ taskList

-- | Wait until the job is due and a worker is available, then assign
-- the job (to every grabbing agent when the function is broadcast).
schedJob_
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m ()
schedJob_ taskList job = do
  SchedEnv{..} <- ask
  r <- canRun fn
  when r $ do
    now <- getEpochTime
    when (schedAt > now + 1) . threadDelay . fromIntegral $ (schedAt - now) * 1000000
    -- block until the function exists and has a worker
    broadcast <- atomically $ do
      st <- FL.lookupSTM sFuncStatList fn
      case st of
        Nothing                  -> retrySTM
        Just FuncStat{sWorker=0} -> retrySTM
        Just st'                 -> pure (sBroadcast st')
    if broadcast
      then popAgentListThen
      else popAgentThen taskList

  where
    fn = getFuncName job
    jn = getName job
    schedAt = getSchedAt job
    jh = getHandle job

    -- Pop one agent and submit the job to it; start over when the
    -- agent is gone or the submission fails.
    popAgentThen
      :: (MonadUnliftIO m, Persist db, Transport tp)
      => TaskList -> SchedT db tp m ()
    popAgentThen tl = do
      SchedEnv{..} <- ask
      (jq, env0) <- atomically $ popAgentSTM sGrabQueue fn
      alive <- runSessionT1 env0 sessionState
      if alive
        then do
          IL.insert jq (getHandle job)
          nextSchedAt <- getEpochTime
          liftIO $ P.insert sPersist Running fn jn $ setSchedAt nextSchedAt job
          r <- doSubmitJob env0
          case r of
            Left e -> do
              -- report the failure instead of dropping it silently
              liftIO $ errorM "Periodic.Server.Scheduler"
                ("assignJob failed: " ++ show jh ++ " " ++ show e)
              liftIO $ P.insert sPersist Pending fn jn $ setSchedAt nextSchedAt job
              IL.delete jq jh
              schedJob_ tl job
            Right _ -> endSchedJob
        else schedJob_ tl job

    -- Submit the job to every agent grabbing this function.
    popAgentListThen :: (MonadUnliftIO m, Transport tp) => SchedT db tp m ()
    popAgentListThen = do
      SchedEnv{..} <- ask
      agents <- popAgentList sGrabQueue fn
      mapM_ (doSubmitJob . snd) agents
      unless (null agents) endSchedJob -- wait to resched the broadcast job

    doSubmitJob
      :: (MonadUnliftIO m, Transport tp)
      => CSEnv tp -> SchedT db tp m (Either SomeException ())
    doSubmitJob agent = tryAny $ assignJob agent job

    endSchedJob :: MonadIO m => SchedT db tp m ()
    endSchedJob = pushChanList (TryPoll jh)

-- | Refresh the job counters and the next schedule time of a function
-- from the persist backend.
adjustFuncStat :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
adjustFuncStat fn = do
  SchedEnv{..} <- ask
  size <- liftIO $ P.size sPersist Pending fn
  sizePQ <- liftIO $ P.size sPersist Running fn
  sizeL <- liftIO $ P.size sPersist Locking fn
  sc <- liftIO $ P.minSchedAt sPersist fn

  -- fall back to the current time when no schedule time is reported
  schedAt <- if sc > 0 then pure sc else getEpochTime

  FL.alter sFuncStatList (update (size + sizePQ + sizeL) sizePQ sizeL schedAt) fn

  where
    update :: Int64 -> Int64 -> Int64 -> Int64 -> Maybe FuncStat -> Maybe FuncStat
    update size sizePQ sizeL schedAt st =
      Just ((fromMaybe (funcStat fn) st)
        { sJob = size
        , sRunning = sizePQ
        , sLocking = sizeL
        , sSchedAt = schedAt
        })

-- | Delete a job, cancel its task and publish an empty result.
removeJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
removeJob job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("removeJob: " ++ show (getHandle job))
  p <- asks sPersist
  liftIO $ P.delete p fn jn

  pushChanList (Remove job)
  pushResult jh ""
  where
    jn = getName job
    fn = getFuncName job
    jh = getHandle job

-- | Dump every job known to the persist backend.
dumpJob :: (MonadIO m, Persist db) => SchedT db tp m [Job]
dumpJob = liftIO . P.dumpJob =<< asks sPersist

-- | Apply @f@ to the statistics of a function, record the function name
-- in the persist backend and request a poll.
alterFunc
  :: (MonadIO m, Persist db)
  => FuncName -> (Maybe FuncStat -> Maybe FuncStat) -> SchedT db tp m ()
alterFunc n f = do
  SchedEnv{..} <- ask
  FL.alter sFuncStatList f n
  liftIO $ P.insertFuncName sPersist n
  pushChanList PollJob

-- | Register one more worker for a non-broadcast function.
addFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
addFunc n = broadcastFunc n False

-- | Register one more worker for a function and set its broadcast flag.
broadcastFunc :: (MonadIO m, Persist db) => FuncName -> Bool -> SchedT db tp m ()
broadcastFunc n cast = do
  liftIO $ infoM "Periodic.Server.Scheduler" (h ++ ": " ++ show n)
  alterFunc n updateStat

  where
    updateStat :: Maybe FuncStat -> Maybe FuncStat
    updateStat Nothing   = Just ((funcStat n) {sWorker = 1, sBroadcast = cast})
    updateStat (Just fs) = Just (fs { sWorker = sWorker fs + 1, sBroadcast = cast })

    h = if cast then "broadcastFunc" else "addFunc"

-- | Unregister one worker of a function; the count never drops below 0.
removeFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
removeFunc n = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("removeFunc: " ++ show n)
  alterFunc n updateStat

  where
    updateStat :: Maybe FuncStat -> Maybe FuncStat
    updateStat Nothing   = Just (funcStat n)
    updateStat (Just fs) = Just (fs { sWorker = max (sWorker fs - 1) 0 })

-- | Forget a function that has no workers left, then request a poll.
dropFunc :: (MonadUnliftIO m, Persist db) => FuncName -> SchedT db tp m ()
dropFunc n = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("dropFunc: " ++ show n)
  SchedEnv{..} <- ask
  L.with sLocker $ do
    st <- FL.lookup sFuncStatList n
    case st of
      Just FuncStat{sWorker=0} -> do
        FL.delete sFuncStatList n
        liftIO $ P.removeFuncName sPersist n
      _                        -> pure ()

  pushChanList PollJob

-- | Register an agent as ready to grab jobs.
pushGrab
  :: MonadIO m
  => IOList FuncName -> IOList JobHandle -> CSEnv tp -> SchedT db tp m ()
pushGrab funcList handleList ag = do
  queue <- asks sGrabQueue
  pushAgent queue funcList handleList ag

-- | Send the job to an agent over its session.
assignJob :: (MonadUnliftIO m, Transport tp) => CSEnv tp -> Job -> m ()
assignJob env0 job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("assignJob: " ++ show (getHandle job))
  runSessionT1 env0 $ send $ packetRES (JobAssign job)

-- | Handle a failed job: release its locks, then either finish it with
-- an empty result (when a caller waits for it) or retry it right away.
failJob :: (MonadUnliftIO m, Persist db) => JobHandle -> SchedT db tp m ()
failJob jh = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("failJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting
    then do
      removeFromWaitList jh
      doneJob jh ""
    else do
      p <- asks sPersist
      job <- liftIO $ P.lookup p Running fn jn
      -- nothing to retry when the job is not in the running queue
      forM_ job $ \running -> do
        nextSchedAt <- getEpochTime
        retryJob $ setSchedAt nextSchedAt running

  where (fn, jn) = unHandle jh

-- | Put the job back as pending and ask the channel loop to schedule it.
retryJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
retryJob job = do
  p <- asks sPersist
  liftIO $ P.insert p Pending fn jn job

  pushChanList (Add job)

  where
    fn = getFuncName job
    jn = getName job

-- | Handle a finished job: release its locks, delete it and publish the
-- result to the wait list.
doneJob :: (MonadUnliftIO m, Persist db) => JobHandle -> ByteString -> SchedT db tp m ()
doneJob jh w = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("doneJob: " ++ show jh)
  releaseLock' jh
  p <- asks sPersist
  liftIO $ P.delete p fn jn
  pushResult jh w
  where (fn, jn) = unHandle jh

-- | Reschedule a running job @later@ seconds from now and add @step@ to
-- its counter.  When a caller waits for the job it is finished with an
-- empty result instead.
schedLaterJob
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> Int64 -> Int -> SchedT db tp m ()
schedLaterJob jh later step = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("schedLaterJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting
    then do
      removeFromWaitList jh
      doneJob jh ""
    else do
      p <- asks sPersist
      job <- liftIO $ P.lookup p Running fn jn
      -- nothing to reschedule when the job is not in the running queue
      forM_ job $ \running -> do
        nextSchedAt <- (+) later <$> getEpochTime
        retryJob $ setCount (getCount running + step) $ setSchedAt nextSchedAt running

  where (fn, jn) = unHandle jh

-- | Try to take the named lock for a running job.  Returns True when
-- the job holds the lock (or is not running at all); otherwise the job
-- is appended to the waiters, stored as locking, and False is returned.
acquireLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> Int -> JobHandle -> SchedT db tp m Bool
acquireLock name count jh = do
  liftIO $ infoM "Periodic.Server.Scheduler"
    ("acquireLock: " ++ show name ++ " " ++ show count ++ " " ++ show jh)
  locker <- asks sLocker
  L.with locker $ do
    lockList <- asks sLockList
    p <- asks sPersist
    j <- liftIO $ P.lookup p Running fn jn
    case j of
      Nothing -> pure True
      Just job -> do
        r <- atomically $ do
          l <- FL.lookupSTM lockList name
          case l of
            Nothing -> do
              FL.insertSTM lockList name LockInfo
                { acquired = [jh]
                , locked   = []
                , maxCount = count
                }
              pure True
            Just info@LockInfo {..}
              | jh `elem` acquired -> pure True
              | jh `elem` locked -> pure False
              | length acquired < maxCount -> do
                  FL.insertSTM lockList name info
                    { acquired = acquired ++ [jh]
                    , maxCount = newCount
                    }
                  pure True
              | otherwise -> do
                  FL.insertSTM lockList name info
                    { locked = locked ++ [jh]
                    , maxCount = newCount
                    }
                  pure False
              where newCount = max maxCount count

        unless r $ liftIO $ P.insert p Locking fn jn job
        return r

  where (fn, jn) = unHandle jh

-- | Release the named lock held by the job, under the scheduler lock.
releaseLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock name jh = do
  locker <- asks sLocker
  L.with locker $ releaseLock_ name jh

-- | Remove the job from the holders of the lock and, when somebody is
-- waiting, move the first waiter back to the pending queue.
releaseLock_
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock_ name jh = do
  liftIO $ infoM "Periodic.Server.Scheduler"
    ("releaseLock: " ++ show name ++ " " ++ show jh)
  p <- asks sPersist
  lockList <- asks sLockList
  h <- atomically $ do
    l <- FL.lookupSTM lockList name
    case l of
      Nothing -> pure Nothing
      Just info@LockInfo {..}
        | jh `elem` acquired ->
            case locked of
              [] -> do
                FL.insertSTM lockList name info
                  { acquired = L.delete jh acquired
                  }
                pure Nothing
              x:xs -> do
                FL.insertSTM lockList name info
                  { acquired = L.delete jh acquired
                  , locked   = xs
                  }
                pure $ Just x
        | otherwise -> pure Nothing

  case h of
    Nothing -> pure ()
    Just hh -> do
      let (fn, jn) = unHandle hh
      j <- liftIO $ P.lookup p Locking fn jn
      case j of
        -- NOTE(review): hh was taken from the waiters and is not a
        -- holder, so this call looks like a no-op that leaves the
        -- remaining waiters untouched.  Confirm the intent.
        Nothing -> releaseLock_ name hh
        Just job -> do
          liftIO $ P.insert p Pending fn jn job
          pushChanList (Add job)

-- | Release every lock in which the job appears, as holder or waiter.
releaseLock' :: (MonadUnliftIO m, Persist db) => JobHandle -> SchedT db tp m ()
releaseLock' jh = do
  lockList <- asks sLockList
  names <- atomically $ FL.foldrWithKeySTM lockList foldFunc []
  mapM_ (`releaseLock` jh) names

  where
    foldFunc :: LockName -> LockInfo -> [LockName] -> [LockName]
    foldFunc n LockInfo {..} acc
      | jh `elem` acquired = n : acc
      | jh `elem` locked   = n : acc
      | otherwise          = acc

-- | Count, over all locks, the handles of the given function in the
-- list selected by @f@ ('acquired' or 'locked').
countLock
  :: MonadUnliftIO m
  => (LockInfo -> [JobHandle]) -> FuncName -> SchedT db tp m Int
countLock f fn = do
  lockList <- asks sLockList
  sum . map mapFunc <$> FL.elems lockList
  where
    filterFunc :: JobHandle -> Bool
    filterFunc jh = fn0 == fn
      where (fn0, _) = unHandle jh

    mapFunc :: LockInfo -> Int
    mapFunc = length . filter filterFunc . f

-- | Largest 'maxCount' over all locks, or 0 when there is no lock.
getMaxLockCount :: MonadUnliftIO m => SchedT db tp m Int
getMaxLockCount = do
  lockList <- asks sLockList
  counts <- map maxCount <$> FL.elems lockList
  pure $ case counts of
    []     -> 0
    (c:cs) -> foldr max c cs

-- | Refresh and return the statistics of every function.
status :: (MonadIO m, Persist db) => SchedT db tp m [FuncStat]
status = do
  mapM_ adjustFuncStat =<< liftIO . P.funcList =<< asks sPersist
  FL.elems =<< asks sFuncStatList

-- | Fail every running job whose timeout has elapsed.  The timeout is
-- the job's own when positive, the global 'sTaskTimeout' otherwise.
revertRunningQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertRunningQueue = do
  now <- getEpochTime
  tout <- fmap fromIntegral . readTVarIO =<< asks sTaskTimeout
  p <- asks sPersist
  handles <- liftIO $ P.foldr p Running (foldFunc (check now tout)) []
  mapM_ (failJob . getHandle) handles

  where
    foldFunc :: (Job -> Bool) -> Job -> [Job] -> [Job]
    foldFunc f job acc
      | f job = job : acc
      | otherwise = acc

    check :: Int64 -> Int64 -> Job -> Bool
    check now t0 job
      | getTimeout job > 0 = getSchedAt job + fromIntegral (getTimeout job) < now
      | otherwise = getSchedAt job + t0 < now

-- | For every function that has waiters but no holder on any lock,
-- push its locking jobs again.
revertLockingQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertLockingQueue = mapM_ checkAndReleaseLock =<< liftIO . P.funcList =<< asks sPersist

  where
    checkAndReleaseLock
      :: (MonadUnliftIO m, Persist db)
      => FuncName -> SchedT db tp m ()
    checkAndReleaseLock fn = do
      p <- asks sPersist
      sizeLocked <- countLock locked fn
      sizeAcquired <- countLock acquired fn
      liftIO $ infoM "Periodic.Server.Scheduler"
        $ "LockInfo " ++ show fn
        ++ " Locked:" ++ show sizeLocked
        ++ " Acquired:" ++ show sizeAcquired
      when (sizeLocked > 0 && sizeAcquired == 0) $ do
        count <- getMaxLockCount
        handles <- liftIO $ P.foldrLocking p count fn (:) []
        mapM_ pushJob handles

-- | Drop wait-list entries older than the expiration setting.
purgeExpired :: MonadIO m => SchedT db tp m ()
purgeExpired = do
  now <- getEpochTime
  wl <- asks sWaitList
  ex <- fmap fromIntegral . readTVarIO =<< asks sExpiration
  atomically $ do
    ks <- FL.foldrWithKeySTM wl (foldFunc (check (now - ex))) []
    mapM_ (FL.deleteSTM wl) ks

  where
    foldFunc :: (WaitItem -> Bool) -> JobHandle -> WaitItem -> [JobHandle] -> [JobHandle]
    foldFunc f jh v acc
      | f v = jh : acc
      | otherwise = acc

    check :: Int64 -> WaitItem -> Bool
    check t0 item = itemTs item < t0

-- | Stop the scheduler: cancel all tasks, mark it as dead and, on the
-- first call only, run the cleanup action in a new thread.
shutdown :: (MonadUnliftIO m) => SchedT db tp m ()
shutdown = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler shutdown"
  SchedEnv{..} <- ask
  pushChanList Cancel
  alive <- atomically $ do
    t <- readTVar sAlive
    writeTVar sAlive False
    return t
  when alive . void . async $ liftIO sCleanup

-- | Register one more waiter for the result of a job.
prepareWait :: MonadIO m => Job -> SchedT db tp m ()
prepareWait job = pushResult_ updateWL jh
  where
    updateWL :: Int64 -> Maybe WaitItem -> Maybe WaitItem
    updateWL now Nothing =
      Just $ WaitItem {itemTs = now, itemValue = Nothing, itemWait = 1}
    updateWL now (Just item) =
      Just $ item {itemTs = now, itemWait = itemWait item + 1}

    jh = getHandle job

-- | Block until the result of the job is available and return it.
-- Returns an empty string when @state@ is False or the job has no
-- wait-list entry.  The entry is removed once the last waiter is served.
waitResult :: MonadIO m => TVar Bool -> Job -> SchedT db tp m ByteString
waitResult state job = do
  wl <- asks sWaitList
  atomically $ do
    st <- readTVar state
    if st
      then do
        w0 <- FL.lookupSTM wl jh
        case w0 of
          Nothing   -> pure ""
          Just item ->
            case itemValue item of
              Nothing -> retrySTM
              Just w1 -> do
                if itemWait item > 1
                  then FL.insertSTM wl jh item { itemWait = itemWait item - 1 }
                  else FL.deleteSTM wl jh
                pure w1
      else pure ""

  where jh = getHandle job

-- | Publish the result of a job; a no-op when nobody registered a wait.
pushResult :: MonadIO m => JobHandle -> ByteString -> SchedT db tp m ()
pushResult jh w = pushResult_ updateWL jh
  where
    updateWL :: Int64 -> Maybe WaitItem -> Maybe WaitItem
    updateWL _ Nothing       = Nothing
    updateWL now (Just item) = Just item {itemTs = now, itemValue = Just w}

-- | Alter the wait-list entry of a job, passing the current time to @f@.
pushResult_
  :: MonadIO m
  => (Int64 -> Maybe WaitItem -> Maybe WaitItem)
  -> JobHandle -> SchedT db tp m ()
pushResult_ f jh = do
  wl <- asks sWaitList
  now <- getEpochTime
  FL.alter wl (f now) jh

-- | Whether the job has a wait-list entry.
existsWaitList :: MonadIO m => JobHandle -> SchedT db tp m Bool
existsWaitList jh = do
  wl <- asks sWaitList
  isJust <$> FL.lookup wl jh

-- | The cached result of a job, if one has been published.
lookupPrevResult :: MonadIO m => Job -> SchedT db tp m (Maybe ByteString)
lookupPrevResult job = do
  wl <- asks sWaitList
  (>>= itemValue) <$> FL.lookup wl (getHandle job)

-- | Remove the wait-list entry of a job.
removeFromWaitList :: MonadIO m => JobHandle -> SchedT db tp m ()
removeFromWaitList jh = do
  wl <- asks sWaitList
  FL.delete wl jh