{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Periodic.Server.Scheduler
( SchedT
, runSchedT
, SchedEnv
, initSchedEnv
, startSchedT
, pushJob
, pushGrab
, failJob
, doneJob
, schedLaterJob
, acquireLock
, releaseLock
, addFunc
, removeFunc
, broadcastFunc
, dropFunc
, removeJob
, dumpJob
, status
, shutdown
, keepalive
, setConfigInt
, getConfigInt
, prepareWait
, lookupPrevResult
, waitResult
, canRun
) where
import Control.Monad (forever, mzero, unless, void,
when)
import Control.Monad.Reader.Class (MonadReader (ask), asks)
import Control.Monad.Trans.Class (MonadTrans, lift)
import Control.Monad.Trans.Maybe (runMaybeT)
import Control.Monad.Trans.Reader (ReaderT (..), runReaderT)
import Data.ByteString (ByteString)
import Data.Foldable (forM_)
import Data.HashPSQ (HashPSQ)
import qualified Data.HashPSQ as PSQ
import Data.Int (Int64)
import qualified Data.List as L (delete)
import Data.Maybe (fromJust, fromMaybe, isJust)
import Metro.Class (Transport)
import Metro.IOHashMap (IOHashMap, newIOHashMap)
import qualified Metro.IOHashMap as FL
import qualified Metro.Lock as L (Lock, new, with)
import Metro.Session (runSessionT1, send, sessionState)
import Metro.Utils (getEpochTime)
import Periodic.IOList (IOList)
import qualified Periodic.IOList as IL
import Periodic.Server.FuncStat
import Periodic.Server.GrabQueue
import Periodic.Server.Persist (Persist, State (..))
import qualified Periodic.Server.Persist as P
import Periodic.Server.Types (CSEnv)
import Periodic.Types (packetRES)
import Periodic.Types.Internal (LockName)
import Periodic.Types.Job
import Periodic.Types.ServerCommand (ServerCommand (JobAssign))
import System.Log.Logger (errorM, infoM)
import UnliftIO
import UnliftIO.Concurrent (threadDelay)
-- | Commands queued on the scheduler's action list ('sChanList').
--
-- 'PollJob' is the one pushed periodically by 'startSchedT'; the producers
-- of the other constructors live elsewhere in this module.
data Action
  = Add Job
  | Remove Job
  | Cancel
  | PollJob
  | TryPoll JobHandle
-- | An entry of the 'WaitList', keyed by job handle.
--
-- NOTE(review): the field meanings below are inferred from names and types
-- only; confirm against the code that reads and writes the wait list.
data WaitItem = WaitItem
  { itemTs    :: Int64
    -- ^ Timestamp attached to the entry (presumably epoch seconds).
  , itemValue :: Maybe ByteString
    -- ^ Payload, if one is available yet.
  , itemWait  :: Int
    -- ^ Counter attached to the entry (presumably the number of waiters).
  }
-- | Mutable map from a job handle to its 'WaitItem'.
type WaitList = IOHashMap JobHandle WaitItem
-- | State kept for one named lock in the 'LockList'.
--
-- NOTE(review): the field meanings below are inferred from names and types
-- only; confirm against 'acquireLock' / 'releaseLock'.
data LockInfo = LockInfo
  { acquired :: [JobHandle]
    -- ^ Job handles recorded as having acquired the lock.
  , locked   :: [JobHandle]
    -- ^ Job handles recorded as locked (presumably waiting for the lock).
  , maxCount :: Int
    -- ^ Limit associated with the lock (presumably the maximum number of
    -- concurrent holders).
  }
-- | Mutable map from a lock name to its 'LockInfo'.
type LockList = IOHashMap LockName LockInfo
-- | Shared, mutable state of the scheduler, carried by 'SchedT'.
--
-- @db@ is the persistence backend and @tp@ the transport type used by the
-- grab queue.  Build a value with 'initSchedEnv'.
data SchedEnv db tp = SchedEnv
  { sPollInterval   :: TVar Int
    -- ^ Seconds between the 'PollJob' actions pushed by 'startSchedT'
    -- (initially 300; config key @poll-interval@).
  , sRevertInterval :: TVar Int
    -- ^ Seconds between runs of the running-queue revert task started by
    -- 'startSchedT' (initially 300; config key @revert-interval@).
  , sTaskTimeout    :: TVar Int
    -- ^ Task timeout (initially 600; config key @timeout@).
  , sMaxBatchSize   :: TVar Int
    -- ^ Maximum batch size (initially 250; config key @max-batch-size@).
  , sKeepalive      :: TVar Int
    -- ^ Keepalive setting exposed through 'keepalive' (initially 300;
    -- config key @keepalive@).
  , sExpiration     :: TVar Int
    -- ^ Expiration setting (initially 300; config key @expiration@).
  , sAutoPoll       :: TVar Bool
    -- ^ Auto-poll flag, initially 'False'.
  , sPolled         :: TVar Bool
    -- ^ Polled flag, initially 'False'.
  , sCleanup        :: IO ()
    -- ^ Cleanup action supplied by the caller of 'initSchedEnv'.
  , sFuncStatList   :: FuncStatList
    -- ^ Per-function statistics.
  , sLocker         :: L.Lock
    -- ^ Lock created together with the environment.
  , sGrabQueue      :: GrabQueue tp
    -- ^ Queue of grab requests.
  , sAlive          :: TVar Bool
    -- ^ Initially 'True'; the periodic loops check it after each delay and
    -- stop once it reads 'False'.
  , sChanList       :: TVar [Action]
    -- ^ Pending 'Action's, initially empty.
  , sWaitList       :: WaitList
    -- ^ See 'WaitList'.
  , sLockList       :: LockList
    -- ^ See 'LockList'.
  , sPersist        :: db
    -- ^ Persistence backend, also used to store the integer settings.
  }
-- | Scheduler monad transformer: a 'ReaderT' over the shared 'SchedEnv'.
--
-- All instances are obtained from the underlying 'ReaderT' through
-- @GeneralizedNewtypeDeriving@.
newtype SchedT db tp m a = SchedT {unSchedT :: ReaderT (SchedEnv db tp) m a}
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadTrans
    , MonadIO
    , MonadReader (SchedEnv db tp)
    )
-- | Unlifting for 'SchedT': capture the current environment, then delegate
-- to the base monad's 'withRunInIO', running each 'SchedT' action against
-- that captured environment.
instance MonadUnliftIO m => MonadUnliftIO (SchedT db tp m) where
  withRunInIO inner = SchedT $
    ReaderT $ \r ->
      withRunInIO $ \run ->
        inner (run . runSchedT r)
-- | Mutable map from a job handle to an 'Int64' paired with the 'Async' of a
-- spawned task.
-- NOTE(review): the 'Int64' is presumably a scheduled timestamp; confirm
-- against the code that fills the map.
type TaskList = IOHashMap JobHandle (Int64, Async ())
-- | Run a 'SchedT' action in the base monad against the given environment.
runSchedT :: SchedEnv db tp -> SchedT db tp m a -> m a
runSchedT schedEnv action = runReaderT (unSchedT action) schedEnv
-- | Build a fresh 'SchedEnv'.
--
-- The first argument configures the persistence backend, which is opened
-- with 'P.newPersist'.  The second argument is a cleanup action; it is
-- converted to plain 'IO' with 'toIO' and stored in 'sCleanup'.
--
-- Every setting starts at its built-in default; 'startSchedT' later
-- overrides them with any values stored in the backend.
initSchedEnv :: (MonadUnliftIO m, Persist db) => P.PersistConfig db -> m () -> m (SchedEnv db tp)
initSchedEnv config sC = do
  sFuncStatList   <- newIOHashMap
  sWaitList       <- newIOHashMap
  sLockList       <- newIOHashMap
  sLocker         <- L.new
  sGrabQueue      <- newGrabQueue
  sAlive          <- newTVarIO True
  sChanList       <- newTVarIO []
  sPollInterval   <- newTVarIO 300
  sRevertInterval <- newTVarIO 300
  sTaskTimeout    <- newTVarIO 600
  sMaxBatchSize   <- newTVarIO 250
  sKeepalive      <- newTVarIO 300
  sExpiration     <- newTVarIO 300
  sAutoPoll       <- newTVarIO False
  sPolled         <- newTVarIO False
  sCleanup        <- toIO sC
  sPersist        <- liftIO $ P.newPersist config
  -- RecordWildCards: the fields are filled from the bindings above.
  pure SchedEnv{..}
-- | Start the scheduler: spawn its background loops, then load the
-- persisted integer settings.
--
-- Each loop is started through 'runTask' / 'runTask_' and therefore runs
-- asynchronously; this function returns once the settings are loaded.
startSchedT :: (MonadUnliftIO m, Persist db, Transport tp) => SchedT db tp m ()
startSchedT = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler started"
  SchedEnv{..} <- ask

  -- Loops whose period follows a live, configurable setting.
  runTask_ sRevertInterval revertRunningQueue
  taskList <- newIOHashMap
  runTask_ sPollInterval $ pushChanList PollJob

  -- Loops with a fixed period (0 means no delay between iterations).
  runTask 0 $ runChanJob taskList
  runTask 100 purgeExpired
  runTask 60 revertLockingQueue

  -- NOTE(review): the settings are loaded after the loops were spawned, so a
  -- loop may take its first delay from the built-in default instead of the
  -- persisted value.  Order kept as in the original.
  loadInt "poll-interval" sPollInterval
  loadInt "revert-interval" sRevertInterval
  loadInt "timeout" sTaskTimeout
  loadInt "keepalive" sKeepalive
  loadInt "max-batch-size" sMaxBatchSize
  loadInt "expiration" sExpiration
-- | Read the integer setting @name@ from the persistence backend and, when a
-- value is stored, write it into @ref@.  A missing value leaves @ref@
-- untouched.
loadInt :: (MonadIO m, Persist db) => String -> TVar Int -> SchedT db tp m ()
loadInt name ref = do
  persist <- asks sPersist
  stored  <- liftIO $ P.configGet persist name
  -- 'mapM_' over 'Maybe': runs the write only for 'Just'.
  mapM_ (atomically . writeTVar ref) stored
-- | Store the integer setting @name@ in the persistence backend, then update
-- the in-memory @ref@ to the same value.
saveInt :: (MonadIO m, Persist db) => String -> Int -> TVar Int -> SchedT db tp m ()
saveInt name v ref = do
  p <- asks sPersist
  -- Persist first: if the backend call throws, the in-memory value is not
  -- changed.
  liftIO $ P.configSet p name v
  atomically $ writeTVar ref v
-- | Set one of the scheduler's integer settings by name, persisting it and
-- updating the live value.
--
-- Recognised keys: @poll-interval@, @revert-interval@, @timeout@,
-- @keepalive@, @max-batch-size@, @expiration@.  Any other key is silently
-- ignored.
setConfigInt :: (MonadIO m, Persist db) => String -> Int -> SchedT db tp m ()
setConfigInt key val = do
  SchedEnv {..} <- ask
  -- Inside each branch 'key' equals the matched literal, so it is reused as
  -- the persisted name.
  let save = saveInt key val
  case key of
    "poll-interval"   -> save sPollInterval
    "revert-interval" -> save sRevertInterval
    "timeout"         -> save sTaskTimeout
    "keepalive"       -> save sKeepalive
    "max-batch-size"  -> save sMaxBatchSize
    "expiration"      -> save sExpiration
    _                 -> pure ()
-- | Read the live value of one of the scheduler's integer settings by name.
--
-- Recognised keys: @poll-interval@, @revert-interval@, @timeout@,
-- @keepalive@, @max-batch-size@, @expiration@.  Any other key yields @0@.
-- The value comes from the in-memory 'TVar'; the persistence backend is not
-- consulted.
getConfigInt :: (MonadIO m, Persist db) => String -> SchedT db tp m Int
getConfigInt key = do
  SchedEnv {..} <- ask
  case key of
    "poll-interval"   -> readTVarIO sPollInterval
    "revert-interval" -> readTVarIO sRevertInterval
    "timeout"         -> readTVarIO sTaskTimeout
    "keepalive"       -> readTVarIO sKeepalive
    "max-batch-size"  -> readTVarIO sMaxBatchSize
    "expiration"      -> readTVarIO sExpiration
    _                 -> pure 0
-- | The 'TVar' holding the keepalive setting, so callers can read the live
-- value themselves.
keepalive :: Monad m => SchedT db tp m (TVar Int)
keepalive = asks sKeepalive
-- | Like 'runTask_', but with a fixed interval: the delay @d@ is placed in a
-- fresh 'TVar' that nothing else references.
runTask :: (MonadUnliftIO m) => Int -> SchedT db tp m () -> SchedT db tp m ()
runTask d m = do
  interval <- newTVarIO d
  runTask_ interval m
-- | Fork an asynchronous loop that repeatedly runs the action @m@.
--
-- On every iteration the loop:
--
--   1. reads the current interval from @d@ (so the interval may be changed
--      while the loop is running);
--   2. sleeps for that many seconds when the interval is positive
--      ('threadDelay' takes microseconds, hence the @* 1000 * 1000@);
--      a non-positive interval means no sleep at all;
--   3. reads 'sAlive' and either runs @m@ or, when the scheduler is no
--      longer alive, leaves the loop via 'mzero'.
--
-- The 'Async' handle of the forked loop is discarded.
runTask_ :: (MonadUnliftIO m) => TVar Int -> SchedT db tp m () -> SchedT db tp m ()
runTask_ d m = void . async $ do
  aliveVar <- asks sAlive
  void . runMaybeT . forever $ do
    interval <- readTVarIO d
    when (interval > 0) $ threadDelay (interval * 1000 * 1000)
    alive <- readTVarIO aliveVar
    if alive
      then lift m
      else mzero
-- | Drain the pending 'Action' list ('sChanList') and execute every action
-- against the given task list.
--
-- The drain is a single STM transaction:
--
--   * if actions are queued, they are taken and the list is reset to @[]@;
--   * if the list is empty and 'sAlive' is 'True', the transaction retries
--     (blocking until something changes);
--   * if the list is empty and 'sAlive' is 'False', nothing is done.
--
-- NOTE(review): actions are executed in list order. 'pushChanList' prepends,
-- so the most recently pushed action runs first — confirm this is intended.
runChanJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
runChanJob taskList = do
  chanVar  <- asks sChanList
  aliveVar <- asks sAlive
  actions <- atomically $ do
    queued <- readTVar chanVar
    if null queued
      then do
        alive <- readTVar aliveVar
        if alive
          then retrySTM
          else pure []
      else do
        writeTVar chanVar []
        pure queued

  mapM_ (doChanJob taskList) actions

  where
    -- Dispatch a single action.
    doChanJob
      :: (MonadUnliftIO m, Persist db, Transport tp)
      => TaskList -> Action -> SchedT db tp m ()
    doChanJob tl (Add job)    = reSchedJob tl job
    doChanJob tl (Remove job) = findTask tl job >>= mapM_ cancel
    doChanJob tl Cancel       = mapM_ (cancel . snd) =<< FL.elems tl
    doChanJob tl PollJob      = pollJob tl
    doChanJob tl (TryPoll jh) = removeTaskAndTryPoll tl jh
-- | Current value of the 'sPollInterval' setting, converted with
-- 'fromIntegral' to whatever numeric type the caller needs.
pollInterval :: (MonadIO m, Num a) => SchedT db tp m a
pollInterval = do
  intervalVar <- asks sPollInterval
  fromIntegral <$> readTVarIO intervalVar
-- | Remove the task for @jh@ from the task list and, if appropriate,
-- request another poll.
--
-- A 'PollJob' action is queued only when all of the following hold:
--
--   * 'sPolled' is 'True';
--   * 'sAutoPoll' is 'True';
--   * after the removal the task list holds fewer than 'sMaxBatchSize'
--     entries.
--
-- In that case 'sPolled' is reset to 'False' before the action is pushed.
removeTaskAndTryPoll :: MonadIO m => TaskList -> JobHandle -> SchedT db tp m ()
removeTaskAndTryPoll taskList jh = do
  FL.delete taskList jh
  polled   <- asks sPolled
  isPolled <- readTVarIO polled
  autoPoll <- readTVarIO =<< asks sAutoPoll
  when (isPolled && autoPoll) $ do
    maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
    size <- FL.size taskList
    when (size < maxBatchSize) $ do
      atomically $ writeTVar polled False
      pushChanList PollJob
-- | Run one polling round.
--
-- Steps, in order:
--
--   1. set 'sPolled' to 'False';
--   2. inspect every entry of the task list with @checkPoll@;
--   3. collect the names of all functions whose 'sWorker' count is not @0@;
--   4. hand those names to 'pollJob_';
--   5. set 'sPolled' to 'True'.
pollJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
pollJob taskList = do
  polled <- asks sPolled
  atomically $ writeTVar polled False

  mapM_ checkPoll =<< FL.toList taskList

  stList <- asks sFuncStatList
  funcList <- foldr foldFunc [] <$> FL.toList stList
  pollJob_ taskList funcList

  atomically $ writeTVar polled True

  where
    -- Keep only functions that have at least one worker registered.
    foldFunc :: (FuncName, FuncStat) -> [FuncName] -> [FuncName]
    foldFunc (_, FuncStat{sWorker = 0}) acc = acc
    foldFunc (fn, _) acc                    = fn : acc

    -- Examine one scheduled task:
    --   * finished normally        -> drop it from the task list;
    --   * finished with exception  -> drop it and log the error;
    --   * still running            -> cancel it unless 'canRun' allows its
    --                                 function (the entry is not removed here).
    checkPoll
      :: (MonadIO m)
      => (JobHandle, (Int64, Async ())) -> SchedT db tp m ()
    checkPoll (jh, (_, w)) = do
      r <- poll w
      case r of
        Just (Right ()) -> FL.delete taskList jh
        Just (Left e)   -> do
          FL.delete taskList jh
          liftIO $ errorM "Periodic.Server.Scheduler" ("Poll error: " ++ show e)
        Nothing -> do
          r0 <- canRun fn
          unless r0 $ cancel w

      where (fn, _) = unHandle jh
-- | Load pending jobs of the given functions from the persistence layer and
-- schedule them via 'checkJob'.
--
-- Does nothing when the function list is empty.
--
-- A pending job is a candidate when its handle is not already in the task
-- list and its scheduled time is earlier than @now + pollInterval + 100@.
-- At most @2 * sMaxBatchSize@ candidates are kept while folding; see
-- @foldFunc@ for which ones survive.
--
-- Afterwards 'sAutoPoll' is set to whether more than 'sMaxBatchSize' jobs
-- were collected.
pollJob_
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> [FuncName] -> SchedT db tp m ()
pollJob_ _ [] = pure ()
pollJob_ taskList funcList = do
  now  <- getEpochTime
  next <- (+ (100 + now)) <$> pollInterval
  handles <- FL.keys taskList
  let check job = notElem (getHandle job) handles && (getSchedAt job < next)

  maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
  p <- asks sPersist
  jobs <- liftIO $
    P.foldrPending p next funcList (foldFunc (maxBatchSize * 2) check now) PSQ.empty

  mapM_ (checkJob taskList) jobs

  autoPoll <- asks sAutoPoll
  atomically $ writeTVar autoPoll (length jobs > maxBatchSize)

  where
    -- Insert an accepted job with priority @now - schedAt@ and trim the queue
    -- back to at most @s@ entries. 'PSQ.deleteMin' drops the smallest
    -- priority, i.e. the job whose scheduled time is furthest in the future,
    -- so the earliest-scheduled jobs are the ones retained.
    foldFunc :: Int -> (Job -> Bool) -> Int64 -> Job -> HashPSQ JobHandle Int64 Job -> HashPSQ JobHandle Int64 Job
    foldFunc s f now job acc
      | f job     = trimPSQ $ PSQ.insert (getHandle job) (now - getSchedAt job) job acc
      | otherwise = acc
      where
        trimPSQ :: HashPSQ JobHandle Int64 Job -> HashPSQ JobHandle Int64 Job
        trimPSQ q
          | PSQ.size q > s = trimPSQ $ PSQ.deleteMin q
          | otherwise      = q
-- | Decide what to do with a job found while polling.
--
--   * No task exists for the job: re-schedule it with 'reSchedJob', unless
--     the persistence layer already lists it in the 'Running' state.
--   * A task already exists: cancel it when 'canRun' reports that the job's
--     function cannot run.
checkJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m ()
checkJob tl job = do
  existing <- findTask tl job
  case existing of
    Nothing -> do
      p <- asks sPersist
      isProc <- liftIO $ P.member p Running fn jn
      unless isProc $ reSchedJob tl job
    Just task -> do
      runnable <- canRun fn
      unless runnable $ cancel task
  where
    fn = getFuncName job
    jn = getName job
-- | Queue an 'Action' by prepending it to 'sChanList' in a single STM
-- transaction. Because of the prepend, the list holds the most recently
-- pushed action first.
pushChanList :: MonadIO m => Action -> SchedT db tp m ()
pushChanList act = do
  cl <- asks sChanList
  atomically $ modifyTVar' cl (act :)
-- | Submit a job to the scheduler.
--
-- Logs the job handle, then — unless the persistence layer already has the
-- job in the 'Running' state — adjusts its scheduled time with
-- 'fixedSchedAt', stores it as 'Pending' and queues an 'Add' action for it.
-- A job that is currently running is left untouched.
pushJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
pushJob job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("pushJob: " ++ show (getHandle job))
  p <- asks sPersist
  isRunning <- liftIO $ P.member p Running fn jn
  unless isRunning $ do
    job' <- fixedSchedAt job
    liftIO $ P.insert p Pending fn jn job'
    pushChanList (Add job')

  where
    fn = getFuncName job
    jn = getName job
-- | Clamp a job's scheduled time so it is never in the past: when it is
-- earlier than the current epoch time it is replaced by the current time,
-- otherwise the job is returned unchanged.
fixedSchedAt :: MonadIO m => Job -> SchedT db tp m Job
fixedSchedAt job = do
  now <- getEpochTime
  pure $ if getSchedAt job < now
    then setSchedAt now job
    else job
-- | (Re-)schedule a job.
--
-- Any task already registered for the job is cancelled first. A new task is
-- then started with 'schedJob' and recorded in the task list (keyed by the
-- job handle, together with the job's scheduled time) only when
--
--   * the job is due before @now + pollInterval + 100@,
--   * 'canRun' accepts the job's function, and
--   * @check@ finds room for it in the task list.
reSchedJob :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> Job -> SchedT db tp m ()
reSchedJob taskList job = do
  old <- findTask taskList job
  forM_ old cancel

  interval <- (+ 100) <$> pollInterval
  next <- (+ interval) <$> getEpochTime
  when (getSchedAt job < next) $ do
    runnable <- canRun $ getFuncName job
    hasRoom  <- check taskList
    when (runnable && hasRoom) $ do
      task <- schedJob taskList job
      FL.insert taskList (getHandle job) (getSchedAt job, task)

  where
    -- Capacity check. Below @2 * sMaxBatchSize@ entries there is always
    -- room. Otherwise the entry returned by 'findLastTask' is consulted
    -- (presumably the task scheduled furthest in the future — confirm
    -- against its definition): if it is scheduled earlier than this job the
    -- job is rejected, else that task is cancelled and removed to make room.
    check :: (MonadIO m) => TaskList -> SchedT db tp m Bool
    check tl = do
      maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
      size <- FL.size tl
      if size < maxBatchSize * 2
        then return True
        else do
          lastTask <- findLastTask tl
          case lastTask of
            Nothing -> return True
            Just (sc, jh, w)
              | sc < getSchedAt job -> return False
              | otherwise -> do
                  cancel w
                  FL.delete taskList jh
                  return True
-- | Look up the entry stored under the job's handle in the task list and
-- return only its 'Async' component, dropping the stored 'Int64'.
-- Yields 'Nothing' when the handle has no entry.
findTask :: MonadIO m => TaskList -> Job -> SchedT db tp m (Maybe (Async ()))
findTask tasks j = do
  entry <- FL.lookup tasks (getHandle j)
  pure (snd <$> entry)
-- | Fold over the whole task list inside one 'atomically' block and return
-- the entry carrying the greatest 'Int64' value as @(value, handle, task)@.
-- Returns 'Nothing' when the list has no entries.
findLastTask :: MonadIO m => TaskList -> SchedT db tp m (Maybe (Int64, JobHandle, Async ()))
findLastTask tasks = atomically (FL.foldrWithKeySTM tasks keepLatest Nothing)
  where
    -- The accumulated entry wins unless the candidate is strictly greater,
    -- so on a tie the entry already held is kept.
    keepLatest
      :: JobHandle
      -> (Int64, a)
      -> Maybe (Int64, JobHandle, a)
      -> Maybe (Int64, JobHandle, a)
    keepLatest handle (at, task) acc = case acc of
      Just best@(bestAt, _, _) | at <= bestAt -> Just best
      _                                       -> Just (at, handle, task)
-- | 'canRun_' applied to the scheduler environment's function-stat list.
canRun :: MonadIO m => FuncName -> SchedT db tp m Bool
canRun fn = do
  stats <- asks sFuncStatList
  canRun_ stats fn
-- | 'True' only when the function has an entry in the given stat list and
-- that entry's 'sWorker' is non-zero.
canRun_ :: MonadIO m => FuncStatList -> FuncName -> m Bool
canRun_ stats name = hasWorker <$> FL.lookup stats name
  where
    hasWorker :: Maybe FuncStat -> Bool
    hasWorker Nothing                       = False
    hasWorker (Just FuncStat{sWorker = nw}) = nw /= 0
-- | Run 'schedJob_' for the job via 'async' and return the resulting handle.
schedJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m (Async ())
schedJob taskList job = async $ schedJob_ taskList job
-- | Body of a job's scheduling thread.
--
-- Does nothing unless 'canRun' holds for the job's function. Otherwise it
-- sleeps until the job's scheduled time, blocks until the function has a
-- stat entry with a non-zero 'sWorker', and then hands the job either to
-- every popped agent (broadcast functions) or to a single agent.
schedJob_ :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> Job -> SchedT db tp m ()
schedJob_ taskList job = do
  SchedEnv{..} <- ask
  runnable <- canRun fn
  when runnable $ do
    now <- getEpochTime
    -- threadDelay takes microseconds; the difference is scaled by 1e6.
    when (schedAt > now + 1) . threadDelay . fromIntegral $ (schedAt - now) * 1000000
    -- retrySTM blocks this transaction until the function's stat entry
    -- exists and its sWorker is non-zero.
    FuncStat{..} <- atomically $ do
      st <- FL.lookupSTM sFuncStatList fn
      case st of
        Nothing                  -> retrySTM
        Just FuncStat{sWorker=0} -> retrySTM
        Just st'                 -> pure st'
    if sBroadcast then popAgentListThen
                  else popAgentThen taskList

  where fn = getFuncName job
        jn = getName job
        schedAt = getSchedAt job
        jh = getHandle job

        -- Pop one agent for the function and try to hand it the job. If the
        -- agent's session is not alive, or the submit fails, start over.
        popAgentThen
          :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> SchedT db tp m ()
        popAgentThen tl = do
          SchedEnv{..} <- ask
          (jq, env0) <- atomically $ popAgentSTM sGrabQueue fn
          alive <- runSessionT1 env0 sessionState
          if alive then do
            IL.insert jq (getHandle job)
            nextSchedAt <- getEpochTime
            liftIO $ P.insert sPersist Running fn jn $ setSchedAt nextSchedAt job
            res <- doSubmitJob env0
            case res of
              Left _ -> do
                -- Undo the Running bookkeeping before trying again.
                liftIO $ P.insert sPersist Pending fn jn $ setSchedAt nextSchedAt job
                IL.delete jq jh
                schedJob_ tl job
              Right _ -> endSchedJob
          else schedJob_ tl job

        -- Broadcast path: submit the job to every popped agent. Individual
        -- failures do not stop the remaining submissions.
        popAgentListThen :: (MonadUnliftIO m, Transport tp) => SchedT db tp m ()
        popAgentListThen = do
          SchedEnv{..} <- ask
          agents <- popAgentList sGrabQueue fn
          mapM_ (doSubmitJob . snd) agents
          unless (null agents) endSchedJob

        -- Send the job to one agent, capturing any synchronous exception.
        -- Failures are logged here so that neither caller drops them silently.
        doSubmitJob :: (MonadUnliftIO m, Transport tp) => CSEnv tp -> SchedT db tp m (Either SomeException ())
        doSubmitJob agent = do
          res <- tryAny $ assignJob agent job
          case res of
            Left e  -> liftIO $ errorM "Periodic.Server.Scheduler"
                         ("assignJob failed: " ++ show jh ++ ": " ++ show e)
            Right _ -> pure ()
          pure res

        -- Push a TryPoll action for this job's handle.
        endSchedJob :: MonadIO m => SchedT db tp m ()
        endSchedJob = pushChanList (TryPoll jh)
-- | Recompute the stat entry of a function from the persistence layer.
--
-- Reads the Pending, Running and Locking sizes and the minimum scheduled
-- time for the function, then writes them into the function's 'FuncStat',
-- starting from @funcStat fn@ when no entry exists yet. 'sJob' is set to the
-- sum of the three sizes. When the minimum scheduled time is not positive,
-- the current epoch time is used instead.
adjustFuncStat :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
adjustFuncStat fn = do
  env <- ask
  let store = sPersist env
  pending  <- liftIO (P.size store Pending fn)
  running  <- liftIO (P.size store Running fn)
  locking  <- liftIO (P.size store Locking fn)
  earliest <- liftIO (P.minSchedAt store fn)

  nextAt <- if earliest > 0 then pure earliest else getEpochTime

  let refresh :: Maybe FuncStat -> Maybe FuncStat
      refresh old =
        let base = fromMaybe (funcStat fn) old
        in Just (base { sJob     = pending + running + locking
                      , sRunning = running
                      , sLocking = locking
                      , sSchedAt = nextAt
                      })

  FL.alter (sFuncStatList env) refresh fn
-- | Delete a job from the persistence layer, push a 'Remove' action for it
-- and push an empty result for its handle.
removeJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
removeJob job = do
  let handle = getHandle job
  liftIO $ infoM "Periodic.Server.Scheduler" ("removeJob: " ++ show handle)
  store <- asks sPersist
  liftIO $ P.delete store (getFuncName job) (getName job)
  pushChanList (Remove job)
  pushResult handle ""
-- | Return the jobs reported by the persistence layer's 'P.dumpJob'.
dumpJob :: (MonadIO m, Persist db) => SchedT db tp m [Job]
dumpJob = do
  store <- asks sPersist
  liftIO (P.dumpJob store)
-- | Apply an update to a function's stat entry, record the function name in
-- the persistence layer, and push a 'PollJob' action.
alterFunc :: (MonadIO m, Persist db) => FuncName -> (Maybe FuncStat -> Maybe FuncStat) -> SchedT db tp m ()
alterFunc n f = do
  env <- ask
  FL.alter (sFuncStatList env) f n
  liftIO (P.insertFuncName (sPersist env) n)
  pushChanList PollJob
-- | 'broadcastFunc' with the broadcast flag set to 'False'.
addFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
addFunc = flip broadcastFunc False
-- | Register one more worker for a function and set its broadcast flag.
--
-- A missing stat entry is created from @funcStat n@ with 'sWorker' set to 1.
-- An existing entry has its 'sWorker' incremented. In both cases
-- 'sBroadcast' is overwritten with the given flag.
broadcastFunc :: (MonadIO m, Persist db) => FuncName -> Bool -> SchedT db tp m ()
broadcastFunc n cast = do
  liftIO $ infoM "Periodic.Server.Scheduler" (label ++ ": " ++ show n)
  alterFunc n (Just . register)

  where
    -- Log prefix; depends on the broadcast flag.
    label :: String
    label
      | cast      = "broadcastFunc"
      | otherwise = "addFunc"

    register :: Maybe FuncStat -> FuncStat
    register current = case current of
      Nothing -> (funcStat n) { sWorker = 1, sBroadcast = cast }
      Just fs -> fs { sWorker = sWorker fs + 1, sBroadcast = cast }
-- | Unregister one worker from a function.
--
-- An existing stat entry has its 'sWorker' decremented, never below zero.
-- A missing entry is created as @funcStat n@.
removeFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
removeFunc n = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("removeFunc: " ++ show n)
  alterFunc n $ \current -> Just $ case current of
    Nothing -> funcStat n
    Just fs -> fs { sWorker = max (sWorker fs - 1) 0 }
-- | Drop a function whose 'sWorker' is zero.
--
-- While holding the scheduler lock, the stat entry is removed and the name
-- is deleted from the persistence layer, but only when the entry exists and
-- its 'sWorker' equals zero. A 'PollJob' action is pushed in every case.
dropFunc :: (MonadUnliftIO m, Persist db) => FuncName -> SchedT db tp m ()
dropFunc n = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("dropFunc: " ++ show n)
  env <- ask
  let stats = sFuncStatList env
  L.with (sLocker env) $ do
    found <- FL.lookup stats n
    when (fmap sWorker found == Just 0) $ do
      FL.delete stats n
      liftIO $ P.removeFuncName (sPersist env) n

  pushChanList PollJob
-- | Hand an agent, with its function list and job-handle list, to
-- 'pushAgent' on the scheduler's grab queue.
pushGrab :: MonadIO m => IOList FuncName -> IOList JobHandle -> CSEnv tp -> SchedT db tp m ()
pushGrab funcList handleList ag =
  asks sGrabQueue >>= \q -> pushAgent q funcList handleList ag
-- | Log the assignment, then send a 'JobAssign' packet for the job through
-- the given client session environment.
assignJob :: (MonadUnliftIO m, Transport tp) => CSEnv tp -> Job -> m ()
assignJob env0 job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("assignJob: " ++ show (getHandle job))
  runSessionT1 env0 (send (packetRES (JobAssign job)))
-- | Handle a failure report for a job handle.
--
-- The job's lock is released first. If the handle is in the wait list, it
-- is removed from it and the job is finished via 'doneJob' with an empty
-- result. Otherwise, if the job is found in the Running state, it is
-- re-queued through 'retryJob' with its scheduled time set to the current
-- epoch time. If it is not found, nothing more happens.
failJob :: (MonadUnliftIO m, Persist db) => JobHandle -> SchedT db tp m ()
failJob jh = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("failJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting then do
    removeFromWaitList jh
    doneJob jh ""
  else do
    p <- asks sPersist
    found <- liftIO $ P.lookup p Running fn jn
    -- forM_ over the Maybe runs the body only for Just, without fromJust.
    forM_ found $ \job -> do
      nextSchedAt <- getEpochTime
      retryJob $ setSchedAt nextSchedAt job

  where (fn, jn) = unHandle jh
-- | Store the job in the Pending state and push an 'Add' action for it.
retryJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
retryJob job = do
  store <- asks sPersist
  liftIO $ P.insert store Pending (getFuncName job) (getName job) job
  pushChanList (Add job)
-- | Finish the job identified by a handle.
--
-- Steps, in order: log the handle, call 'releaseLock'' for it, delete the job
-- (function name / job name taken from the handle) from the persistent store,
-- and pass the handle together with the result payload to 'pushResult'.
doneJob
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> ByteString -> SchedT db tp m ()
doneJob jh w = do
  liftIO . infoM "Periodic.Server.Scheduler" $ "doneJob: " ++ show jh
  releaseLock' jh
  let (funcName, jobName) = unHandle jh
  db <- asks sPersist
  liftIO (P.delete db funcName jobName)
  pushResult jh w
-- | Re-schedule a running job to run again later.
--
-- * @jh@    – handle of the job to re-schedule
-- * @later@ – delay added to the current epoch time to obtain the new
--             schedule time
-- * @step@  – amount added to the job's counter
--
-- The handle's locks are released first.  If the handle is present in the
-- wait list it is removed from there and the job is completed through
-- 'doneJob' with an empty payload instead of being re-scheduled.  Otherwise
-- the job is looked up in the 'Running' state; when it is not found nothing
-- happens.
schedLaterJob
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> Int64 -> Int -> SchedT db tp m ()
schedLaterJob jh later step = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("schedLaterJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting then do
    removeFromWaitList jh
    doneJob jh ""
  else do
    p <- asks sPersist
    job <- liftIO $ P.lookup p Running fn jn
    -- Iterate over the 'Maybe' directly rather than pairing 'isJust' with the
    -- partial 'fromJust'; a missing job simply skips the body.
    forM_ job $ \job' -> do
      nextSchedAt <- (+) later <$> getEpochTime
      retryJob $ setCount (getCount job' + step) $ setSchedAt nextSchedAt job'

  where (fn, jn) = unHandle jh
-- | Try to take the named lock on behalf of a job handle.
--
-- * @name@  – lock name
-- * @count@ – capacity requested by the caller
-- * @jh@    – handle of the job asking for the lock
--
-- Returns 'True' when the handle holds the lock afterwards (or when no job is
-- stored under 'Running' for the handle, in which case the lock list is not
-- touched), 'False' when the handle is queued in the entry's @locked@ list.
-- A job that was refused is stored under the 'Locking' state.
--
-- The whole operation runs inside 'L.with' on the scheduler's locker.
acquireLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> Int -> JobHandle -> SchedT db tp m Bool
acquireLock name count jh = do
  liftIO . infoM "Periodic.Server.Scheduler" $
    concat ["acquireLock: ", show name, " ", show count, " ", show jh]
  locker <- asks sLocker
  L.with locker $ do
    lockList <- asks sLockList
    db <- asks sPersist
    running <- liftIO $ P.lookup db Running fn jn
    case running of
      Nothing -> pure True
      Just job -> do
        granted <- atomically $ do
          (update, ok) <- decide <$> FL.lookupSTM lockList name
          -- Only write back when the decision produced a new entry.
          mapM_ (FL.insertSTM lockList name) update
          pure ok
        unless granted . liftIO $ P.insert db Locking fn jn job
        pure granted
  where
    (fn, jn) = unHandle jh

    -- Given the current entry for the lock (if any), compute the entry to
    -- store (if it changes) and whether the handle now holds the lock.
    decide Nothing =
      (Just LockInfo { acquired = [jh], locked = [], maxCount = count }, True)
    decide (Just info)
      | jh `elem` acquired info = (Nothing, True)
      | jh `elem` locked info   = (Nothing, False)
      -- The capacity test uses the stored maxCount; the raised limit is only
      -- written back with the updated entry.
      | length (acquired info) < maxCount info =
          (Just info { acquired = acquired info ++ [jh], maxCount = limit }, True)
      | otherwise =
          (Just info { locked = locked info ++ [jh], maxCount = limit }, False)
      where limit = max (maxCount info) count
-- | Release the named lock for a job handle.
--
-- Runs 'releaseLock_' inside 'L.with' on the scheduler's locker.
releaseLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock name jh =
  asks sLocker >>= \locker -> L.with locker (releaseLock_ name jh)
-- | Release the named lock for a job handle; does not take the scheduler's
-- locker itself.
--
-- If the handle is in the entry's @acquired@ list it is removed from it, and
-- the first handle of the @locked@ list (if any) is popped.  The popped
-- handle's job is looked up in the 'Locking' state: when found it is stored
-- as 'Pending' and an 'Add' action is pushed; when missing, this function is
-- invoked again for the popped handle.
--
-- Nothing happens when the lock has no entry or the handle does not hold it.
releaseLock_
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock_ name jh = do
  liftIO . infoM "Periodic.Server.Scheduler" $
    concat ["releaseLock: ", show name, " ", show jh]
  db <- asks sPersist
  lockList <- asks sLockList
  next <- atomically $ do
    entry <- FL.lookupSTM lockList name
    case entry of
      Just info | jh `elem` acquired info -> do
        let (waiter, remaining) = case locked info of
              []       -> (Nothing, [])
              (x : xs) -> (Just x, xs)
        FL.insertSTM lockList name info
          { acquired = L.delete jh (acquired info)
          , locked = remaining
          }
        pure waiter
      _ -> pure Nothing

  forM_ next $ \waiter -> do
    let (fn, jn) = unHandle waiter
    stored <- liftIO $ P.lookup db Locking fn jn
    case stored of
      -- NOTE(review): the popped waiter is not added to @acquired@ above, so
      -- this recursive call looks like it falls through without popping a
      -- further waiter -- confirm whether that is intended.
      Nothing -> releaseLock_ name waiter
      Just job -> do
        liftIO $ P.insert db Pending fn jn job
        pushChanList (Add job)
-- | Release every lock that mentions the given job handle.
--
-- Collects the names of all lock entries whose @acquired@ or @locked@ list
-- contains the handle, then calls 'releaseLock' for each of those names.
releaseLock'
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> SchedT db tp m ()
releaseLock' jh = do
  lockList <- asks sLockList
  held <- atomically $ FL.foldrWithKeySTM lockList collect []
  forM_ held $ \name -> releaseLock name jh
  where
    collect :: LockName -> LockInfo -> [LockName] -> [LockName]
    collect name info rest
      | mentions info = name : rest
      | otherwise     = rest

    mentions :: LockInfo -> Bool
    mentions info = jh `elem` acquired info || jh `elem` locked info
-- | Count, across all lock entries, the handles selected by @f@ whose
-- function name equals @fn@.
--
-- @f@ picks which handle list of a 'LockInfo' is inspected (callers pass a
-- field selector such as @locked@ or @acquired@).
countLock :: MonadUnliftIO m => (LockInfo -> [JobHandle]) -> FuncName -> SchedT db tp m Int
countLock f fn = do
  infos <- FL.elems =<< asks sLockList
  pure $ length
    [ ()
    | info <- infos
    , jh <- f info
    , fst (unHandle jh) == fn
    ]
-- | Largest @maxCount@ over all entries of the lock list.
--
-- Returns 0 when the lock list has no entries; 'maximum' alone would throw
-- on the empty list.
getMaxLockCount :: MonadUnliftIO m => SchedT db tp m Int
getMaxLockCount = do
  lockList <- asks sLockList
  counts <- map maxCount <$> FL.elems lockList
  pure $ if null counts then 0 else maximum counts
-- | Current statistics for all functions.
--
-- Runs 'adjustFuncStat' for every function name reported by the persistent
-- store, then returns the elements of the function-stat list.
status :: (MonadIO m, Persist db) => SchedT db tp m [FuncStat]
status = do
  db <- asks sPersist
  funcs <- liftIO (P.funcList db)
  mapM_ adjustFuncStat funcs
  stats <- asks sFuncStatList
  FL.elems stats
-- | Fail every job in the 'Running' state whose timeout has elapsed.
--
-- A job counts as timed out when its schedule time plus its timeout lies
-- before the current epoch time.  The job's own timeout is used when it is
-- positive; otherwise the value read from the scheduler's task-timeout
-- variable applies.  Each such job is passed to 'failJob' via its handle.
revertRunningQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertRunningQueue = do
  now <- getEpochTime
  timeoutVar <- asks sTaskTimeout
  defaultTimeout <- fromIntegral <$> readTVarIO timeoutVar
  db <- asks sPersist
  let effectiveTimeout :: Job -> Int64
      effectiveTimeout job
        | getTimeout job > 0 = fromIntegral (getTimeout job)
        | otherwise          = defaultTimeout

      expired :: Job -> Bool
      expired job = getSchedAt job + effectiveTimeout job < now

      keepExpired :: Job -> [Job] -> [Job]
      keepExpired job rest = if expired job then job : rest else rest
  timedOut <- liftIO $ P.foldr db Running keepExpired []
  forM_ timedOut $ failJob . getHandle
-- | Re-queue jobs that are waiting on locks nobody holds any more.
--
-- For every function name known to the persistent store, the number of
-- handles of that function in the @locked@ and @acquired@ lists of all lock
-- entries is counted and logged.  When some handles are waiting but none
-- holds a lock, jobs are fetched with 'P.foldrLocking' (passing the result of
-- 'getMaxLockCount' and the function name) and each one is handed to
-- 'pushJob'.
revertLockingQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertLockingQueue = mapM_ checkAndReleaseLock =<< liftIO . P.funcList =<< asks sPersist
  where checkAndReleaseLock
          :: (MonadUnliftIO m, Persist db)
          => FuncName -> SchedT db tp m ()
        checkAndReleaseLock fn = do
          p <- asks sPersist
          sizeLocked <- countLock locked fn
          sizeAcquired <- countLock acquired fn
          -- Logger name spelled like everywhere else in this module so the
          -- message lands in the same logger hierarchy.
          liftIO $ infoM "Periodic.Server.Scheduler"
                 $ "LockInfo " ++ show fn
                 ++ " Locked:" ++ show sizeLocked
                 ++ " Acquired:" ++ show sizeAcquired
          when (sizeLocked > 0 && sizeAcquired == 0) $ do
            count <- getMaxLockCount
            handles <- liftIO $ P.foldrLocking p count fn (:) []
            mapM_ pushJob handles
-- | Drop every wait-list entry whose 'itemTs' is strictly older than the
-- current epoch time minus the value stored in 'sExpiration'.
--
-- Collecting the stale handles and deleting them happen inside a single
-- STM transaction.
purgeExpired :: MonadIO m => SchedT db tp m ()
purgeExpired = do
  now <- getEpochTime
  waitList <- asks sWaitList
  expiration <- asks sExpiration >>= readTVarIO
  let deadline = now - fromIntegral expiration
  atomically $ do
    expired <- FL.foldrWithKeySTM waitList (collectExpired deadline) []
    forM_ expired (FL.deleteSTM waitList)
  where
    -- Fold step: keep the handle of any entry stamped before the deadline.
    collectExpired :: Int64 -> JobHandle -> WaitItem -> [JobHandle] -> [JobHandle]
    collectExpired deadline handle item acc =
      if itemTs item < deadline then handle : acc else acc
-- | Stop the scheduler.
--
-- Logs the shutdown, pushes 'Cancel' through 'pushChanList' and clears the
-- 'sAlive' flag.  'sCleanup' is started in a separate 'async' thread (its
-- handle is discarded) only when the flag was still set beforehand, so a
-- repeated call does not launch the cleanup again.
shutdown :: (MonadUnliftIO m) => SchedT db tp m ()
shutdown = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler shutdown"
  aliveVar <- asks sAlive
  cleanup <- asks sCleanup
  pushChanList Cancel
  -- Atomically flip the flag and learn its previous value.
  wasAlive <- atomically $ swapTVar aliveVar False
  when wasAlive $ void (async (liftIO cleanup))
-- | Register one more waiter for the result of the given job.
--
-- If the job's handle has no wait-list entry yet, a fresh one is created
-- with no value and a wait count of 1; otherwise the existing entry keeps
-- its value, gets its timestamp refreshed and its wait count incremented.
prepareWait :: MonadIO m => Job -> SchedT db tp m ()
prepareWait job = pushResult_ register (getHandle job)
  where
    register :: Int64 -> Maybe WaitItem -> Maybe WaitItem
    register now = Just . maybe firstWaiter anotherWaiter
      where
        firstWaiter =
          WaitItem {itemTs = now, itemValue = Nothing, itemWait = 1}
        anotherWaiter item =
          item {itemTs = now, itemWait = itemWait item + 1}
-- | Block (via STM retry) until a result for the job is available.
--
-- Returns the empty string immediately when the @state@ flag is 'False' or
-- when the job's handle is not in the wait list.  When the entry exists but
-- holds no value yet, the transaction retries.  Once a value is present it
-- is returned, and the entry's wait count is decremented; the entry is
-- deleted when this was the last waiter.
waitResult :: MonadIO m => TVar Bool -> Job -> SchedT db tp m ByteString
waitResult state job = do
  waitList <- asks sWaitList
  atomically $ do
    active <- readTVar state
    if not active
      then pure ""
      else FL.lookupSTM waitList handle >>= maybe (pure "") (collect waitList)
  where
    handle = getHandle job

    -- Hand out the stored value, or retry while none has been pushed.
    collect :: WaitList -> WaitItem -> STM ByteString
    collect waitList item = case itemValue item of
      Nothing -> retrySTM
      Just result -> do
        release waitList item
        pure result

    -- One waiter is done: decrement the count or drop the entry entirely.
    release :: WaitList -> WaitItem -> STM ()
    release waitList item
      | itemWait item > 1 =
          FL.insertSTM waitList handle item { itemWait = itemWait item - 1 }
      | otherwise = FL.deleteSTM waitList handle
-- | Store a result for the job handle in the wait list.
--
-- Only an existing entry is updated (its timestamp is refreshed and its
-- value set to the given bytes); when the handle has no entry, nothing is
-- inserted.
pushResult
  :: MonadIO m
  => JobHandle -> ByteString -> SchedT db tp m ()
pushResult jh w = pushResult_ store jh
  where
    store :: Int64 -> Maybe WaitItem -> Maybe WaitItem
    store now = fmap (\item -> item {itemTs = now, itemValue = Just w})
-- | Apply an update function to the wait-list entry of a job handle.
--
-- The function receives the current epoch time and the existing entry (if
-- any); its result is passed on to 'FL.alter' for that handle.
pushResult_
  :: MonadIO m
  => (Int64 -> Maybe WaitItem -> Maybe WaitItem)
  -> JobHandle -> SchedT db tp m ()
pushResult_ update handle = do
  waitList <- asks sWaitList
  timestamp <- getEpochTime
  let step = update timestamp
  FL.alter waitList step handle
-- | Report whether the wait list currently holds an entry for the handle.
existsWaitList :: MonadIO m => JobHandle -> SchedT db tp m Bool
existsWaitList jh =
  asks sWaitList >>= \waitList -> isJust <$> FL.lookup waitList jh
-- | Fetch the value already stored in the wait list for the given job.
--
-- Yields 'Nothing' both when the job's handle has no entry and when the
-- entry exists but its 'itemValue' is still empty.
lookupPrevResult :: MonadIO m => Job -> SchedT db tp m (Maybe ByteString)
lookupPrevResult job = do
  waitList <- asks sWaitList
  entry <- FL.lookup waitList (getHandle job)
  -- Missing entry and entry-without-value both collapse to Nothing.
  pure (entry >>= itemValue)
-- | Delete the wait-list entry belonging to the given job handle.
removeFromWaitList :: MonadIO m => JobHandle -> SchedT db tp m ()
removeFromWaitList jh =
  asks sWaitList >>= \waitList -> FL.delete waitList jh