{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE UndecidableInstances       #-}

module Periodic.Server.Scheduler
  ( SchedT
  , runSchedT
  , SchedEnv
  , initSchedEnv
  , startSchedT
  , pushJob
  , pushGrab
  , failJob
  , doneJob
  , schedLaterJob
  , acquireLock
  , releaseLock
  , addFunc
  , removeFunc
  , broadcastFunc
  , dropFunc
  , removeJob
  , dumpJob
  , status
  , shutdown
  , keepalive
  , setConfigInt
  , getConfigInt
  , prepareWait
  , lookupPrevResult
  , waitResult
  , canRun
  ) where

import           Control.Monad                (forever, mzero, unless, void,
                                               when)
import           Control.Monad.Reader.Class   (MonadReader (ask), asks)
import           Control.Monad.Trans.Class    (MonadTrans, lift)
import           Control.Monad.Trans.Maybe    (runMaybeT)
import           Control.Monad.Trans.Reader   (ReaderT (..), runReaderT)
import           Data.ByteString              (ByteString)
import           Data.Foldable                (forM_)
import           Data.HashPSQ                 (HashPSQ)
import qualified Data.HashPSQ                 as PSQ
import           Data.Int                     (Int64)
import qualified Data.List                    as L (delete)
import           Data.Maybe                   (fromJust, fromMaybe, isJust)
import           Metro.Class                  (Transport)
import           Metro.IOHashMap              (IOHashMap, newIOHashMap)
import qualified Metro.IOHashMap              as FL
import qualified Metro.Lock                   as L (Lock, new, with)
import           Metro.Session                (runSessionT1, send, sessionState)
import           Metro.Utils                  (getEpochTime)
import           Periodic.IOList              (IOList)
import qualified Periodic.IOList              as IL
import           Periodic.Server.FuncStat
import           Periodic.Server.GrabQueue
import           Periodic.Server.Persist      (Persist, State (..))
import qualified Periodic.Server.Persist      as P
import           Periodic.Server.Types        (CSEnv)
import           Periodic.Types               (packetRES)
import           Periodic.Types.Internal      (LockName)
import           Periodic.Types.Job
import           Periodic.Types.ServerCommand (ServerCommand (JobAssign))
import           System.Log.Logger            (errorM, infoM)
import           UnliftIO
import           UnliftIO.Concurrent          (threadDelay)

-- | Messages queued on the scheduler's action list ('sChanList').
--
-- 'PollJob' is the action pushed on every poll-interval tick by
-- 'startSchedT'. The meaning of the other constructors is defined by the
-- consumer of the queue, which is outside this part of the file.
data Action
  = Add Job
  | Remove Job
  | Cancel
  | PollJob
  | TryPoll JobHandle

-- | One entry of the run-job result cache ('WaitList').
data WaitItem = WaitItem
    { itemTs    :: Int64
    -- ^ timestamp attached to the entry (presumably the expiry time,
    -- going by the @expiredAt@ note on 'WaitList' -- TODO confirm)
    , itemValue :: Maybe ByteString
    -- ^ cached result; 'Nothing' while no result is available yet
    , itemWait  :: Int
    -- ^ counter associated with the entry (presumably the number of
    -- waiters -- TODO confirm against the code that updates it)
    }

-- | Cache of runJob results, keyed by job handle.
--
-- Each entry pairs an expiry timestamp with an optional value:
--
--   * @(expiredAt, Nothing)@: no result yet, the waiter retries (retrySTM)
--   * @(expiredAt, Just bs)@: result available, @bs@ is returned
type WaitList = IOHashMap JobHandle WaitItem

-- | State of one named distributed lock.
data LockInfo = LockInfo
    { acquired :: [JobHandle]
    -- ^ the "acquired" job handles (presumably the jobs currently
    -- holding the lock -- TODO confirm)
    , locked   :: [JobHandle]
    -- ^ the "locked" job handles (presumably the jobs blocked waiting
    -- for the lock -- TODO confirm)
    , maxCount :: Int
    -- ^ count limit stored with the lock (presumably the maximum number
    -- of simultaneous holders -- TODO confirm)
    }

-- | All distributed locks known to the scheduler, keyed by lock name.
type LockList = IOHashMap LockName LockInfo

-- | Shared, mutable environment of the scheduler.
--
-- @db@ is the persistence backend and @tp@ the transport type carried by
-- the grab queue. The values handed to 'runTask_' ('sPollInterval',
-- 'sRevertInterval') are interpreted as seconds there.
data SchedEnv db tp = SchedEnv
    { sPollInterval   :: TVar Int
    -- ^ main poll loop every time interval
    , sRevertInterval :: TVar Int
    -- ^ revert process queue loop every time interval
    , sTaskTimeout    :: TVar Int
    -- ^ the task do timeout
    , sMaxBatchSize   :: TVar Int
    -- ^ max poll batch size
    , sKeepalive      :: TVar Int
    -- ^ client or worker keepalive
    , sExpiration     :: TVar Int
    -- ^ run job cache expiration
    , sAutoPoll       :: TVar Bool
    -- ^ auto poll job when job done or failed
    , sPolled         :: TVar Bool
    -- ^ auto poll lock
    , sCleanup        :: IO ()
    -- ^ cleanup action supplied to 'initSchedEnv'
    , sFuncStatList   :: FuncStatList
    , sLocker         :: L.Lock
    , sGrabQueue      :: GrabQueue tp
    , sAlive          :: TVar Bool
    -- ^ sched state, when false sched is exited.
    , sChanList       :: TVar [Action]
    -- ^ pending 'Action's for the scheduler
    , sWaitList       :: WaitList
    , sLockList       :: LockList
    , sPersist        :: db
    -- ^ persistence backend handle
    }

-- | The scheduler monad transformer: a 'ReaderT' over the shared
-- 'SchedEnv'. All instances are obtained by newtype deriving from the
-- underlying 'ReaderT'.
newtype SchedT db tp m a = SchedT
  { unSchedT :: ReaderT (SchedEnv db tp) m a
  }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadTrans
    , MonadIO
    , MonadReader (SchedEnv db tp)
    )

-- | Unlifting captures the current 'SchedEnv' and re-applies it with
-- 'runSchedT' before delegating to the base monad's runner, so that
-- 'SchedT' actions can be run from plain 'IO' callbacks.
instance MonadUnliftIO m => MonadUnliftIO (SchedT db tp m) where
  withRunInIO inner =
    SchedT . ReaderT $ \env ->
      withRunInIO $ \runInBase ->
        inner (runInBase . runSchedT env)


-- | Asynchronous tasks keyed by job handle; each entry carries an 'Int64'
-- alongside the task's 'Async' handle (presumably a timestamp -- TODO
-- confirm against the code that fills the map).
type TaskList = IOHashMap JobHandle (Int64, Async ())

-- | Run a 'SchedT' action in the base monad against the given environment.
runSchedT :: SchedEnv db tp -> SchedT db tp m a -> m a
runSchedT schedEnv action = runReaderT (unSchedT action) schedEnv

-- | Build a fresh 'SchedEnv'.
--
-- The persistence backend is opened from @config@ with 'P.newPersist'.
-- The cleanup action @sC@ is converted to plain 'IO' with 'toIO' and
-- stored as 'sCleanup'.
--
-- Defaults: poll, revert, keepalive and expiration are 300, task timeout
-- is 600, max batch size is 250, auto-poll and polled are 'False', and the
-- scheduler is marked alive.
initSchedEnv :: (MonadUnliftIO m, Persist db) => P.PersistConfig db -> m () -> m (SchedEnv db tp)
initSchedEnv config sC = do
  -- In-memory tables, lock and queues.
  sFuncStatList   <- newIOHashMap
  sWaitList       <- newIOHashMap
  sLockList       <- newIOHashMap
  sLocker         <- L.new
  sGrabQueue      <- newGrabQueue
  sAlive          <- newTVarIO True
  sChanList       <- newTVarIO []
  -- Tunables; 'startSchedT' may overwrite them from persisted config.
  sPollInterval   <- newTVarIO 300
  sRevertInterval <- newTVarIO 300
  sTaskTimeout    <- newTVarIO 600
  sMaxBatchSize   <- newTVarIO 250
  sKeepalive      <- newTVarIO 300
  sExpiration     <- newTVarIO 300
  sAutoPoll       <- newTVarIO False
  sPolled         <- newTVarIO False
  sCleanup        <- toIO sC
  sPersist        <- liftIO $ P.newPersist config
  -- RecordWildCards: every field is filled from the same-named binding above.
  pure SchedEnv{..}

-- | Start the scheduler: restore persisted configuration, then spawn the
-- background loops.
--
-- Loops started, each through 'runTask' / 'runTask_':
--
--   * 'revertRunningQueue', every 'sRevertInterval'
--   * a 'PollJob' action pushed every 'sPollInterval'
--   * 'runChanJob' on a fresh task list, with no delay between rounds
--   * 'purgeExpired', every 100
--   * 'revertLockingQueue', every 60
startSchedT :: (MonadUnliftIO m, Persist db, Transport tp) => SchedT db tp m ()
startSchedT = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler started"
  SchedEnv{..} <- ask

  -- Load persisted settings BEFORE spawning any loop: each loop reads its
  -- interval right before sleeping, so loading afterwards lets the first
  -- sleep race against the load and use the built-in default instead.
  loadInt "poll-interval" sPollInterval
  loadInt "revert-interval" sRevertInterval
  loadInt "timeout" sTaskTimeout
  loadInt "keepalive" sKeepalive
  loadInt "max-batch-size" sMaxBatchSize
  loadInt "expiration" sExpiration

  runTask_ sRevertInterval revertRunningQueue
  taskList <- newIOHashMap
  runTask_ sPollInterval $ pushChanList PollJob
  runTask 0 $ runChanJob taskList
  runTask 100 purgeExpired
  runTask 60 revertLockingQueue

-- | Read the persisted integer setting @name@ and, when one is stored,
-- write it into @ref@. A missing setting leaves @ref@ untouched.
loadInt :: (MonadIO m, Persist db) => String -> TVar Int -> SchedT db tp m ()
loadInt name ref = do
  db <- asks sPersist
  stored <- liftIO $ P.configGet db name
  -- 'forM_' over 'Maybe': runs the write only for 'Just'.
  forM_ stored $ atomically . writeTVar ref

-- | Persist the integer setting @name@ with value @v@, then publish the
-- same value to the in-memory @ref@.
--
-- The store is written first, so @ref@ is left unchanged if
-- 'P.configSet' throws.
saveInt :: (MonadIO m, Persist db) => String -> Int -> TVar Int -> SchedT db tp m ()
saveInt name v ref = do
  db <- asks sPersist
  liftIO $ P.configSet db name v
  atomically $ writeTVar ref v

-- | Set a named integer setting, both persisted and in memory.
--
-- Recognised keys: @poll-interval@, @revert-interval@, @timeout@,
-- @keepalive@, @max-batch-size@ and @expiration@. Any other key is
-- silently ignored.
setConfigInt :: (MonadIO m, Persist db) => String -> Int -> SchedT db tp m ()
setConfigInt key val = do
  SchedEnv {..} <- ask
  let target = case key of
        "poll-interval"   -> Just sPollInterval
        "revert-interval" -> Just sRevertInterval
        "timeout"         -> Just sTaskTimeout
        "keepalive"       -> Just sKeepalive
        "max-batch-size"  -> Just sMaxBatchSize
        "expiration"      -> Just sExpiration
        _                 -> Nothing
  -- The persisted name is the key itself; unknown keys yield 'Nothing'
  -- and therefore no write.
  forM_ target $ saveInt key val

-- | Read the current in-memory value of a named integer setting.
--
-- Recognised keys: @poll-interval@, @revert-interval@, @timeout@,
-- @keepalive@, @max-batch-size@ and @expiration@. Any other key
-- yields @0@.
getConfigInt :: (MonadIO m, Persist db) => String -> SchedT db tp m Int
getConfigInt key = do
  SchedEnv {..} <- ask
  let source = case key of
        "poll-interval"   -> Just sPollInterval
        "revert-interval" -> Just sRevertInterval
        "timeout"         -> Just sTaskTimeout
        "keepalive"       -> Just sKeepalive
        "max-batch-size"  -> Just sMaxBatchSize
        "expiration"      -> Just sExpiration
        _                 -> Nothing
  maybe (pure 0) readTVarIO source

-- | The shared keepalive setting. The 'TVar' itself is returned, so
-- callers observe later updates.
keepalive :: Monad m => SchedT db tp m (TVar Int)
keepalive = asks sKeepalive

-- | Spawn a background loop that runs @m@ with a fixed delay of @d@
-- between rounds (no delay when @d@ is not positive).
--
-- This is 'runTask_' with a private 'TVar' holding @d@, so the delay
-- cannot be changed afterwards.
runTask :: (MonadUnliftIO m) => Int -> SchedT db tp m () -> SchedT db tp m ()
runTask d m = do
  delayRef <- newTVarIO d
  runTask_ delayRef m

-- | Start a repeating background task whose delay can change at run time.
--
-- The loop runs in a separate 'async' thread whose handle is discarded.
-- On every iteration it:
--
-- 1. reads the current delay from @d@; when it is greater than zero the
--    thread sleeps for @delay * 1000 * 1000@ (the value handed to
--    'threadDelay'), otherwise it does not sleep at all;
-- 2. reads 'sAlive'; when it is 'True' the action @m@ is run, when it is
--    'False' the loop terminates via 'mzero'.
runTask_ :: (MonadUnliftIO m) => TVar Int -> SchedT db tp m () -> SchedT db tp m ()
runTask_ d m = void . async $ do
  aliveVar <- asks sAlive
  void . runMaybeT . forever $ do
    interval <- readTVarIO d
    when (interval > 0) $ threadDelay (interval * 1000 * 1000)
    alive <- readTVarIO aliveVar
    if alive
      then lift m
      else mzero  -- leaves the 'forever' loop

-- | Drain the pending 'Action' list ('sChanList') and execute every action
-- against the given task list.
--
-- The read happens in a single STM transaction:
--
-- * if the list is empty and 'sAlive' is 'True', the transaction retries
--   (blocking until one of the two variables changes);
-- * if the list is empty and 'sAlive' is 'False', no actions are returned;
-- * otherwise the list is reset to @[]@ and its previous contents are
--   returned.
--
-- The returned actions are then dispatched one by one, in list order.
runChanJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
runChanJob taskList = do
  chanVar  <- asks sChanList
  aliveVar <- asks sAlive
  pending <- atomically $ do
    queued <- readTVar chanVar
    if null queued
      then do
        alive <- readTVar aliveVar
        if alive then retrySTM else pure []
      else do
        writeTVar chanVar []
        pure queued

  mapM_ (doChanJob taskList) pending

  where
    -- Execute one queued action.
    doChanJob
      :: (MonadUnliftIO m, Persist db, Transport tp)
      => TaskList -> Action -> SchedT db tp m ()
    doChanJob tl (Add job)    = reSchedJob tl job
    doChanJob tl (Remove job) = findTask tl job >>= mapM_ cancel
    doChanJob tl Cancel       = mapM_ (cancel . snd) =<< FL.elems tl
    doChanJob tl PollJob      = pollJob tl
    doChanJob tl (TryPoll jh) = removeTaskAndTryPoll tl jh


-- | Read the current value of 'sPollInterval' and convert it from 'Int'
-- to whatever numeric type the caller asks for.
pollInterval :: (MonadIO m, Num a) => SchedT db tp m a
pollInterval = do
  intervalVar <- asks sPollInterval
  fromIntegral <$> readTVarIO intervalVar

-- | Remove a job handle from the task list and, if appropriate, queue
-- another poll.
--
-- A 'PollJob' action is pushed only when all of the following hold:
--
-- * 'sPolled' is 'True',
-- * 'sAutoPoll' is 'True',
-- * the task list now holds fewer than 'sMaxBatchSize' entries.
--
-- In that case 'sPolled' is reset to 'False' before the action is pushed.
removeTaskAndTryPoll :: MonadIO m => TaskList -> JobHandle -> SchedT db tp m ()
removeTaskAndTryPoll taskList jh = do
  FL.delete taskList jh
  polled   <- asks sPolled
  isPolled <- readTVarIO polled
  autoPoll <- readTVarIO =<< asks sAutoPoll
  when (isPolled && autoPoll) $ do
    maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
    size <- FL.size taskList
    when (size < maxBatchSize) $ do
      atomically $ writeTVar polled False
      pushChanList PollJob

-- | Run one polling round.
--
-- Steps, in order:
--
-- 1. set 'sPolled' to 'False';
-- 2. inspect every entry of the task list with @checkPoll@;
-- 3. collect the names of all functions in 'sFuncStatList' whose
--    'sWorker' count is not @0@;
-- 4. hand those names to 'pollJob_';
-- 5. set 'sPolled' to 'True'.
pollJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> SchedT db tp m ()
pollJob taskList = do
  polled <- asks sPolled
  atomically $ writeTVar polled False
  mapM_ checkPoll =<< FL.toList taskList
  stList <- asks sFuncStatList
  funcList <- foldr foldFunc [] <$> FL.toList stList
  pollJob_ taskList funcList
  atomically $ writeTVar polled True

  where
    -- Keep a function name unless its stat reports zero workers.
    foldFunc :: (FuncName, FuncStat) -> [FuncName] -> [FuncName]
    foldFunc (_, FuncStat{sWorker = 0}) acc = acc
    foldFunc (fn, _) acc                    = fn : acc

    -- Inspect one task-list entry without blocking on it:
    -- finished tasks are removed (failures are also logged); a task that
    -- is still running is cancelled when 'canRun' says its function may
    -- not run.
    checkPoll
      :: (MonadIO m)
      => (JobHandle, (Int64, Async ())) -> SchedT db tp m ()
    checkPoll (jh, (_, w)) = do
      r <- poll w
      case r of
        Just (Right ()) -> FL.delete taskList jh
        Just (Left e)   -> do
          FL.delete taskList jh
          liftIO $ errorM "Periodic.Server.Scheduler" ("Poll error: " ++ show e)
        Nothing         -> do
          runnable <- canRun fn
          unless runnable $ cancel w
      where
        (fn, _) = unHandle jh

-- | Load pending jobs for the given functions from the persistent store
-- and schedule them.
--
-- Nothing happens when the function list is empty.  Otherwise:
--
-- * the upper time bound is @pollInterval + 100 + now@;
-- * a pending job is considered only if its handle is not already in the
--   task list and its scheduled time is below that bound;
-- * accepted jobs are collected in a priority queue keyed by
--   @now - schedAt@, trimmed (by repeatedly deleting the minimum) so it
--   never holds more than @2 * sMaxBatchSize@ entries;
-- * every collected job is passed to @checkJob@;
-- * 'sAutoPoll' is set to whether more than 'sMaxBatchSize' jobs were
--   collected.
pollJob_
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> [FuncName] -> SchedT db tp m ()
pollJob_ _ [] = pure ()
pollJob_ taskList funcList = do
  now <- getEpochTime
  next <- (+ (100 + now)) <$> pollInterval
  handles <- FL.keys taskList
  let check job = notElem (getHandle job) handles && (getSchedAt job < next)

  maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
  p <- asks sPersist
  jobs <- liftIO $
    P.foldrPending p next funcList (foldFunc (maxBatchSize * 2) check now) PSQ.empty

  mapM_ (checkJob taskList) jobs

  autoPoll <- asks sAutoPoll
  atomically $ writeTVar autoPoll (PSQ.size jobs > maxBatchSize)

  where
    -- Insert an accepted job into the queue, then trim the queue back to
    -- at most @limit@ entries.
    foldFunc
      :: Int -> (Job -> Bool) -> Int64 -> Job
      -> HashPSQ JobHandle Int64 Job -> HashPSQ JobHandle Int64 Job
    foldFunc limit accept t0 job acc
      | accept job = trimPSQ $ PSQ.insert (getHandle job) (t0 - getSchedAt job) job acc
      | otherwise  = acc
      where
        trimPSQ :: HashPSQ JobHandle Int64 Job -> HashPSQ JobHandle Int64 Job
        trimPSQ q
          | PSQ.size q > limit = trimPSQ $ PSQ.deleteMin q
          | otherwise          = q

    -- Schedule a job that has no task yet (unless the store already lists
    -- it as 'Running'); cancel an existing task when 'canRun' says the
    -- job's function may not run.
    checkJob
      :: (MonadUnliftIO m, Persist db, Transport tp)
      => TaskList -> Job -> SchedT db tp m ()
    checkJob tl job = do
      w <- findTask tl job
      case w of
        Nothing -> do
          db <- asks sPersist
          isProc <- liftIO $ P.member db Running fn jn
          unless isProc $ reSchedJob tl job
        Just w0 -> do
          r <- canRun fn
          unless r $ cancel w0
      where
        fn = getFuncName job
        jn = getName job

-- | Queue an 'Action' by prepending it to the list held in 'sChanList'.
-- The read and the write happen in one STM transaction.
pushChanList :: MonadIO m => Action -> SchedT db tp m ()
pushChanList act = do
  cl <- asks sChanList
  atomically $ do
    l <- readTVar cl
    writeTVar cl (act : l)

-- | Submit a job to the scheduler.
--
-- The job handle is logged first.  If the persistent store already lists
-- the job (by function name and job name) as 'Running', nothing else
-- happens.  Otherwise the job is passed through 'fixedSchedAt', stored as
-- 'Pending', and an 'Add' action for it is queued.
pushJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
pushJob job = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("pushJob: " ++ show (getHandle job))
  p <- asks sPersist
  isRunning <- liftIO $ P.member p Running fn jn
  unless isRunning $ do
    job' <- fixedSchedAt job
    liftIO $ P.insert p Pending fn jn job'
    pushChanList (Add job')

  where
    fn = getFuncName job
    jn = getName job


-- | Clamp a job's scheduled time so that it is never earlier than the
-- current epoch time: a job whose @schedAt@ is before "now" gets "now",
-- any other job is returned unchanged.
fixedSchedAt :: MonadIO m => Job -> SchedT db tp m Job
fixedSchedAt job = do
  now <- getEpochTime
  pure $ if getSchedAt job < now then setSchedAt now job else job

-- | (Re)schedule a job in the given task list.
--
-- Any task already registered for the job's handle is cancelled.  The job
-- is then only considered when its @schedAt@ lies before
-- @now + pollInterval + 100@.  In that case both 'canRun' and the capacity
-- check are evaluated (in that order, the capacity check running even when
-- 'canRun' is False), and when both succeed a new task is started with
-- 'schedJob' and stored in the task list under the job's handle together
-- with its @schedAt@.
reSchedJob :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> Job -> SchedT db tp m ()
reSchedJob taskList job = do
  mapM_ cancel =<< findTask taskList job

  window <- (+100) <$> pollInterval
  now <- getEpochTime
  let horizon = now + window
  when (getSchedAt job < horizon) $ do
    runnable <- canRun (getFuncName job)
    hasRoom <- ensureRoom taskList
    when (runnable && hasRoom) $ do
      task <- schedJob taskList job
      FL.insert taskList (getHandle job) (getSchedAt job, task)
  where
    -- Capacity check: there is room while the task list holds fewer than
    -- twice @sMaxBatchSize@ entries.  Otherwise the entry with the latest
    -- @schedAt@ is looked up; if it is scheduled strictly before this job
    -- the job is rejected, else that entry is cancelled and removed to
    -- make room.
    ensureRoom :: (MonadIO m) => TaskList -> SchedT db tp m Bool
    ensureRoom tl = do
      maxBatchSize <- readTVarIO =<< asks sMaxBatchSize
      size <- FL.size tl
      if size < maxBatchSize * 2
        then pure True
        else do
          latest <- findLastTask tl
          case latest of
            Nothing -> pure True
            Just (sc, jh, w)
              | sc < getSchedAt job -> pure False
              | otherwise -> do
                  cancel w
                  FL.delete taskList jh
                  pure True

-- | Look up the running task registered for a job's handle, dropping the
-- @schedAt@ stored alongside it.  Yields 'Nothing' when the handle is not
-- in the task list.
findTask :: MonadIO m => TaskList -> Job -> SchedT db tp m (Maybe (Async ()))
findTask taskList job = do
  entry <- FL.lookup taskList (getHandle job)
  pure (snd <$> entry)

-- | Find the task-list entry with the greatest stored @schedAt@, returned
-- as @(schedAt, handle, task)@.  Yields 'Nothing' for an empty task list.
-- The whole fold runs inside a single STM transaction.
findLastTask :: MonadIO m => TaskList -> SchedT db tp m (Maybe (Int64, JobHandle, Async ()))
findLastTask tl = atomically (FL.foldrWithKeySTM tl pickLater Nothing)
  where
    -- Keep the already accumulated entry unless the new one is scheduled
    -- strictly later; on a tie the accumulated entry wins.
    pickLater
      :: JobHandle
      -> (Int64, a)
      -> Maybe (Int64, JobHandle, a)
      -> Maybe (Int64, JobHandle, a)
    pickLater _ (sc, _) (Just kept@(sc1, _, _))
      | sc <= sc1 = Just kept
    pickLater jh (sc, t) _ = Just (sc, jh, t)

-- | Check whether jobs of the given function may currently run, using the
-- function stat list from the scheduler environment (see 'canRun_').
canRun :: MonadIO m => FuncName -> SchedT db tp m Bool
canRun fn = do
  stats <- asks sFuncStatList
  canRun_ stats fn

-- | A function can run when it has an entry in the stat list and that
-- entry's 'sWorker' count is not zero.  An unknown function cannot run.
canRun_ :: MonadIO m => FuncStatList -> FuncName -> m Bool
canRun_ stList fn = maybe False hasWorker <$> FL.lookup stList fn
  where
    hasWorker st = sWorker st /= 0

-- | Start 'schedJob_' for the job in a separate 'Async' and return its
-- handle so that the caller can store or cancel it.
schedJob
  :: (MonadUnliftIO m, Persist db, Transport tp)
  => TaskList -> Job -> SchedT db tp m (Async ())
schedJob taskList job = async (schedJob_ taskList job)

-- | Body of a scheduled task: wait until the job is due and hand it to a
-- worker.
--
-- Nothing happens when 'canRun' is False for the job's function.
-- Otherwise:
--
-- 1. If @schedAt@ is more than one time unit after "now", the thread is
--    delayed by @(schedAt - now) * 1000000@ (NOTE(review): epoch time is
--    presumably in seconds and the delay in microseconds - confirm).
-- 2. An STM transaction blocks (via 'retrySTM') until the function has a
--    stat entry whose 'sWorker' is not zero.
-- 3. Broadcast functions are sent to every agent returned by
--    'popAgentList'; other functions go to a single agent taken with
--    'popAgentSTM'.
schedJob_ :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> Job -> SchedT db tp m ()
schedJob_ taskList job = do
  stats <- asks sFuncStatList
  runnable <- canRun fn
  when runnable $ do
    now <- getEpochTime
    when (schedAt > now + 1) $
      threadDelay (fromIntegral ((schedAt - now) * 1000000))
    -- Block until the function is registered with at least one worker.
    FuncStat{sBroadcast = isBroadcast} <- atomically $ do
      st <- FL.lookupSTM stats fn
      case st of
        Just FuncStat{sWorker=0} -> retrySTM
        Just st'                 -> pure st'
        Nothing                  -> retrySTM
    if isBroadcast
      then popAgentListThen
      else popAgentThen taskList

  where
    fn      = getFuncName job
    jn      = getName job
    schedAt = getSchedAt job
    jh      = getHandle job

    -- Take one agent from the grab queue and submit the job to it.  A dead
    -- session makes the whole scheduling step start over.  For a live
    -- session the job handle is added to the agent's job list and the job
    -- is stored as 'Running' (with @schedAt@ set to the current time)
    -- before submission; a failed submission stores it as 'Pending' again,
    -- removes the handle from the agent's list and retries.
    popAgentThen
      :: (MonadUnliftIO m, Persist db, Transport tp) => TaskList -> SchedT db tp m ()
    popAgentThen tl = do
      SchedEnv{..} <- ask
      (jq, env0) <- atomically $ popAgentSTM sGrabQueue fn
      alive <- runSessionT1 env0 sessionState
      if not alive
        then schedJob_ tl job
        else do
          IL.insert jq jh
          nextSchedAt <- getEpochTime
          let stamped = setSchedAt nextSchedAt job
          liftIO $ P.insert sPersist Running fn jn stamped
          submitted <- doSubmitJob env0
          case submitted of
            Right _ -> endSchedJob
            Left _  -> do
              liftIO $ P.insert sPersist Pending fn jn stamped
              IL.delete jq jh
              schedJob_ tl job

    -- Submit the job to every agent returned by 'popAgentList', ignoring
    -- individual submission results.
    popAgentListThen :: (MonadUnliftIO m, Transport tp) => SchedT db tp m ()
    popAgentListThen = do
      SchedEnv{..} <- ask
      agents <- popAgentList sGrabQueue fn
      forM_ agents (doSubmitJob . snd)
      unless (null agents) endSchedJob -- wait to resched the broadcast job

    -- Assign the job to an agent, capturing any synchronous exception.
    doSubmitJob :: (MonadUnliftIO m, Transport tp) => CSEnv tp -> SchedT db tp m (Either SomeException ())
    doSubmitJob agent = tryAny (assignJob agent job)

    -- Notify the scheduler loop with a 'TryPoll' action for this handle.
    endSchedJob :: MonadIO m => SchedT db tp m ()
    endSchedJob = pushChanList (TryPoll jh)

-- | Recompute the counters of a function's 'FuncStat' from the persistence
-- store.
--
-- 'sJob' becomes the sum of the 'Pending', 'Running' and 'Locking' sizes,
-- 'sRunning' and 'sLocking' the respective sizes, and 'sSchedAt' the value
-- of 'P.minSchedAt' when that is positive, otherwise the current epoch
-- time.  A missing stat entry is created from 'funcStat'.
adjustFuncStat :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
adjustFuncStat fn = do
  SchedEnv{..} <- ask
  let countIn state = liftIO $ P.size sPersist state fn
  pendingCount <- countIn Pending
  runningCount <- countIn Running
  lockingCount <- countIn Locking
  earliest <- liftIO $ P.minSchedAt sPersist fn

  nextAt <- if earliest > 0 then pure earliest else getEpochTime

  let total = pendingCount + runningCount + lockingCount
  FL.alter sFuncStatList (refresh total runningCount lockingCount nextAt) fn

  where
    refresh :: Int64 -> Int64 -> Int64 -> Int64 -> Maybe FuncStat -> Maybe FuncStat
    refresh total running locking nextAt prev =
      let base = fromMaybe (funcStat fn) prev
      in Just base
           { sJob     = total
           , sRunning = running
           , sLocking = locking
           , sSchedAt = nextAt
           }

-- | Remove a job: log its handle, delete it from the persistence store,
-- push a 'Remove' action to the scheduler loop and publish an empty result
-- for its handle via 'pushResult'.
removeJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
removeJob job = do
  liftIO . infoM "Periodic.Server.Scheduler" $ "removeJob: " ++ show handle
  store <- asks sPersist
  liftIO (P.delete store funcName jobName)

  pushChanList (Remove job)
  pushResult handle ""
  where
    handle   = getHandle job
    funcName = getFuncName job
    jobName  = getName job

-- | Return the jobs reported by 'P.dumpJob' for the scheduler's
-- persistence store.
dumpJob :: (MonadIO m, Persist db) => SchedT db tp m [Job]
dumpJob = do
  store <- asks sPersist
  liftIO (P.dumpJob store)

-- | Apply an update function to a function's entry in the stat list,
-- record the function name in the persistence store and push a 'PollJob'
-- action to the scheduler loop.
alterFunc :: (MonadIO m, Persist db) => FuncName -> (Maybe FuncStat -> Maybe FuncStat) -> SchedT db tp m ()
alterFunc n f = do
  env <- ask
  FL.alter (sFuncStatList env) f n
  liftIO $ P.insertFuncName (sPersist env) n
  pushChanList PollJob

-- | Register a worker for a non-broadcast function; this is
-- 'broadcastFunc' with the broadcast flag set to 'False'.
addFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
addFunc = (`broadcastFunc` False)

-- | Register a worker for a function and set its broadcast flag.
--
-- A new stat entry starts with one worker; an existing entry has its
-- worker count incremented.  In both cases 'sBroadcast' is overwritten
-- with the given flag.  The log line is prefixed with @broadcastFunc@ or
-- @addFunc@ depending on the flag.
broadcastFunc :: (MonadIO m, Persist db) => FuncName -> Bool -> SchedT db tp m ()
broadcastFunc n cast = do
  liftIO . infoM "Periodic.Server.Scheduler" $ label ++ ": " ++ show n
  alterFunc n (Just . register)

  where
    register :: Maybe FuncStat -> FuncStat
    register prev = case prev of
      Nothing -> (funcStat n) { sWorker = 1, sBroadcast = cast }
      Just fs -> fs { sWorker = sWorker fs + 1, sBroadcast = cast }

    label :: String
    label
      | cast      = "broadcastFunc"
      | otherwise = "addFunc"

-- | Unregister one worker of a function.
--
-- An existing stat entry has its worker count decremented, never going
-- below zero; a missing entry is created from 'funcStat' unchanged.
removeFunc :: (MonadIO m, Persist db) => FuncName -> SchedT db tp m ()
removeFunc n = do
  liftIO . infoM "Periodic.Server.Scheduler" $ "removeFunc: " ++ show n
  alterFunc n (Just . maybe (funcStat n) release)

  where
    release :: FuncStat -> FuncStat
    release fs = fs { sWorker = max (sWorker fs - 1) 0 }

-- | Drop a function that has no workers left.
--
-- While holding the scheduler lock, the function's stat entry is looked
-- up; only when it exists with a worker count of zero is it deleted from
-- the stat list and its name removed from the persistence store.  A
-- 'PollJob' action is pushed afterwards in every case.
dropFunc :: (MonadUnliftIO m, Persist db) => FuncName -> SchedT db tp m ()
dropFunc n = do
  liftIO . infoM "Periodic.Server.Scheduler" $ "dropFunc: " ++ show n
  SchedEnv{..} <- ask
  L.with sLocker $ do
    st <- FL.lookup sFuncStatList n
    when (hasNoWorker st) $ do
      FL.delete sFuncStatList n
      liftIO $ P.removeFuncName sPersist n

  pushChanList PollJob
  where
    hasNoWorker :: Maybe FuncStat -> Bool
    hasNoWorker (Just FuncStat{sWorker=0}) = True
    hasNoWorker _                          = False

-- | Hand the given function list, handle list and client session
-- environment to 'pushAgent', using the scheduler's grab queue
-- ('sGrabQueue') as the target.
pushGrab :: MonadIO m => IOList FuncName -> IOList JobHandle -> CSEnv tp -> SchedT db tp m ()
pushGrab funcList handleList ag =
  asks sGrabQueue >>= \queue -> pushAgent queue funcList handleList ag

-- | Log the job's handle, then send a 'JobAssign' packet for @job@ (wrapped
-- with 'packetRES') inside the session described by @env0@.
assignJob :: (MonadUnliftIO m, Transport tp) => CSEnv tp -> Job -> m ()
assignJob env0 job = do
  liftIO . infoM "Periodic.Server.Scheduler" $ "assignJob: " ++ show (getHandle job)
  runSessionT1 env0 (send (packetRES (JobAssign job)))

-- | Handle a failed job identified by @jh@.
--
-- After logging, every lock the handle takes part in is released via
-- @releaseLock'@.  Then one of two things happens:
--
-- * If the handle is present in the wait list, it is removed from that list
--   and the job is completed through 'doneJob' with an empty result.
--
-- * Otherwise the job is looked up in the 'Running' state; when found it is
--   re-queued through 'retryJob' with its schedule time set to the current
--   epoch time.  When no such record exists nothing else is done.
failJob :: (MonadUnliftIO m, Persist db) => JobHandle -> SchedT db tp m ()
failJob jh = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("failJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting
    then do
      removeFromWaitList jh
      doneJob jh ""
    else do
      p <- asks sPersist
      running <- liftIO $ P.lookup p Running fn jn
      -- Iterate over the Maybe instead of pairing isJust with the partial
      -- fromJust: the body simply does not run when the job is missing.
      forM_ running $ \job -> do
        nextSchedAt <- getEpochTime
        retryJob (setSchedAt nextSchedAt job)
  where
    (fn, jn) = unHandle jh

-- | Store @job@ in the persistence backend under the 'Pending' state, keyed
-- by its function name and job name, then push an @Add job@ action onto the
-- scheduler's channel list.
retryJob :: (MonadIO m, Persist db) => Job -> SchedT db tp m ()
retryJob job = do
  persist <- asks sPersist
  liftIO $ P.insert persist Pending (getFuncName job) (getName job) job
  pushChanList (Add job)

-- | Complete the job identified by @jh@ with result payload @w@.
--
-- Steps, in order: log the handle, release the locks the handle takes part
-- in (@releaseLock'@), delete the job from the persistence backend, and push
-- the result with 'pushResult'.
doneJob
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> ByteString -> SchedT db tp m ()
doneJob jh w = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("doneJob: " ++ show jh)
  releaseLock' jh
  persist <- asks sPersist
  -- unHandle yields the (function name, job name) pair P.delete expects.
  liftIO $ uncurry (P.delete persist) (unHandle jh)
  pushResult jh w

-- | Re-schedule the job identified by @jh@ to run @later@ time units after
-- the current epoch time, adding @step@ to its run counter.
--
-- After logging, the locks the handle takes part in are released via
-- @releaseLock'@.  Then:
--
-- * If the handle is present in the wait list, it is removed from that list
--   and the job is completed through 'doneJob' with an empty result; no
--   re-scheduling takes place.
--
-- * Otherwise the job is looked up in the 'Running' state; when found it is
--   re-queued through 'retryJob' with schedule time @later + now@ and count
--   @getCount job + step@.  When no such record exists nothing else is done.
schedLaterJob
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> Int64 -> Int -> SchedT db tp m ()
schedLaterJob jh later step = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("schedLaterJob: " ++ show jh)
  releaseLock' jh
  isWaiting <- existsWaitList jh
  if isWaiting
    then do
      removeFromWaitList jh
      doneJob jh ""
    else do
      p <- asks sPersist
      running <- liftIO $ P.lookup p Running fn jn
      -- Iterate over the Maybe instead of pairing isJust with the partial
      -- fromJust: the body simply does not run when the job is missing.
      forM_ running $ \job -> do
        nextSchedAt <- (later +) <$> getEpochTime
        retryJob $ setCount (getCount job + step) $ setSchedAt nextSchedAt job
  where
    (fn, jn) = unHandle jh

-- | Try to take the named lock on behalf of job handle @jh@; the result is
-- 'True' when the job may proceed.
--
-- The whole operation runs under the scheduler lock ('sLocker').  If the job
-- has no record in the 'Running' state the answer is 'True' and the lock
-- table is not touched.  Otherwise the lock table is updated in a single STM
-- transaction (see @decide@ below); when the lock is refused, the job is
-- stored in the persistence backend under the 'Locking' state.
acquireLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> Int -> JobHandle -> SchedT db tp m Bool
acquireLock name count jh = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("acquireLock: " ++ show name ++ " " ++ show count ++ " " ++ show jh)
  locker <- asks sLocker
  L.with locker $ do
    lockList <- asks sLockList
    p <- asks sPersist
    running <- liftIO $ P.lookup p Running fn jn
    case running of
      Nothing  -> pure True
      Just job -> do
        granted <- atomically $ FL.lookupSTM lockList name >>= decide lockList
        unless granted $ liftIO $ P.insert p Locking fn jn job
        pure granted
  where
    (fn, jn) = unHandle jh

    -- First use of this lock name: create the entry with @jh@ as sole holder.
    decide lockList Nothing = do
      FL.insertSTM lockList name LockInfo
        { acquired = [jh]
        , locked   = []
        , maxCount = count
        }
      pure True
    -- Existing entry: a handle already holding the lock is granted again, a
    -- handle already waiting is refused again, and a new handle is appended
    -- to the holders if there is room, or to the waiters otherwise.
    -- NOTE(review): the capacity test uses the stored maxCount, while the
    -- entry is written back with newCount; a larger @count@ therefore only
    -- seems to take effect on a later call -- confirm this is intended.
    decide lockList (Just info@LockInfo {..})
      | jh `elem` acquired = pure True
      | jh `elem` locked   = pure False
      | length acquired < maxCount = do
          FL.insertSTM lockList name info
            { acquired = acquired ++ [jh]
            , maxCount = newCount
            }
          pure True
      | otherwise = do
          FL.insertSTM lockList name info
            { locked   = locked ++ [jh]
            , maxCount = newCount
            }
          pure False
      where
        newCount = max maxCount count

-- | Release the named lock held by @jh@: runs 'releaseLock_' while holding
-- the scheduler lock ('sLocker').
releaseLock
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock name jh =
  asks sLocker >>= \locker -> L.with locker (releaseLock_ name jh)

-- | Release the named lock held by @jh@.  This function does not take
-- 'sLocker' itself; 'releaseLock' wraps it for that purpose.
--
-- In one STM transaction: if the lock entry exists and @jh@ is among its
-- holders, @jh@ is removed from the holders and the first waiter (if any) is
-- popped from the waiting list.  In every other case the table is unchanged.
--
-- When a waiter was popped, its job is looked up in the 'Locking' state.  If
-- found, the job is stored as 'Pending' and an @Add@ action is pushed;
-- otherwise 'releaseLock_' is invoked again for the popped handle.
-- NOTE(review): the popped handle was never added to the holders, so that
-- recursive call looks like it only logs and promotes no further waiter --
-- confirm whether the next waiter should be tried instead.
releaseLock_
  :: (MonadUnliftIO m, Persist db)
  => LockName -> JobHandle -> SchedT db tp m ()
releaseLock_ name jh = do
  liftIO $ infoM "Periodic.Server.Scheduler" ("releaseLock: " ++ show name ++ " " ++ show jh)
  p <- asks sPersist
  lockList <- asks sLockList
  next <- atomically $ do
    found <- FL.lookupSTM lockList name
    case found of
      Just info@LockInfo {..} | jh `elem` acquired -> do
        let remaining = L.delete jh acquired
        case locked of
          [] -> do
            FL.insertSTM lockList name info { acquired = remaining }
            pure Nothing
          (x:xs) -> do
            FL.insertSTM lockList name info
              { acquired = remaining
              , locked   = xs
              }
            pure (Just x)
      _ -> pure Nothing

  forM_ next $ \hh -> do
    let (fn, jn) = unHandle hh
    waiting <- liftIO $ P.lookup p Locking fn jn
    case waiting of
      Nothing  -> releaseLock_ name hh
      Just job -> do
        liftIO $ P.insert p Pending fn jn job
        pushChanList (Add job)

-- | Call 'releaseLock' for @jh@ on every lock whose entry mentions @jh@,
-- either among its holders ('acquired') or among its waiters ('locked').
--
-- The lock names are collected in one STM transaction; the releases run
-- afterwards, one lock at a time.
releaseLock'
  :: (MonadUnliftIO m, Persist db)
  => JobHandle -> SchedT db tp m ()
releaseLock' jh = do
  lockList <- asks sLockList
  names <- atomically $ FL.foldrWithKeySTM lockList collect []
  mapM_ (\lockName -> releaseLock lockName jh) names
  where
    -- Keep the lock name when the handle appears in either list.
    collect :: LockName -> LockInfo -> [LockName] -> [LockName]
    collect n LockInfo {..} acc
      | jh `elem` acquired || jh `elem` locked = n : acc
      | otherwise                              = acc

-- | Count, across all entries of the lock table, the job handles selected by
-- @f@ (for example 'acquired' or 'locked') whose function name equals @fn@.
countLock :: MonadUnliftIO m => (LockInfo -> [JobHandle]) -> FuncName -> SchedT db tp m Int
countLock f fn = do
  lockList <- asks sLockList
  infos <- FL.elems lockList
  pure $ sum [ length (filter belongs (f info)) | info <- infos ]
  where
    -- A handle belongs to @fn@ when the first component of 'unHandle' matches.
    belongs :: JobHandle -> Bool
    belongs jh = fst (unHandle jh) == fn

-- | The largest 'maxCount' over all entries of the lock table.
--
-- Returns 0 when the lock table is empty.  The previous implementation
-- applied 'maximum' directly, which throws on an empty list.
getMaxLockCount :: MonadUnliftIO m => SchedT db tp m Int
getMaxLockCount = do
  lockList <- asks sLockList
  counts <- map maxCount <$> FL.elems lockList
  pure $ if null counts then 0 else maximum counts


-- | Return all 'FuncStat' values currently held in 'sFuncStatList'.
--
-- Before reading them, 'adjustFuncStat' is run for every function name
-- reported by the persistence backend ('P.funcList').
status :: (MonadIO m, Persist db) => SchedT db tp m [FuncStat]
status = do
  persist <- asks sPersist
  funcs <- liftIO (P.funcList persist)
  mapM_ adjustFuncStat funcs
  asks sFuncStatList >>= FL.elems

-- | Fail every job in the 'Running' state whose deadline has passed.
--
-- A job's deadline is its schedule time plus a timeout: the job's own
-- 'getTimeout' when that is positive, otherwise the value currently stored
-- in 'sTaskTimeout'.  Jobs whose deadline is strictly before the current
-- epoch time are collected with 'P.foldr' and then passed, by handle, to
-- 'failJob'.
revertRunningQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertRunningQueue = do
  now <- getEpochTime
  timeoutVar <- asks sTaskTimeout
  fallback <- fromIntegral <$> readTVarIO timeoutVar
  p <- asks sPersist
  expired <- liftIO $ P.foldr p Running (keepIf (isExpired now fallback)) []
  mapM_ (failJob . getHandle) expired
  where
    -- Fold step: prepend the job only when the predicate holds.
    keepIf :: (Job -> Bool) -> Job -> [Job] -> [Job]
    keepIf wanted job acc = if wanted job then job : acc else acc

    -- True when schedule time + effective timeout lies before @now@.
    isExpired :: Int64 -> Int64 -> Job -> Bool
    isExpired now fallback job = getSchedAt job + limit < now
      where
        own   = getTimeout job
        limit = if own > 0 then fromIntegral own else fallback

-- | For every function name reported by the persistence layer
-- ('P.funcList'), log its lock counters and, when jobs are counted as
-- locked while none are counted as acquired, re-push the jobs returned by
-- 'P.foldrLocking' through 'pushJob'.
revertLockingQueue :: (MonadUnliftIO m, Persist db) => SchedT db tp m ()
revertLockingQueue =
  asks sPersist >>= liftIO . P.funcList >>= mapM_ checkAndReleaseLock

  where checkAndReleaseLock
          :: (MonadUnliftIO m, Persist db)
          => FuncName -> SchedT db tp m ()
        checkAndReleaseLock fn = do
          sizeLocked   <- countLock locked fn
          sizeAcquired <- countLock acquired fn
          -- The logger name previously read "Peridic.Server.Scheduler";
          -- it now matches the name used by the rest of this module.
          liftIO $ infoM "Periodic.Server.Scheduler"
                 $ "LockInfo " ++ show fn
                 ++ " Locked:" ++ show sizeLocked
                 ++ " Acquired:" ++ show sizeAcquired
          -- Only act when something is locked but nothing is acquired.
          when (sizeLocked > 0 && sizeAcquired == 0) $ do
            p       <- asks sPersist
            count   <- getMaxLockCount
            -- Collect up to 'count' jobs from the locking queue of 'fn'
            -- (NOTE(review): exact limit semantics are defined by
            -- 'P.foldrLocking' — confirm there).
            handles <- liftIO $ P.foldrLocking p count fn (:) []
            mapM_ pushJob handles


-- | Drop stale entries from the wait list.
--
-- An entry is stale when its 'itemTs' is strictly less than the current
-- epoch time minus the value stored in 'sExpiration'. Collecting the stale
-- keys and deleting them happen inside a single STM transaction.
purgeExpired :: MonadIO m => SchedT db tp m ()
purgeExpired = do
  now <- getEpochTime
  wl <- asks sWaitList
  ex <- fmap fromIntegral . readTVarIO =<< asks sExpiration
  let cutoff = now - ex
  atomically $ do
    expired <- FL.foldrWithKeySTM wl (collectExpired cutoff) []
    mapM_ (FL.deleteSTM wl) expired

  where -- Prepend the handle when the item's timestamp is before the cutoff.
        collectExpired
          :: Int64 -> JobHandle -> WaitItem -> [JobHandle] -> [JobHandle]
        collectExpired cutoff jh item acc
          | itemTs item < cutoff = jh : acc
          | otherwise            = acc

-- | Shut the scheduler down.
--
-- Logs the shutdown, pushes a 'Cancel' action onto the channel list and
-- atomically flips 'sAlive' to 'False'. The 'sCleanup' action is launched
-- asynchronously only if 'sAlive' was still 'True' beforehand, so repeated
-- calls do not start the cleanup again.
shutdown :: (MonadUnliftIO m) => SchedT db tp m ()
shutdown = do
  liftIO $ infoM "Periodic.Server.Scheduler" "Scheduler shutdown"
  SchedEnv{..} <- ask
  pushChanList Cancel
  -- Read the previous liveness flag and clear it in one transaction.
  wasAlive <- atomically $ do
    prev <- readTVar sAlive
    writeTVar sAlive False
    return prev
  when wasAlive . void . async $ liftIO sCleanup

-- | Register one more waiter for the result of the given job.
--
-- If the job's handle has no wait-list entry yet, a fresh 'WaitItem' is
-- created with no value and a wait count of 1. Otherwise the existing
-- entry's timestamp is refreshed and its wait count is incremented.
prepareWait :: MonadIO m => Job -> SchedT db tp m ()
prepareWait job = pushResult_ updateWL jh
  where updateWL :: Int64 -> Maybe WaitItem -> Maybe WaitItem
        updateWL now Nothing =
          Just WaitItem {itemTs = now, itemValue = Nothing, itemWait = 1}
        updateWL now (Just item) =
          Just item {itemTs = now, itemWait = itemWait item + 1}

        jh = getHandle job

-- | Wait (inside one STM transaction) for the result of a job.
--
-- * When the @state@ flag is 'False', returns the empty string at once.
-- * When the job's handle has no wait-list entry, returns the empty string.
-- * When the entry exists but carries no value yet, the transaction
--   retries via 'retrySTM'.
-- * When a value is present it is returned; the entry's wait count is
--   decremented if it is greater than 1, otherwise the entry is deleted.
waitResult :: MonadIO m => TVar Bool -> Job -> SchedT db tp m ByteString
waitResult state job = do
  wl <- asks sWaitList
  atomically $ do
    st <- readTVar state
    if st
      then do
        w0 <- FL.lookupSTM wl jh
        case w0 of
          Nothing   -> pure ""
          Just item ->
            case itemValue item of
              Nothing -> retrySTM
              Just w1 -> do
                -- Other waiters remain: keep the entry, one waiter fewer.
                if itemWait item > 1
                  then FL.insertSTM wl jh item { itemWait = itemWait item - 1 }
                  else FL.deleteSTM wl jh
                pure w1
      else pure ""

  where jh = getHandle job

-- | Store a result for the given job handle in the wait list.
--
-- Only an existing entry is updated (its timestamp is refreshed and its
-- value set to the given bytes); when the handle has no entry, nothing is
-- inserted.
pushResult
  :: MonadIO m
  => JobHandle -> ByteString -> SchedT db tp m ()
pushResult jh w = pushResult_ updateWL jh
  where updateWL :: Int64 -> Maybe WaitItem -> Maybe WaitItem
        updateWL _ Nothing       = Nothing
        updateWL now (Just item) = Just item {itemTs = now, itemValue = Just w}

-- | Apply an update function to the wait-list entry of a job handle.
--
-- The function receives the current epoch time (from 'getEpochTime') and
-- the present entry, if any; its result is written back with 'FL.alter'.
pushResult_
  :: MonadIO m
  => (Int64 -> Maybe WaitItem -> Maybe WaitItem)
  -> JobHandle -> SchedT db tp m ()
pushResult_ f jh = do
  wl <- asks sWaitList
  now <- getEpochTime
  FL.alter wl (f now) jh

-- | Report whether the wait list currently holds an entry for the handle.
existsWaitList :: MonadIO m => JobHandle -> SchedT db tp m Bool
existsWaitList jh = do
  wl <- asks sWaitList
  isJust <$> FL.lookup wl jh

-- | Look up a result already stored for the given job.
--
-- Returns 'Nothing' both when the job's handle has no wait-list entry and
-- when the entry exists but holds no value; otherwise returns the value.
lookupPrevResult :: MonadIO m => Job -> SchedT db tp m (Maybe ByteString)
lookupPrevResult job = do
  wl <- asks sWaitList
  -- Missing entry and entry-without-value both collapse to 'Nothing'.
  (>>= itemValue) <$> FL.lookup wl (getHandle job)

-- | Delete the wait-list entry for the given job handle via 'FL.delete'.
removeFromWaitList :: MonadIO m => JobHandle -> SchedT db tp m ()
removeFromWaitList jh = do
  wl <- asks sWaitList
  FL.delete wl jh