{-# LANGUAGE RankNTypes, FlexibleInstances, FlexibleContexts, PartialTypeSignatures, TupleSections, DeriveGeneric, UndecidableInstances #-}
module OddJobs.Job
(
startJobRunner
, Config(..)
, ConcurrencyControl(..)
, defaultConfig
, defaultJobToText
, defaultJobType
, defaultTimedLogger
, defaultLogStr
, defaultPollingInterval
, defaultLockTimeout
, withConnectionPool
, createJob
, scheduleJob
, Job(..)
, JobId
, Status(..)
, TableName
, delaySeconds
, Seconds(..)
, LogEvent(..)
, LogLevel(..)
, jobMonitor
, jobEventListener
, jobPoller
, jobPollingSql
, JobRunner
, HasJobRunner (..)
, findJobById
, findJobByIdIO
, saveJob
, saveJobIO
, jobDbColumns
, concatJobDbColumns
, eitherParsePayload
, throwParsePayload
, eitherParsePayloadWith
, throwParsePayloadWith
)
where
import OddJobs.Types
import Data.Pool
import Data.Text as T
import Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.Notification
import Database.PostgreSQL.Simple.FromField as FromField
import Database.PostgreSQL.Simple.ToField as ToField
import Database.PostgreSQL.Simple.FromRow as FromRow
import UnliftIO.Async
import UnliftIO.Concurrent (threadDelay, myThreadId)
import Data.String
import System.Posix.Process (getProcessID)
import Network.HostName (getHostName)
import UnliftIO.MVar
import Debug.Trace
import Control.Monad.Logger as MLogger (LogLevel(..), LogStr(..), toLogStr)
import UnliftIO.IORef
import UnliftIO.Exception ( SomeException(..), try, catch, finally
, catchAny, bracket, Exception(..), throwIO
, catches, Handler(..), mask_, throwString
)
import Data.Proxy
import Control.Monad.Trans.Control
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO, liftIO)
import Data.Text.Conversions
import Data.Time
import Data.Aeson hiding (Success)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe)
import Data.String.Conv (StringConv(..), toS)
import Data.Functor (void)
import Control.Monad (forever)
import Data.Maybe (isNothing, maybe, fromMaybe)
import Data.Either (either)
import Control.Monad.Reader
import GHC.Generics
import qualified Data.HashMap.Strict as HM
import qualified Data.List as DL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import System.FilePath (FilePath)
import qualified System.Directory as Dir
import Data.Aeson.Internal (iparse, IResult(..), formatError)
import qualified System.Log.FastLogger as FLogger
import Prelude hiding (log)
-- | The capabilities the job-runner machinery in this module needs from its
-- monad: configuration lookups, job life-cycle callbacks and logging.
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
  -- | Delay used by 'jobPoller' when there is nothing to run.
  getPollingInterval :: m Seconds

  -- | Callback invoked by 'runJob' once a job has completed and been saved
  -- with status 'Success'.
  onJobSuccess :: Job -> m ()

  -- | Callback invoked by 'runJob' when a job threw an exception and has
  -- been saved with status 'Retry'.
  onJobFailed :: Job -> m ()

  -- | Callback invoked by 'runJob' when a job failed and has been saved
  -- with status 'Failed' (no further retries).
  onJobPermanentlyFailed :: Job -> m ()

  -- | The function that actually executes a job.
  getJobRunner :: m (Job -> IO ())

  -- | Connection pool used for all database access.
  getDbPool :: m (Pool Connection)

  -- | Name of the table holding the jobs.
  getTableName :: m TableName

  -- | Callback associated with the start of a job.
  onJobStart :: Job -> m ()

  -- | Attempt count at (or above) which a failing job is marked 'Failed'
  -- instead of 'Retry'.
  getDefaultMaxAttempts :: m Int

  -- | Callback invoked by 'runJob' when a job was aborted with a
  -- 'TimeoutException' and has been saved with status 'Retry'.
  onJobTimeout :: Job -> m ()

  -- | The runner environment (configuration plus the list of job threads).
  getRunnerEnv :: m RunnerEnv

  -- | The configured concurrency-control policy.
  getConcurrencyControl :: m ConcurrencyControl

  -- | Optional PID file; 'jobMonitor' removes it on shutdown.
  getPidFile :: m (Maybe FilePath)

  -- | Function used to render a job as text.
  getJobToText :: m (Job -> Text)

  -- | Emit a log event at the given level.
  log :: LogLevel -> LogEvent -> m ()
-- | Structured events handed to the logging function ('log' / 'cfgLogger').
-- 'defaultLogStr' shows one way of rendering them.
data LogEvent
  = LogJobStart !Job
    -- ^ A job has started.
  | LogJobSuccess !Job !NominalDiffTime
    -- ^ A job completed successfully; carries the measured run time.
  | LogJobFailed !Job !NominalDiffTime
    -- ^ A job failed and will be retried; carries the measured run time.
  | LogJobPermanentlyFailed !Job !NominalDiffTime
    -- ^ A job failed and will not be retried; carries the measured run time.
  | LogJobTimeout !Job
    -- ^ A job timed out.
  | LogPoll
    -- ^ The jobs table is being polled.
  | LogText !Text
    -- ^ Free-form message.
  deriving (Eq, Show, Generic)
-- | Everything needed to start a job runner. 'defaultConfig' fills in
-- sensible values for most fields.
data Config = Config
  { cfgTableName :: TableName
    -- ^ Name of the table holding the jobs.
  , cfgJobRunner :: Job -> IO ()
    -- ^ The function that executes a single job.
  , cfgDefaultMaxAttempts :: Int
    -- ^ Attempt count at (or above) which a failing job is marked 'Failed'.
  , cfgConcurrencyControl :: ConcurrencyControl
    -- ^ Policy deciding whether more jobs may be picked up.
  , cfgDbPool :: Pool Connection
    -- ^ Connection pool used for all database access.
  , cfgPollingInterval :: Seconds
    -- ^ Delay used by the poller when there is nothing to run.
  , cfgOnJobSuccess :: Job -> IO ()
    -- ^ Callback run after a job succeeded.
  , cfgOnJobFailed :: Job -> IO ()
    -- ^ Callback run after a job failed and was scheduled for a retry.
  , cfgOnJobPermanentlyFailed :: Job -> IO ()
    -- ^ Callback run after a job failed and was given up on.
  , cfgOnJobStart :: Job -> IO ()
    -- ^ Callback associated with the start of a job.
  , cfgOnJobTimeout :: Job -> IO ()
    -- ^ Callback run after a job timed out and was scheduled for a retry.
  , cfgPidFile :: Maybe FilePath
    -- ^ Optional PID file, removed when the job monitor shuts down.
  , cfgLogger :: LogLevel -> LogEvent -> IO ()
    -- ^ Sink for all log events.
  , cfgJobToText :: Job -> Text
    -- ^ Renders a job as text (see 'defaultJobToText').
  , cfgJobType :: Job -> Text
    -- ^ Extracts a job's "type" (see 'defaultJobType').
  }
-- | Policy that decides whether the runner may pick up another job.
data ConcurrencyControl
  = MaxConcurrentJobs Int
    -- ^ Allow a new job only while fewer than this many job threads are
    -- being tracked.
  | UnlimitedConcurrentJobs
    -- ^ Always allow a new job.
  | DynamicConcurrency (IO Bool)
    -- ^ Ask the given action each time; 'True' allows a new job.
  deriving ()

-- | Hand-written because the 'DynamicConcurrency' payload is an opaque
-- action that cannot be shown.
instance Show ConcurrencyControl where
  show (MaxConcurrentJobs n) = "MaxConcurrentJobs " <> show n
  show UnlimitedConcurrentJobs = "UnlimitedConcurrentJobs"
  show (DynamicConcurrency _) = "DynamicConcurrency (IO Bool)"
-- | Build a 'Config' from the handful of settings that have no sensible
-- default. Everything else is filled in: all callbacks do nothing, the
-- polling interval is 'defaultPollingInterval', at most 10 attempts are made,
-- and no PID file is used.
--
-- Note that 'cfgJobToText' is built from 'defaultJobType'; overriding
-- 'cfgJobType' on the returned record afterwards does not change it.
defaultConfig :: (LogLevel -> LogEvent -> IO ())
              -> TableName
              -> Pool Connection
              -> ConcurrencyControl
              -> (Job -> IO ())
              -> Config
defaultConfig logger tname dbpool ccControl jrunner = Config
  { cfgTableName = tname
  , cfgJobRunner = jrunner
  , cfgDefaultMaxAttempts = 10
  , cfgConcurrencyControl = ccControl
  , cfgDbPool = dbpool
  , cfgPollingInterval = defaultPollingInterval
  , cfgOnJobSuccess = noop
  , cfgOnJobFailed = noop
  , cfgOnJobPermanentlyFailed = noop
  , cfgOnJobStart = noop
  , cfgOnJobTimeout = noop
  , cfgPidFile = Nothing
  , cfgLogger = logger
  , cfgJobToText = defaultJobToText defaultJobType
  , cfgJobType = defaultJobType
  }
  where
    -- Callback that ignores the job and does nothing.
    noop _ = pure ()
-- | Render a job as @JobId=<id> JobType=<type>@, where the type is obtained
-- from the supplied function.
defaultJobToText :: (Job -> Text) -> Job -> Text
defaultJobToText jobTypeFn job =
  mconcat
    [ "JobId="
    , toS (show (jobId job))
    , " JobType="
    , jobTypeFn job
    ]
-- | Use the string stored under the @tag@ key of an object payload as the
-- job type. Any other payload shape yields @"unknown"@.
defaultJobType :: Job -> Text
defaultJobType job =
  case jobPayload job of
    Aeson.Object hm
      | Just (Aeson.String t) <- HM.lookup "tag" hm -> t
    _ -> "unknown"
-- | Runtime environment of the job runner.
data RunnerEnv = RunnerEnv
  { envConfig :: !Config
    -- ^ The user-supplied configuration.
  , envJobThreadsRef :: !(IORef [Async ()])
    -- ^ Threads of the jobs that are currently being tracked; maintained by
    -- 'runJobWithTimeout' and inspected by 'waitForJobs'.
  }

-- | The concrete monad the job runner is started in by 'startJobRunner'.
type RunnerM = ReaderT RunnerEnv IO
-- | Run a user callback; if it throws a synchronous exception, log it at
-- 'LevelError' (prefixed with the callback name and job id) instead of
-- letting it propagate.
logCallbackErrors :: (HasJobRunner m) => JobId -> Text -> m () -> m ()
logCallbackErrors jid msg action = action `catchAny` report
  where
    report e =
      log LevelError . LogText $
        msg <> " Job ID=" <> toS (show jid) <> ": " <> toS (show e)
-- | Every method is a projection out of the 'Config' stored in the
-- 'RunnerEnv'. The callbacks are additionally wrapped in
-- 'logCallbackErrors' so that an exception thrown by user code is logged
-- rather than propagated.
instance HasJobRunner RunnerM where
  getPollingInterval = asks (cfgPollingInterval . envConfig)

  onJobFailed job = do
    callback <- asks (cfgOnJobFailed . envConfig)
    logCallbackErrors (jobId job) "onJobFailed" (liftIO (callback job))

  onJobSuccess job = do
    callback <- asks (cfgOnJobSuccess . envConfig)
    logCallbackErrors (jobId job) "onJobSuccess" (liftIO (callback job))

  onJobPermanentlyFailed job = do
    callback <- asks (cfgOnJobPermanentlyFailed . envConfig)
    logCallbackErrors (jobId job) "onJobPermanentlyFailed" (liftIO (callback job))

  getJobRunner = asks (cfgJobRunner . envConfig)

  getDbPool = asks (cfgDbPool . envConfig)

  getTableName = asks (cfgTableName . envConfig)

  onJobStart job = do
    callback <- asks (cfgOnJobStart . envConfig)
    logCallbackErrors (jobId job) "onJobStart" (liftIO (callback job))

  onJobTimeout job = do
    callback <- asks (cfgOnJobTimeout . envConfig)
    logCallbackErrors (jobId job) "onJobTimeout" (liftIO (callback job))

  getDefaultMaxAttempts = asks (cfgDefaultMaxAttempts . envConfig)

  getRunnerEnv = ask

  getConcurrencyControl = asks (cfgConcurrencyControl . envConfig)

  getPidFile = asks (cfgPidFile . envConfig)

  getJobToText = asks (cfgJobToText . envConfig)

  log logLevel logEvent = do
    loggerFn <- asks (cfgLogger . envConfig)
    liftIO (loggerFn logLevel logEvent)
-- | Start the job runner with the given configuration: create an empty
-- job-thread registry and run 'jobMonitor' in 'RunnerM'.
startJobRunner :: Config -> IO ()
startJobRunner cfg = do
  threadsRef <- newIORef []
  runReaderT jobMonitor RunnerEnv
    { envConfig = cfg
    , envJobThreadsRef = threadsRef
    }
-- | Create a connection pool from either a connection string ('Left') or a
-- 'PGS.ConnectInfo' ('Right'), run the action with it, and destroy all pooled
-- connections afterwards, even if the action throws.
--
-- The pool has 1 stripe, at most 5 connections, and an idle timeout of twice
-- 'defaultPollingInterval'.
withConnectionPool :: (MonadUnliftIO m)
                   => Either BS.ByteString PGS.ConnectInfo
                   -> (Pool PGS.Connection -> m a)
                   -> m a
withConnectionPool connConfig action = withRunInIO $ \runInIO ->
  bracket mkPool destroyAllResources (runInIO . action)
  where
    -- How a single connection is opened, depending on the kind of config.
    openConn = either PGS.connectPostgreSQL PGS.connect connConfig
    idleTime = fromIntegral $ 2 * (unSeconds defaultPollingInterval)
    mkPool = liftIO $ createPool openConn PGS.close 1 idleTime 5
-- | Default polling interval (5 seconds). Used by 'defaultConfig' for
-- 'cfgPollingInterval', and by 'withConnectionPool' to derive the pool's
-- idle timeout.
defaultPollingInterval :: Seconds
defaultPollingInterval = Seconds 5
-- | Identifier of a job; read from and matched against the @id@ column.
type JobId = Int
-- | Life-cycle state of a job, stored in the @status@ column.
data Status
  = Success
    -- ^ The job ran to completion.
  | Queued
    -- ^ The job is waiting to be picked up (initial state set by 'scheduleJob').
  | Failed
    -- ^ The job failed and will not be retried.
  | Retry
    -- ^ The job failed and is waiting to be retried.
  | Locked
    -- ^ The job has been claimed by a worker.
  deriving (Eq, Show, Generic, Enum)

-- | Statuses are ordered by their textual representation (see the 'ToText'
-- instance), not by constructor order.
instance Ord Status where
  compare lhs rhs = toText lhs `compare` toText rhs
-- | One row of the jobs table. The fields appear in the same order as
-- 'jobDbColumns', which the 'FromRow' instance relies on.
data Job = Job
  { jobId :: JobId
    -- ^ @id@ column.
  , jobCreatedAt :: UTCTime
    -- ^ @created_at@ column.
  , jobUpdatedAt :: UTCTime
    -- ^ @updated_at@ column.
  , jobRunAt :: UTCTime
    -- ^ @run_at@ column: the job is not polled before this time.
  , jobStatus :: Status
    -- ^ @status@ column.
  , jobPayload :: Aeson.Value
    -- ^ @payload@ column: arbitrary JSON describing the work to do.
  , jobLastError :: Maybe Value
    -- ^ @last_error@ column: JSON describing the most recent failure, if any.
  , jobAttempts :: Int
    -- ^ @attempts@ column: incremented every time the job is locked.
  , jobLockedAt :: Maybe UTCTime
    -- ^ @locked_at@ column.
  , jobLockedBy :: Maybe Text
    -- ^ @locked_by@ column: name of the worker holding the lock.
  } deriving (Eq, Show)
-- | Lower-case textual form of a status, as stored in the database.
instance ToText Status where
  toText Success = "success"
  toText Queued = "queued"
  toText Retry = "retry"
  toText Failed = "failed"
  toText Locked = "locked"
-- | Inverse of the 'ToText' instance. Unrecognised input yields a 'Left'
-- carrying an error message in whatever string type the caller asks for.
instance (StringConv Text a) => FromText (Either a Status) where
  fromText "success" = Right Success
  fromText "queued" = Right Queued
  fromText "failed" = Right Failed
  fromText "retry" = Right Retry
  fromText "locked" = Right Locked
  fromText other = Left $ toS $ "Unknown job status: " <> other
-- | Read the column as text and parse it with 'fromText'; an unknown status
-- is reported as a 'PGS.ConversionFailed' error.
instance FromField Status where
  fromField f mBS = do
    parsed <- fromText <$> fromField f mBS
    either (FromField.returnError PGS.ConversionFailed f) pure parsed

-- | A status is written to the database in its textual form.
instance ToField Status where
  toField = toField . toText
-- | Columns are consumed positionally, in the order given by 'jobDbColumns'.
instance FromRow Job where
  fromRow = do
    jid <- field
    createdAt <- field
    updatedAt <- field
    runAt <- field
    status <- field
    payload <- field
    lastError <- field
    attempts <- field
    lockedAt <- field
    lockedBy <- field
    pure Job
      { jobId = jid
      , jobCreatedAt = createdAt
      , jobUpdatedAt = updatedAt
      , jobRunAt = runAt
      , jobStatus = status
      , jobPayload = payload
      , jobLastError = lastError
      , jobAttempts = attempts
      , jobLockedAt = lockedAt
      , jobLockedBy = lockedBy
      }
-- | Shape of a function that executes a single job (the same type as
-- 'cfgJobRunner').
type JobRunner = Job -> IO ()
-- | Name identifying this worker process, of the form @hostname:pid@. It is
-- what gets written to the @locked_by@ column when a job is locked.
jobWorkerName :: IO String
jobWorkerName = render <$> getProcessID <*> getHostName
  where
    render pid hname = hname ++ ":" ++ show pid
-- | Default lock timeout (600 seconds). 'runJob' uses it as the maximum run
-- time of a job, and 'jobPoller' treats a 'Locked' job whose @locked_at@ is
-- older than this as eligible to be picked up again.
defaultLockTimeout :: Seconds
defaultLockTimeout = Seconds 600
-- | Names of the columns of the jobs table, in the order expected by the
-- 'FromRow' instance of 'Job'.
jobDbColumns :: (IsString s, Semigroup s) => [s]
jobDbColumns =
  fromString <$>
    [ "id", "created_at", "updated_at", "run_at", "status"
    , "payload", "last_error", "attempts", "locked_at", "locked_by"
    ]
-- | All of 'jobDbColumns' joined with @", "@, ready to be spliced into a
-- @SELECT@ or @RETURNING@ clause.
concatJobDbColumns :: (IsString s, Semigroup s) => s
concatJobDbColumns = join "" jobDbColumns
  where
    -- Left-to-right accumulation; the separator is only emitted between
    -- columns, never after the last one.
    join acc = \case
      [] -> acc
      [lastCol] -> acc <> lastCol
      (col : rest) -> join (acc <> col <> ", ") rest
-- | SQL selecting all job columns of the row with a given @id@ (one
-- placeholder) from the given table.
findJobByIdQuery :: TableName -> PGS.Query
findJobByIdQuery tname =
  mconcat
    [ "SELECT "
    , concatJobDbColumns
    , " FROM "
    , tname
    , " WHERE id = ?"
    ]
-- | Run an action with a connection borrowed from the runner's pool.
withDbConnection :: (HasJobRunner m)
                 => (Connection -> m a)
                 -> m a
withDbConnection action = getDbPool >>= \pool -> withResource pool action
-- | Look up a job by id in the runner's table, using a pooled connection.
-- See 'findJobByIdIO'.
findJobById :: (HasJobRunner m)
            => JobId
            -> m (Maybe Job)
findJobById jid =
  getTableName >>= \tname ->
    withDbConnection (\conn -> liftIO (findJobByIdIO conn tname jid))
-- | Look up a job by id. Returns 'Nothing' when no row matches; calls
-- 'Prelude.error' if more than one row comes back.
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
findJobByIdIO conn tname jid = do
  rows <- PGS.query conn (findJobByIdQuery tname) (Only jid)
  case rows of
    [] -> pure Nothing
    [job] -> pure (Just job)
    _ -> Prelude.error $ "Not expecting to find multiple jobs by id=" <> (show jid)
-- | SQL updating the mutable columns of one job (eight placeholders, the
-- last being the @id@) and returning the complete updated row.
saveJobQuery :: TableName -> PGS.Query
saveJobQuery tname =
  mconcat
    [ "UPDATE "
    , tname
    , " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING "
    , concatJobDbColumns
    ]
-- | Persist a job to the runner's table using a pooled connection and
-- return the row as stored. See 'saveJobIO'.
saveJob :: (HasJobRunner m) => Job -> m Job
saveJob j =
  getTableName >>= \tname ->
    withDbConnection (\conn -> liftIO (saveJobIO conn tname j))
-- | Write the mutable fields of a job (@run_at@, @status@, @payload@,
-- @last_error@, @attempts@, @locked_at@, @locked_by@) back to the row with
-- the job's id, and return the row as stored.
--
-- Calls 'Prelude.error' if the update does not affect exactly one row.
saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do
  -- The parameter order must match the placeholders in 'saveJobQuery'.
  rs <- PGS.query conn (saveJobQuery tname)
        ( jobRunAt
        , jobStatus
        , jobPayload
        , jobLastError
        , jobAttempts
        , jobLockedAt
        , jobLockedBy
        , jobId
        )
  case rs of
    [] -> Prelude.error $ "Could not find job while updating it id=" <> (show jobId)
    [j] -> pure j
    _ -> Prelude.error $ "Not expecting multiple rows to be returned when updating job id=" <> (show jobId)
-- | Thrown by the watchdog thread in 'runJobWithTimeout' after it has
-- cancelled a job that exceeded its timeout; handled in 'runJob'.
data TimeoutException = TimeoutException deriving (Eq, Show)
instance Exception TimeoutException
-- | Run a job on its own thread, next to a watchdog thread that cancels it
-- and throws 'TimeoutException' once the timeout has elapsed.
--
-- The job thread is registered in the runner's thread list for the duration
-- of the run and is removed again however the run ends.
runJobWithTimeout :: (HasJobRunner m)
                  => Seconds
                  -> Job
                  -> m ()
runJobWithTimeout timeoutSec job = do
  threadsRef <- envJobThreadsRef <$> getRunnerEnv
  runner <- getJobRunner

  jobThread <- async $ liftIO $ runner job
  atomicModifyIORef' threadsRef $ \threads -> (jobThread : threads, ())
  log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId jobThread)

  -- Watchdog: sleeps for the timeout, kills the job, then fails itself so
  -- that the wait below surfaces a 'TimeoutException'.
  watchdog <- async $ do
    delaySeconds timeoutSec
    uninterruptibleCancel jobThread
    throwIO TimeoutException

  let untrack = atomicModifyIORef' threadsRef $ \threads -> (DL.delete jobThread threads, ())
  void $ waitEitherCancel jobThread watchdog `finally` untrack
-- | Run the (already locked) job with the given id.
--
-- The job's start is announced with a 'LogJobStart' event and the
-- 'onJobStart' callback. It is then executed via 'runJobWithTimeout' with
-- 'defaultLockTimeout'. On success the job is saved as 'Success', unlocked,
-- and 'onJobSuccess' is invoked.
--
-- If anything in that sequence throws, the job is unlocked, its
-- @last_error@ is set, and it is saved either as 'Retry' (re-scheduled
-- @2 ^ attempts@ seconds from now) or, once its attempts have reached the
-- configured maximum, as 'Failed'. The matching callback ('onJobTimeout' /
-- 'onJobFailed' / 'onJobPermanentlyFailed') is then invoked.
--
-- Calls 'Prelude.error' if no job with the given id exists.
runJob :: (HasJobRunner m) => JobId -> m ()
runJob jid =
  findJobById jid >>= \case
    Nothing -> Prelude.error $ "Could not find job id=" <> show jid
    Just job -> do
      startTime <- liftIO getCurrentTime
      log LevelInfo $ LogJobStart job
      (flip catches) [Handler $ timeoutHandler job startTime, Handler $ exceptionHandler job startTime] $ do
        -- Inside the protected block so that a throwing callback is handled
        -- like any other failure instead of leaving the job locked.
        onJobStart job
        runJobWithTimeout defaultLockTimeout job
        endTime <- liftIO getCurrentTime
        newJob <- saveJob job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing}
        log LevelInfo $ LogJobSuccess newJob (diffUTCTime endTime startTime)
        onJobSuccess newJob
        pure ()
  where
    -- The more specific handler is listed first in 'catches' above.
    timeoutHandler job startTime (e :: TimeoutException) = retryOrFail (show e) job onJobTimeout onJobPermanentlyFailed startTime
    exceptionHandler job startTime (e :: SomeException) = retryOrFail (show e) job onJobFailed onJobPermanentlyFailed startTime

    retryOrFail errStr job@Job{jobAttempts} onFail onPermanentFail startTime = do
      endTime <- liftIO getCurrentTime
      defaultMaxAttempts <- getDefaultMaxAttempts
      let runTime = diffUTCTime endTime startTime
          (newStatus, action, logAction) =
            if jobAttempts >= defaultMaxAttempts
              then ( Failed
                   , onPermanentFail
                   , log LevelError $ LogJobPermanentlyFailed job runTime
                   )
              else ( Retry
                   , onFail
                   , log LevelWarn $ LogJobFailed job runTime
                   )
      t <- liftIO getCurrentTime
      newJob <- saveJob job{ jobStatus=newStatus
                           , jobLockedBy=Nothing
                           , jobLockedAt=Nothing
                           , jobLastError=(Just $ toJSON errStr)
                             -- Exponential back-off based on the attempt count.
                           , jobRunAt=(addUTCTime (fromIntegral $ (2::Int) ^ jobAttempts) t)
                           }
      logAction
      void $ action newJob
      pure ()
-- | Supervise a long-running action: run it on its own thread and, whenever
-- it ends (with an exception or with a result), log that at 'LevelError' and
-- start it again. When the supervisor itself is being torn down, the current
-- worker thread is cancelled.
restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m ()
restartUponCrash name_ action = do
  worker <- async action
  (waitCatch worker >>= logAndRestart) `finally` shutdown worker
  where
    shutdown worker = do
      log LevelInfo $ LogText $ "Received shutdown: " <> toS name_
      cancel worker

    logAndRestart outcome = do
      log LevelError $ LogText $ case outcome of
        Left (e :: SomeException) ->
          name_ <> " seems to have exited with an error. Restarting: " <> toS (show e)
        Right r ->
          name_ <> " seems to have exited with the folloing result: " <> toS (show r) <> ". Restaring."
      restartUponCrash name_ action
-- | Top-level loop of the job runner. Starts the poller and the
-- LISTEN/NOTIFY listener (each supervised by 'restartUponCrash') and blocks
-- until either of them ends or this thread is interrupted. On the way out it
-- cancels both, waits for in-flight jobs, and removes the PID file if one is
-- configured.
jobMonitor :: forall m . (HasJobRunner m) => m ()
jobMonitor = do
  pollerThread <- async $ restartUponCrash "Job poller" jobPoller
  listenerThread <- async $ restartUponCrash "Job event listener" jobEventListener
  finally
    (void $ waitAnyCatch [pollerThread, listenerThread])
    (shutdown pollerThread listenerThread)
  where
    shutdown pollerThread listenerThread = do
      log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.")
      cancel listenerThread
      cancel pollerThread
      log LevelInfo (LogText "Waiting for jobs to complete.")
      waitForJobs
      getPidFile >>= maybe (pure ()) removePidFile

    removePidFile f = do
      log LevelInfo $ LogText $ "Removing PID file: " <> toS f
      liftIO $ Dir.removePathForcibly f
-- | SQL used by 'jobPoller' to claim at most one runnable job: it locks the
-- job with the earliest @run_at@ that is due and whose status is either in a
-- given set or equal to a given status with a stale @locked_at@, bumps its
-- @attempts@, and returns its id.
--
-- Placeholders, in order: new status, locked_at, locked_by, current time,
-- set of runnable statuses, "locked" status, stale-lock cut-off.
jobPollingSql :: TableName -> Query
jobPollingSql tname =
  mconcat
    [ "update "
    , tname
    , " set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1"
    , " WHERE id in (select id from "
    , tname
    , " where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?)))"
    , " ORDER BY run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"
    ]
-- | Block until the runner's list of job threads is empty. While it is not,
-- wait for any one of the listed threads to finish, log how many were
-- listed, pause for a second, and check again.
waitForJobs :: (HasJobRunner m)
            => m ()
waitForJobs = do
  threadsRef <- envJobThreadsRef <$> getRunnerEnv
  running <- readIORef threadsRef
  case running of
    [] -> log LevelInfo $ LogText "All job-threads exited"
    _ -> do
      tid <- myThreadId
      void $ waitAnyCatch running
      log LevelDebug $ LogText $ toS $ "Waiting for " <> show (DL.length running) <> " jobs to complete before shutting down. myThreadId=" <> (show tid)
      delaySeconds (Seconds 1)
      waitForJobs
-- | Turn the configured 'ConcurrencyControl' into an action that answers,
-- each time it is run, whether another job may be started right now.
getConcurrencyControlFn :: (HasJobRunner m)
                        => m (m Bool)
getConcurrencyControlFn = mayStartJob <$> getConcurrencyControl
  where
    mayStartJob UnlimitedConcurrentJobs = pure True
    mayStartJob (MaxConcurrentJobs maxJobs) = do
      env <- getRunnerEnv
      running <- readIORef (envJobThreadsRef env)
      pure (DL.length running < maxJobs)
    mayStartJob (DynamicConcurrency fn) = liftIO fn
-- | Poll the jobs table forever on a dedicated pooled connection.
--
-- Each iteration first consults concurrency control. If another job may be
-- started, 'jobPollingSql' is run to lock at most one due job; a locked job
-- is handed to 'runJob' on a new thread and the next poll follows
-- immediately, whereas an empty result is followed by a pause of one polling
-- interval. If concurrency control refuses, the poller also pauses for one
-- polling interval, so that it does not spin and flood the log while waiting
-- for capacity.
jobPoller :: (HasJobRunner m) => m ()
jobPoller = do
  processName <- liftIO jobWorkerName
  pool <- getDbPool
  tname <- getTableName
  log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName
  concurrencyControlFn <- getConcurrencyControlFn
  withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn >>= \case
    False -> do
      log LevelWarn $ LogText $ "NOT polling the job queue due to concurrency control"
      -- Nothing can be started right now; wait before asking again.
      delayAction
    True -> do
      -- Locking the row and forking its thread happen with async exceptions
      -- masked; the (interruptible) delay runs after the mask is lifted.
      nextAction <- mask_ $ do
        log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.."
        t <- liftIO getCurrentTime
        r <- liftIO $
             PGS.query pollerDbConn (jobPollingSql tname)
             (Locked, t, processName, t, (In [Queued, Retry]), Locked, (addUTCTime (fromIntegral $ negate $ unSeconds defaultLockTimeout) t))
        case r of
          [] -> pure delayAction
          [Only (jid :: JobId)] -> do
            void $ async $ runJob jid
            pure noDelayAction
          x -> error $ "WTF just happened? I was supposed to get only a single row, but got: " ++ (show x)
      nextAction
  where
    delayAction = delaySeconds =<< getPollingInterval
    noDelayAction = pure ()
-- | React to job notifications delivered through Postgres LISTEN/NOTIFY.
--
-- A dedicated pooled connection listens on the table's event channel. Each
-- notification payload is decoded as JSON with @id@, @run_at@ and optional
-- @locked_at@. If the job is due and not locked, a background thread tries
-- to lock it (using a separate pooled connection) and, on success, runs it.
-- Notifications are ignored while concurrency control refuses new jobs.
jobEventListener :: (HasJobRunner m)
                 => m ()
jobEventListener = do
  log LevelInfo $ LogText "Starting the job monitor via LISTEN/NOTIFY..."
  pool <- getDbPool
  tname <- getTableName
  jwName <- liftIO jobWorkerName
  concurrencyControlFn <- getConcurrencyControlFn

  let lockSql = "UPDATE " <> tname <> " SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id"

      -- Try to claim the job for this worker; 'Nothing' means somebody else
      -- got there first.
      tryLockingJob jid = do
        rows <- withDbConnection $ \conn ->
          liftIO $ PGS.query conn lockSql (Locked, jwName, jid, In [Queued, Retry])
        case rows of
          [] -> do
            log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid
            pure Nothing
          [Only (_ :: JobId)] -> pure $ Just jid
          x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x)

      lockAndRun jid = tryLockingJob jid >>= maybe (pure ()) runJob

      -- Decode one notification payload and, if the job is due, fork it.
      handleEvent (pload :: BS.ByteString) = do
        log LevelDebug $ LogText $ toS $ "NOTIFY | " <> show pload
        case eitherDecode (toS pload) of
          Left e ->
            log LevelError $ LogText $ toS $ "Unable to decode notification payload received from Postgres. Payload=" <> show pload <> " Error=" <> show e
          Right (v :: Value) -> case Aeson.parseMaybe parser v of
            Nothing ->
              log LevelError $ LogText $ toS $ "Unable to extract id/run_at/locked_at from " <> show pload
            Just (jid, runAt_, mLockedAt_) -> do
              now <- liftIO getCurrentTime
              if (runAt_ <= now) && (isNothing mLockedAt_)
                then do
                  log LevelDebug $ LogText $ toS $ "Job needs needs to be run immediately. Attempting to fork in background. JobId=" <> show jid
                  void $ async $ lockAndRun jid
                else
                  log LevelDebug $ LogText $ toS $ "Job is either for future, or is already locked. Skipping. JobId=" <> show jid

  withResource pool $ \monitorDbConn -> do
    void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tname) ()
    forever $ do
      log LevelDebug $ LogText "[LISTEN/NOFIFY] Event loop"
      notif <- liftIO $ getNotification monitorDbConn
      canRun <- concurrencyControlFn
      if canRun
        then handleEvent (notificationData notif)
        else log LevelWarn $ LogText "Received job event, but ignoring it due to concurrency control"
  where
    parser :: Value -> Aeson.Parser (JobId, UTCTime, Maybe UTCTime)
    parser = withObject "expecting an object to parse job.run_at and job.locked_at" $ \o -> do
      runAt_ <- o .: "run_at"
      mLockedAt_ <- o .:? "locked_at"
      jid <- o .: "id"
      pure (jid, runAt_, mLockedAt_)
-- | SQL inserting a new job (seven placeholders) and returning the complete
-- inserted row.
createJobQuery :: TableName -> PGS.Query
createJobQuery tname =
  mconcat
    [ "INSERT INTO "
    , tname
    , "(run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING "
    , concatJobDbColumns
    ]
-- | Create a job that is due immediately, i.e. 'scheduleJob' with the
-- current time as @run_at@.
createJob :: ToJSON p
          => Connection
          -> TableName
          -> p
          -> IO Job
createJob conn tname payload =
  getCurrentTime >>= scheduleJob conn tname payload
-- | Insert a new 'Queued' job with the given payload (serialised via
-- 'toJSON'), due at the given time, with zero attempts and no lock, and
-- return the row as stored.
--
-- If the insert does not return exactly one row, an error naming the
-- problem followed by the fully formatted query is raised from within 'IO'
-- (rather than being hidden inside a lazily-evaluated result).
scheduleJob :: ToJSON p
            => Connection
            -> TableName
            -> p
            -> UTCTime
            -> IO Job
scheduleJob conn tname payload runAt = do
  let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text )
      -- Format the query only when it is needed for the error message, and
      -- put the message before the query text.
      failWithQuery msg = do
        formatted <- PGS.formatQuery conn (createJobQuery tname) args
        Prelude.error $ msg <> toS formatted
  rs <- PGS.query conn (createJobQuery tname) args
  case rs of
    [] -> failWithQuery "Not expecting a blank result set when creating a job. Query="
    [r] -> pure r
    _ -> failWithQuery "Not expecting multiple rows when creating a single job. Query="
-- | Decode a job's payload using the target type's 'FromJSON' instance.
-- See 'eitherParsePayloadWith'.
eitherParsePayload :: (FromJSON a)
                   => Job
                   -> Either String a
eitherParsePayload = eitherParsePayloadWith parseJSON
-- | Like 'eitherParsePayload', but throws on a parse failure.
-- See 'throwParsePayloadWith'.
throwParsePayload :: (FromJSON a)
                  => Job
                  -> IO a
throwParsePayload = throwParsePayloadWith parseJSON
-- | Run the given aeson parser over a job's payload. A failure is returned
-- as 'Left' with a message that includes the JSON path of the error.
eitherParsePayloadWith :: (Aeson.Value -> Aeson.Parser a)
                       -> Job
                       -> Either String a
eitherParsePayloadWith parser job =
  case iparse parser (jobPayload job) of
    ISuccess parsed -> Right parsed
    IError jpath e -> Left (formatError jpath e)
-- | Like 'eitherParsePayloadWith', but a parse failure is thrown (via
-- 'throwString') instead of being returned.
throwParsePayloadWith :: (Aeson.Value -> Aeson.Parser a)
                      -> Job
                      -> IO a
throwParsePayloadWith parser job =
  case eitherParsePayloadWith parser job of
    Left err -> throwString err
    Right parsed -> pure parsed
-- | Logger built on a 'FLogger.TimedFastLogger'. Each line is the formatted
-- time, @" | "@, the event as rendered by the given function, and a newline.
-- 'LevelDebug' events are dropped.
defaultTimedLogger :: FLogger.TimedFastLogger
                   -> (LogLevel -> LogEvent -> LogStr)
                   -> LogLevel
                   -> LogEvent
                   -> IO ()
defaultTimedLogger logger logStrFn logLevel logEvent
  | logLevel == LevelDebug = pure ()
  | otherwise =
      logger $ \t ->
        toLogStr t <> " | " <> logStrFn logLevel logEvent <> "\n"
-- | Render a log event as @<level> | <description>@, using the supplied
-- function to describe jobs.
defaultLogStr :: (Job -> Text)
              -> LogLevel
              -> LogEvent
              -> LogStr
defaultLogStr jobToText logLevel logEvent =
  toLogStr (show logLevel) <> " | " <> describe logEvent
  where
    jobStr = toLogStr . jobToText

    -- Shared layout of the events that carry a job and its run time.
    withRuntime label j t =
      label <> jobStr j <> " | runtime=" <> toLogStr (show t)

    describe = \case
      LogJobStart j ->
        "Started | " <> jobStr j
      LogJobFailed j t ->
        withRuntime "Failed (retry) | " j t
      LogJobSuccess j t ->
        withRuntime "Success | " j t
      LogJobPermanentlyFailed j t ->
        withRuntime "Failed (permanent) | " j t
      LogJobTimeout j@Job{jobLockedAt, jobLockedBy} ->
        "Timeout | " <> jobStr j
          <> " | lockedBy=" <> toLogStr (fromMaybe "unknown" jobLockedBy)
          <> " lockedAt=" <> toLogStr (maybe "unknown" show jobLockedAt)
      LogPoll ->
        "Polling jobs table"
      LogText t ->
        toLogStr t