{-# LANGUAGE RankNTypes, FlexibleInstances, FlexibleContexts, PartialTypeSignatures, TupleSections, DeriveGeneric, UndecidableInstances #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
module OddJobs.Job
(
startJobRunner
, Config(..)
, ConcurrencyControl(..)
, createJob
, scheduleJob
, Job(..)
, JobId
, Status(..)
, JobRunnerName(..)
, TableName
, delaySeconds
, Seconds(..)
, JobErrHandler(..)
, AllJobTypes(..)
, LogEvent(..)
, LogLevel(..)
, jobMonitor
, jobEventListener
, jobPoller
, jobPollingSql
, JobRunner
, HasJobRunner (..)
, findJobByIdIO
, saveJobIO
, runJobNowIO
, unlockJobIO
, cancelJobIO
, jobDbColumns
, concatJobDbColumns
, fetchAllJobTypes
, fetchAllJobRunners
, eitherParsePayload
, throwParsePayload
, eitherParsePayloadWith
, throwParsePayloadWith
)
where
import OddJobs.Types
import Data.Pool
import Data.Text as T
import Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.Notification
import UnliftIO.Async
import UnliftIO.Concurrent (threadDelay, myThreadId)
import Data.String
import System.Posix.Process (getProcessID)
import Network.HostName (getHostName)
import UnliftIO.MVar
import Debug.Trace
import Control.Monad.Logger as MLogger (LogLevel(..), LogStr, toLogStr)
import UnliftIO.IORef
import UnliftIO.Exception ( SomeException(..), try, catch, finally
, catchAny, bracket, Exception(..), throwIO
, catches, Handler(..), mask_, throwString
)
import Data.Proxy
import Control.Monad.Trans.Control
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO, liftIO)
import Data.Text.Conversions
import Data.Time
import Data.Aeson hiding (Success)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe)
import Data.String.Conv (StringConv(..), toS)
import Data.Functor (void)
import Control.Monad (forever)
import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe)
import Data.Either (either)
import Control.Monad.Reader
import GHC.Generics
import qualified Data.HashMap.Strict as HM
import qualified Data.List as DL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import System.FilePath (FilePath)
import qualified System.Directory as Dir
import Data.Aeson.Internal (iparse, IResult(..), formatError)
import Prelude hiding (log)
import GHC.Exts (toList)
import Database.PostgreSQL.Simple.Types as PGS (Identifier(..))
import Database.PostgreSQL.Simple.ToField as PGS (toField)
-- | Capabilities that the job-runner functions in this module require from
-- their monad: access to configuration values, the DB pool, lifecycle
-- callbacks and a logger.
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
  -- | How long 'jobPoller' waits before polling again after it finds no job.
  getPollingInterval :: m Seconds
  -- | Callback invoked by 'runJob' after a job has finished and its row has
  -- been deleted.
  onJobSuccess :: Job -> m ()
  -- | Handlers that 'runJob' consults, in order, after a job has failed.
  onJobFailed :: forall a . m [JobErrHandler a]
  -- | The function that actually executes a job's work.
  getJobRunner :: m (Job -> IO ())
  -- | Pool from which every DB connection in this module is drawn.
  getDbPool :: m (Pool Connection)
  -- | Name of the table that holds the jobs.
  getTableName :: m TableName
  -- | Callback for a job starting.
  -- NOTE(review): not invoked anywhere in the visible part of this module —
  -- confirm where it is called.
  onJobStart :: Job -> m ()
  -- | Attempt count at which 'runJob' marks a failing job as permanently
  -- 'Failed' instead of scheduling a retry.
  getDefaultMaxAttempts :: m Int
  -- | The runner's environment, including the list of in-flight job threads.
  getRunnerEnv :: m RunnerEnv
  -- | Policy used to decide whether another job may be picked up right now.
  getConcurrencyControl :: m ConcurrencyControl
  -- | Optional PID file; 'jobMonitor' removes it while shutting down.
  getPidFile :: m (Maybe FilePath)
  -- | Emit a structured log event at the given level.
  log :: LogLevel -> LogEvent -> m ()
  -- | Timeout applied to each job run; 'jobPoller' also uses it to decide
  -- when a 'Locked' job is stale enough to be picked up again.
  getDefaultJobTimeout :: m Seconds
-- | Environment threaded through the runner via 'RunnerM'.
data RunnerEnv = RunnerEnv
  { envConfig :: !Config
    -- ^ User-supplied configuration, as passed to 'startJobRunner'.
  , envJobThreadsRef :: !(IORef [Async ()])
    -- ^ Job threads currently in flight; 'runJobWithTimeout' adds and removes
    -- entries and 'waitForJobs' drains it on shutdown.
  }
-- | The concrete monad used by 'startJobRunner': a reader over 'RunnerEnv'
-- on top of 'IO'.
type RunnerM = ReaderT RunnerEnv IO
-- | Run a callback and, if 'catchAny' intercepts an exception from it, write
-- a 'LevelError' log entry (prefixed with the given label and the job id)
-- instead of letting the exception propagate.
logCallbackErrors :: (HasJobRunner m) => JobId -> Text -> m () -> m ()
logCallbackErrors jid msg action = action `catchAny` reportFailure
  where
    reportFailure err =
      log LevelError . LogText $
        msg <> " Job ID=" <> toS (show jid) <> ": " <> toS (show err)
-- | Every capability is read from the 'Config' stored in the 'RunnerEnv'.
-- The success/start callbacks are wrapped in 'logCallbackErrors', so an
-- exception caught there is logged rather than propagated.
instance HasJobRunner RunnerM where
  -- Plain configuration look-ups.
  getPollingInterval = asks (cfgPollingInterval . envConfig)
  getDefaultMaxAttempts = asks (cfgDefaultMaxAttempts . envConfig)
  getDefaultJobTimeout = asks (cfgDefaultJobTimeout . envConfig)
  getConcurrencyControl = asks (cfgConcurrencyControl . envConfig)
  getPidFile = asks (cfgPidFile . envConfig)
  getTableName = asks (cfgTableName . envConfig)
  getDbPool = asks (cfgDbPool . envConfig)
  getJobRunner = asks (cfgJobRunner . envConfig)
  getRunnerEnv = ask

  -- Lifecycle callbacks.
  onJobFailed = asks (cfgOnJobFailed . envConfig)
  onJobStart job = do
    callback <- asks (cfgOnJobStart . envConfig)
    logCallbackErrors (jobId job) "onJobStart" (liftIO (callback job))
  onJobSuccess job = do
    callback <- asks (cfgOnJobSuccess . envConfig)
    logCallbackErrors (jobId job) "onJobSuccess" (liftIO (callback job))

  -- Logging is delegated to the configured logger function.
  log logLevel logEvent = do
    loggerFn <- asks (cfgLogger . envConfig)
    liftIO (loggerFn logLevel logEvent)
-- | Entry point: build a fresh 'RunnerEnv' (with an empty list of job
-- threads) around the given 'Config' and run 'jobMonitor' in it.
startJobRunner :: Config -> IO ()
startJobRunner jm = do
  threadsRef <- newIORef []
  runReaderT jobMonitor RunnerEnv
    { envConfig = jm
    , envJobThreadsRef = threadsRef
    }
-- | Identifier for this worker process, formatted as @hostname:pid@.
jobWorkerName :: IO String
jobWorkerName = render <$> getProcessID <*> getHostName
  where
    render pid hname = hname ++ ":" ++ show pid
-- | Column names of the jobs table, in the order used by
-- 'concatJobDbColumns' and therefore by every @SELECT@ / @RETURNING@ clause
-- in this module that is built from it.
jobDbColumns :: (IsString s, Semigroup s) => [s]
jobDbColumns =
  [ "id"
  , "created_at"
  , "updated_at"
  , "run_at"
  , "status"
  , "payload"
  , "last_error"
  , "attempts"
  , "locked_at"
  , "locked_by"
  ]
-- | All of 'jobDbColumns' joined with @", "@, ready to be spliced into a
-- column list in SQL.
concatJobDbColumns :: (IsString s, Semigroup s) => s
concatJobDbColumns = go "" jobDbColumns
  where
    -- Left-to-right accumulation; the last column gets no trailing separator.
    go acc columns = case columns of
      [] -> acc
      [lastCol] -> acc <> lastCol
      (col : rest) -> go (acc <> col <> ", ") rest
-- | SQL that selects every job column for one job id (single @?@ parameter).
findJobByIdQuery :: TableName -> PGS.Query
findJobByIdQuery tname =
  mconcat
    [ "SELECT "
    , concatJobDbColumns
    , " FROM "
    , tname
    , " WHERE id = ?"
    ]
-- | Run an action with a connection taken from the runner's DB pool.
withDbConnection :: (HasJobRunner m)
                 => (Connection -> m a)
                 -> m a
withDbConnection action =
  getDbPool >>= \pool -> withResource pool action
-- | Look a job up by id in the configured table, using a pooled connection.
findJobById :: (HasJobRunner m)
            => JobId
            -> m (Maybe Job)
findJobById jid =
  getTableName >>= \tname ->
    withDbConnection (\conn -> liftIO (findJobByIdIO conn tname jid))
-- | Fetch a job by id. Returns 'Nothing' when no row matches and calls
-- 'error' if more than one row comes back.
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
findJobByIdIO conn tname jid = do
  rows <- PGS.query conn (findJobByIdQuery tname) (Only jid)
  case rows of
    [] -> pure Nothing
    [job] -> pure (Just job)
    _ -> Prelude.error $ "Not expecting to find multiple jobs by id=" <> (show jid)
-- | SQL that overwrites the mutable columns of one job (eight @?@
-- parameters, the id last) and returns the updated row.
saveJobQuery :: TableName -> PGS.Query
saveJobQuery tname =
  mconcat
    [ "UPDATE "
    , tname
    , " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING "
    , concatJobDbColumns
    ]
-- | SQL that deletes one job by id (single @?@ parameter).
deleteJobQuery :: TableName -> PGS.Query
deleteJobQuery tname = mconcat ["DELETE FROM ", tname, " WHERE id = ?"]
-- | Persist a job to the configured table via 'saveJobIO', returning the row
-- as stored.
saveJob :: (HasJobRunner m) => Job -> m Job
saveJob j =
  getTableName >>= \tname ->
    withDbConnection (\conn -> liftIO (saveJobIO conn tname j))
-- | Write a job's mutable fields back to its row and return the updated
-- row. Calls 'error' when the update touches zero rows or more than one.
saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname job =
  PGS.query conn (saveJobQuery tname) params >>= \case
    [] -> Prelude.error $ "Could not find job while updating it id=" <> (show jid)
    [saved] -> pure saved
    _ -> Prelude.error $ "Not expecting multiple rows to ber returned when updating job id=" <> (show jid)
  where
    jid = jobId job
    -- Order must match the placeholders in 'saveJobQuery'.
    params =
      ( jobRunAt job
      , jobStatus job
      , jobPayload job
      , jobLastError job
      , jobAttempts job
      , jobLockedAt job
      , jobLockedBy job
      , jid
      )
-- | Delete a job from the configured table via 'deleteJobIO'.
deleteJob :: (HasJobRunner m) => JobId -> m ()
deleteJob jid =
  getTableName >>= \tname ->
    withDbConnection (\conn -> liftIO (deleteJobIO conn tname jid))
-- | Delete the row with the given job id; the affected-row count is ignored.
deleteJobIO :: Connection -> TableName -> JobId -> IO ()
deleteJobIO conn tname jid =
  void (PGS.execute conn (deleteJobQuery tname) (Only jid))
-- | Re-queue a job to run at the current time. Only applies to jobs that are
-- currently 'Queued', 'Retry' or 'Failed'; returns the updated job, or
-- 'Nothing' if no row matched.
runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
runJobNowIO conn tname jid =
  getCurrentTime >>= \now ->
    updateJobHelper tname conn (Queued, [Queued, Retry, Failed], Just now, jid)
-- | Release a 'Locked' job: set it to 'Retry', schedule it for @now()@ and
-- clear its lock columns. Returns the updated job, or 'Nothing' if the job
-- was not in the 'Locked' state.
unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
unlockJobIO conn tname jid =
  listToMaybe <$> PGS.query conn unlockSql (Retry, jid, In [Locked])
  where
    unlockSql =
      mconcat
        [ "update "
        , tname
        , " set status=?, run_at=now(), locked_at=null, locked_by=null where id=? and status in ? returning "
        , concatJobDbColumns
        ]
-- | Cancel a pending job by marking it 'Failed', leaving @run_at@ untouched.
-- Only applies to jobs that are 'Queued' or 'Retry'.
cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
cancelJobIO conn tname jid = updateJobHelper tname conn transition
  where
    -- (new status, statuses it may move from, new run_at, job id)
    transition = (Failed, [Queued, Retry], Nothing, jid)
-- | Shared status-transition update: resets @attempts@ to 0, sets the new
-- status and @run_at@, but only if the job is currently in one of the given
-- states. Returns the updated job, or 'Nothing' if no row matched.
updateJobHelper :: TableName
                -> Connection
                -> (Status, [Status], Maybe UTCTime, JobId)
                -> IO (Maybe Job)
updateJobHelper tname conn (newStatus, existingStates, mRunAt, jid) =
  listToMaybe <$> PGS.query conn updateSql (newStatus, runAtField, jid, PGS.In existingStates)
  where
    updateSql =
      mconcat
        [ "update "
        , tname
        , " set attempts=0, status=?, run_at=? where id=? and status in ? returning "
        , concatJobDbColumns
        ]
    -- With no new time, substitute the column's own identifier so the
    -- statement assigns run_at to itself.
    runAtField =
      maybe (PGS.toField $ PGS.Identifier "run_at") PGS.toField mRunAt
-- | Thrown by the watchdog thread in 'runJobWithTimeout' once a job has run
-- longer than its allowed time.
data TimeoutException = TimeoutException deriving (Eq, Show)
instance Exception TimeoutException
-- | Run a job in its own thread alongside a watchdog thread. The watchdog
-- sleeps for the timeout, cancels the job thread and throws
-- 'TimeoutException'. The job thread is recorded in the runner's thread list
-- while it runs and removed again however this function exits.
runJobWithTimeout :: (HasJobRunner m)
                  => Seconds
                  -> Job
                  -> m ()
runJobWithTimeout timeoutSec job = do
  threadsRef <- envJobThreadsRef <$> getRunnerEnv
  jobRunner_ <- getJobRunner
  jobThread <- async (liftIO (jobRunner_ job))
  -- Register the job thread so that 'waitForJobs' can see it.
  _ <- atomicModifyIORef' threadsRef $ \threads ->
    (jobThread : threads, DL.map asyncThreadId (jobThread : threads))
  log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId jobThread)
  watchdog <- async $
    delaySeconds timeoutSec
      >> uninterruptibleCancel jobThread
      >> throwIO TimeoutException
  let unregister =
        atomicModifyIORef' threadsRef $ \threads -> (DL.delete jobThread threads, ())
  void $ waitEitherCancel jobThread watchdog `finally` unregister
-- | Load a job by id and execute it under the default job timeout.
--
-- On success the job row is deleted, the success is logged and
-- 'onJobSuccess' is called. On a timeout or any other exception caught by
-- 'catches', the job is saved back as 'Retry' or 'Failed' (depending on its
-- attempt count), the failure is logged and the 'onJobFailed' handlers are
-- consulted. Calls 'error' if the job cannot be found.
runJob :: (HasJobRunner m) => JobId -> m ()
runJob jid =
  findJobById jid >>= \case
    Nothing -> Prelude.error $ "Could not find job id=" <> show jid
    Just job -> do
      startedAt <- liftIO getCurrentTime
      jobTimeout <- getDefaultJobTimeout
      attempt job startedAt jobTimeout `catches`
        [ Handler $ timeoutHandler job startedAt
        , Handler $ exceptionHandler job startedAt
        ]
  where
    -- The happy path: run, delete the row, then log and notify.
    attempt job startedAt jobTimeout = do
      runJobWithTimeout jobTimeout job
      finishedAt <- liftIO getCurrentTime
      deleteJob jid
      let doneJob = job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing}
      log LevelInfo $ LogJobSuccess doneJob (diffUTCTime finishedAt startedAt)
      onJobSuccess doneJob

    timeoutHandler job startedAt (e :: TimeoutException) =
      retryOrFail (toException e) job startedAt
    exceptionHandler job startedAt (e :: SomeException) =
      retryOrFail (toException e) job startedAt

    retryOrFail e job@Job{jobAttempts} startedAt = do
      failedAt <- liftIO getCurrentTime
      maxAttempts <- getDefaultMaxAttempts
      let runTime = diffUTCTime failedAt startedAt
          (newStatus, failureMode, logLevel)
            | jobAttempts >= maxAttempts = ( Failed, FailPermanent, LevelError )
            | otherwise = ( Retry, FailWithRetry, LevelWarn )
      now <- liftIO getCurrentTime
      -- The next run is pushed out by 2^attempts seconds.
      let backoff = fromIntegral $ (2::Int) ^ jobAttempts
      savedJob <- saveJob job{ jobStatus=newStatus
                             , jobLockedBy=Nothing
                             , jobLockedAt=Nothing
                             , jobLastError=(Just $ toJSON $ show e)
                             , jobRunAt=(addUTCTime backoff now)
                             }
      case fromException e :: Maybe TimeoutException of
        Just _ -> log logLevel $ LogJobTimeout savedJob
        Nothing -> log logLevel $ LogJobFailed savedJob e failureMode runTime
      -- The first handler whose exception type matches is run; if none
      -- matches, the original exception is re-thrown.
      let tryHandler (JobErrHandler handler) fallback = case fromException e of
            Nothing -> fallback
            Just e_ -> handler e_ savedJob failureMode
      handlers <- onJobFailed
      liftIO $ void $ Prelude.foldr tryHandler (throwIO e) handlers
-- | Supervise an action: run it in a thread and, whenever it ends (with an
-- exception or with a result), log that and start it again. The finaliser
-- logs a shutdown message and cancels the thread.
restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m ()
restartUponCrash name_ action = do
  worker <- async action
  (waitCatch worker >>= logAndRestart) `finally` shutdown worker
  where
    shutdown worker = do
      log LevelInfo $ LogText $ "Received shutdown: " <> toS name_
      cancel worker

    logAndRestart outcome = do
      log LevelError $ LogText $ case outcome of
        Left (e :: SomeException) ->
          name_ <> " seems to have exited with an error. Restarting: " <> toS (show e)
        Right r ->
          name_ <> " seems to have exited with the folloing result: " <> toS (show r) <> ". Restaring."
      restartUponCrash name_ action
-- | Top-level supervisor: start the poller and the event listener (each
-- under 'restartUponCrash') and block until one of them ends. The finaliser
-- cancels both, waits for in-flight jobs to finish and removes the PID file
-- if one is configured.
jobMonitor :: forall m . (HasJobRunner m) => m ()
jobMonitor = do
  pollerThread <- async (restartUponCrash "Job poller" jobPoller)
  listenerThread <- async (restartUponCrash "Job event listener" jobEventListener)
  void (waitAnyCatch [pollerThread, listenerThread]) `finally` do
    log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.")
    cancel listenerThread
    cancel pollerThread
    log LevelInfo (LogText "Waiting for jobs to complete.")
    waitForJobs
    getPidFile >>= maybe (pure ()) removePidFile
  where
    removePidFile f = do
      log LevelInfo $ LogText $ "Removing PID file: " <> toS f
      liftIO $ Dir.removePathForcibly f
-- | SQL used by 'jobPoller' to lock the next runnable job. It updates (and
-- bumps the attempt count of) at most one row, chosen by earliest @run_at@
-- among rows that are due and either in one of the given statuses or in the
-- given status with a @locked_at@ older than the cut-off. Returns the id.
jobPollingSql :: TableName -> Query
jobPollingSql tname =
  mconcat
    [ "update "
    , tname
    , " set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 WHERE id in (select id from "
    , tname
    , " where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) ORDER BY run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"
    ]
-- | Block until the runner's list of job threads is empty, re-checking after
-- any thread finishes plus a one-second pause.
waitForJobs :: (HasJobRunner m)
            => m ()
waitForJobs = do
  threadsRef <- envJobThreadsRef <$> getRunnerEnv
  pending <- readIORef threadsRef
  case pending of
    [] -> log LevelInfo $ LogText "All job-threads exited"
    _ -> do
      tid <- myThreadId
      void $ waitAnyCatch pending
      log LevelDebug $ LogText $ toS $ "Waiting for " <> show (DL.length pending) <> " jobs to complete before shutting down. myThreadId=" <> (show tid)
      delaySeconds (Seconds 1)
      waitForJobs
-- | Turn the configured 'ConcurrencyControl' into an action that answers
-- "may another job be started right now?".
getConcurrencyControlFn :: (HasJobRunner m)
                        => m (m Bool)
getConcurrencyControlFn = mayStartJob <$> getConcurrencyControl
  where
    mayStartJob policy = case policy of
      UnlimitedConcurrentJobs -> pure True
      -- Compare the number of tracked job threads against the cap.
      MaxConcurrentJobs maxJobs -> do
        runningJobs <- getRunnerEnv >>= readIORef . envJobThreadsRef
        pure (DL.length runningJobs < maxJobs)
      DynamicConcurrency fn -> liftIO fn
-- | Poll the jobs table forever on a single pooled connection, locking and
-- forking at most one runnable job per iteration.
--
-- Each iteration first asks the concurrency-control function whether
-- another job may be started:
--
-- * If not, a warning is logged and the poller waits one polling interval
--   before checking again. Without that wait the loop would spin and flood
--   the log for as long as the limit stays reached.
-- * If so, 'jobPollingSql' is run to lock the next due job. When a job is
--   found it is forked via 'runJob' and the next poll happens immediately;
--   when none is found the poller waits one polling interval.
--
-- Calls 'error' if the polling query returns more than one row.
jobPoller :: (HasJobRunner m) => m ()
jobPoller = do
  processName <- liftIO jobWorkerName
  pool <- getDbPool
  tname <- getTableName
  lockTimeout <- getDefaultJobTimeout
  log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName
  concurrencyControlFn <- getConcurrencyControlFn
  withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn >>= \case
    False -> do
      log LevelWarn $ LogText $ "NOT polling the job queue due to concurrency control"
      -- Back off instead of re-checking in a tight loop.
      delayAction
    True -> do
      -- Locking the row and forking its thread happen under 'mask_'; the
      -- delay (if any) is returned and run outside the masked section.
      -- NOTE(review): threads forked while masked normally inherit the
      -- masking state; confirm that running 'runJob' masked is intended.
      nextAction <- mask_ $ do
        log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.."
        t <- liftIO getCurrentTime
        -- A 'Locked' job counts as stale once its lock is older than the
        -- default job timeout.
        let staleBefore = addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t
        r <- liftIO $
          PGS.query pollerDbConn (jobPollingSql tname)
            (Locked, t, processName, t, (In [Queued, Retry]), Locked, staleBefore)
        case r of
          [] -> pure delayAction
          [Only (jid :: JobId)] -> do
            void $ async $ runJob jid
            pure noDelayAction
          x -> error $ "WTF just happened? I was supposed to get only a single row, but got: " ++ (show x)
      nextAction
  where
    delayAction = delaySeconds =<< getPollingInterval
    noDelayAction = pure ()
-- | Listen for Postgres notifications about jobs and start any job that is
-- due immediately.
--
-- A single pooled connection issues @LISTEN@ on the table's event channel
-- and then loops forever. Each notification payload is decoded as JSON with
-- @id@, @run_at@ and optional @locked_at@ fields. If the job is due and not
-- locked, a thread is forked that tries to lock the row and, on success,
-- runs it via 'runJob'. Notifications that arrive while concurrency control
-- says no are logged and dropped.
jobEventListener :: (HasJobRunner m)
                 => m ()
jobEventListener = do
  log LevelInfo $ LogText "Starting the job monitor via LISTEN/NOTIFY..."
  pool <- getDbPool
  tname <- getTableName
  jwName <- liftIO jobWorkerName
  concurrencyControlFn <- getConcurrencyControlFn
  -- Try to claim the job for this worker. The UPDATE only matches while the
  -- job is still Queued/Retry, so an empty result means it was not claimed.
  let tryLockingJob jid = do
        let q = "UPDATE " <> tname <> " SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id"
        (withDbConnection $ \conn -> (liftIO $ PGS.query conn q (Locked, jwName, jid, In [Queued, Retry]))) >>= \case
          [] -> do
            log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid
            pure Nothing
          [Only (_ :: JobId)] -> pure $ Just jid
          x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x)
  -- This connection is held for the lifetime of the listener; the locking
  -- query above takes a separate connection from the pool.
  withResource pool $ \monitorDbConn -> do
    void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tname) ()
    forever $ do
      log LevelDebug $ LogText "[LISTEN/NOFIFY] Event loop"
      notif <- liftIO $ getNotification monitorDbConn
      concurrencyControlFn >>= \case
        False -> log LevelWarn $ LogText "Received job event, but ignoring it due to concurrency control"
        True -> do
          let pload = notificationData notif
          log LevelDebug $ LogText $ toS $ "NOTIFY | " <> show pload
          -- Two-stage decode: bytes to a JSON 'Value', then 'parser' to
          -- extract the fields. Failures are logged and the event skipped.
          case (eitherDecode $ toS pload) of
            Left e -> log LevelError $ LogText $ toS $ "Unable to decode notification payload received from Postgres. Payload=" <> show pload <> " Error=" <> show e
            Right (v :: Value) -> case (Aeson.parseMaybe parser v) of
              Nothing -> log LevelError $ LogText $ toS $ "Unable to extract id/run_at/locked_at from " <> show pload
              Just (jid, runAt_, mLockedAt_) -> do
                t <- liftIO getCurrentTime
                -- Only act on jobs that are already due and carry no lock.
                if (runAt_ <= t) && (isNothing mLockedAt_)
                  then do log LevelDebug $ LogText $ toS $ "Job needs needs to be run immediately. Attempting to fork in background. JobId=" <> show jid
                          -- Lock and run in a separate thread so the event
                          -- loop can continue with the next notification.
                          void $ async $ do
                            tryLockingJob jid >>= \case
                              Nothing -> pure ()
                              Just lockedJid -> runJob lockedJid
                  else log LevelDebug $ LogText $ toS $ "Job is either for future, or is already locked. Skipping. JobId=" <> show jid
  where
    -- Extracts (id, run_at, locked_at) from the payload; @locked_at@ may be
    -- absent.
    parser :: Value -> Aeson.Parser (JobId, UTCTime, Maybe UTCTime)
    parser = withObject "expecting an object to parse job.run_at and job.locked_at" $ \o -> do
      runAt_ <- o .: "run_at"
      mLockedAt_ <- o .:? "locked_at"
      jid <- o .: "id"
      pure (jid, runAt_, mLockedAt_)
-- | SQL that inserts a new job (seven @?@ parameters) and returns the
-- inserted row.
createJobQuery :: TableName -> PGS.Query
createJobQuery tname =
  mconcat
    [ "INSERT INTO "
    , tname
    , "(run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING "
    , concatJobDbColumns
    ]
-- | Create a job scheduled to run at the current time. See 'scheduleJob'.
createJob :: ToJSON p
          => Connection
          -> TableName
          -> p
          -> IO Job
createJob conn tname payload =
  getCurrentTime >>= scheduleJob conn tname payload
-- | Insert a new 'Queued' job that should run at the given time and return
-- the inserted row. The payload is stored as its JSON encoding; attempts
-- start at 0 and the error and lock columns start empty.
--
-- Calls 'error' if the @INSERT ... RETURNING@ yields anything other than
-- exactly one row. The error is raised inside the IO action (not hidden in
-- a lazily returned 'Job'), and the message is followed by the formatted
-- query.
scheduleJob :: ToJSON p
            => Connection
            -> TableName
            -> p
            -> UTCTime
            -> IO Job
scheduleJob conn tname payload runAt = do
  let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text )
      sql = createJobQuery tname
      -- Render the query only when it is needed for the error message.
      unexpected msg = do
        renderedQuery <- PGS.formatQuery conn sql args
        Prelude.error $ msg <> toS renderedQuery
  PGS.query conn sql args >>= \case
    [] -> unexpected "Not expecting a blank result set when creating a job. Query="
    [r] -> pure r
    _ -> unexpected "Not expecting multiple rows when creating a single job. Query="
-- | Decode a job's payload using the target type's 'FromJSON' instance,
-- returning the parse error message on failure.
eitherParsePayload :: (FromJSON a)
                   => Job
                   -> Either String a
eitherParsePayload = eitherParsePayloadWith parseJSON
-- | Like 'eitherParsePayload', but throws (via 'throwParsePayloadWith')
-- when the payload cannot be parsed.
throwParsePayload :: (FromJSON a)
                  => Job
                  -> IO a
throwParsePayload = throwParsePayloadWith parseJSON
-- | Run a custom Aeson parser over a job's payload. On failure the error is
-- rendered with 'formatError' from the JSON path and message.
eitherParsePayloadWith :: (Aeson.Value -> Aeson.Parser a)
                       -> Job
                       -> Either String a
eitherParsePayloadWith parser job =
  case iparse parser (jobPayload job) of
    ISuccess parsed -> Right parsed
    IError jpath e -> Left (formatError jpath e)
-- | Like 'eitherParsePayloadWith', but turns a parse failure into an
-- exception raised with 'throwString'.
throwParsePayloadWith :: (Aeson.Value -> Aeson.Parser a)
                      -> Job
                      -> IO a
throwParsePayloadWith parser job =
  case eitherParsePayloadWith parser job of
    Left parseErr -> throwString parseErr
    Right parsed -> pure parsed
-- | Resolve the list of job types according to the configured strategy: a
-- fixed list, a function given a pooled DB connection, or an arbitrary IO
-- action.
fetchAllJobTypes :: (MonadIO m)
                 => Config
                 -> m [Text]
fetchAllJobTypes cfg = liftIO $
  case cfgAllJobTypes cfg of
    AJTFixed jts -> pure jts
    AJTSql fn -> withResource (cfgDbPool cfg) fn
    AJTCustom fn -> fn
-- | List the distinct, non-null @locked_by@ values found in the jobs table.
fetchAllJobRunners :: (MonadIO m)
                   => Config
                   -> m [JobRunnerName]
fetchAllJobRunners cfg =
  liftIO . withResource (cfgDbPool cfg) $ \conn ->
    mapMaybe fromOnly
      <$> PGS.query_ conn ("select distinct locked_by from " <> cfgTableName cfg)