odd-jobs-0.2.2: A full-featured PostgreSQL-backed job queue (with an admin UI)

Safe Haskell: None
Language: Haskell2010

OddJobs.Job

Starting the job-runner

startJobRunner :: Config -> IO () Source #

Starts the job-runner in the current thread. If you want the job-runner to run in the background, you need to call forkIO or async yourself. Consider using Cli to rapidly build your own standalone daemon.
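
A minimal sketch of the background-thread pattern mentioned above, using only base. The helper names runInBackground and waitFor are hypothetical and not part of odd-jobs; the blocking action you pass in would be startJobRunner applied to your own Config.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar)
import Control.Exception (SomeException, try)

-- | Run a blocking action in a new thread (hypothetical helper, not part of
-- odd-jobs). The returned MVar is filled once the action returns or throws,
-- so the caller can find out that the background thread has stopped.
runInBackground :: IO () -> IO (MVar (Either SomeException ()))
runInBackground blockingAction = do
  done <- newEmptyMVar
  _ <- forkIO (try blockingAction >>= putMVar done)
  pure done

-- | Block until the background action has finished and return its outcome.
waitFor :: MVar (Either SomeException ()) -> IO (Either SomeException ())
waitFor = readMVar
```

With a Config value cfg in scope, runInBackground (startJobRunner cfg) would return immediately while the job-runner keeps working in its own thread. The async package offers the same pattern with richer cancellation support.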

Configuring the job-runner

data Config Source #

While odd-jobs is highly configurable and the Config data-type might seem daunting at first, it is not necessary to tweak every single configuration parameter by hand.

Recommendation: Please start-off by building a Config by using the mkConfig function (to get something with sensible defaults) and then tweaking config parameters on a case-by-case basis.

Constructors

Config 

Fields

  • cfgTableName :: TableName

    The DB table which holds your jobs. Please note, this should have been created by the createJobTable function.

  • cfgJobRunner :: Job -> IO ()

    The actual "job-runner" that you need to provide. If this function throws a runtime exception, the job will be retried cfgDefaultMaxAttempts times. Please look at the examples/tutorials if your application's code is not in the IO monad.

  • cfgDefaultMaxAttempts :: Int

    The number of times a failing job is retried before it is considered "permanently failed" and ignored by the job-runner. This config parameter is called "default max attempts" because, in the future, it will be possible to specify the number of retry attempts on a per-job basis. (Note: per-job retry attempts have not been implemented yet.)

  • cfgConcurrencyControl :: ConcurrencyControl

    Controls how many jobs can be run concurrently by this instance of the job-runner. Please note, this is NOT the global concurrency of the entire job-queue. It is possible to have job-runners running on multiple machines, and each will apply the concurrency control independently of the other job-runners. Ref: Section on controlling concurrency in the implementation guide.

  • cfgDbPool :: Pool Connection

    The DB connection-pool to use for the job-runner. Note: in case your jobs require a DB connection, please create a separate connection-pool for them. This pool will be used ONLY for monitoring jobs and changing their status. We need to have at least 4 connections in this connection-pool for the job-runner to work as expected.

  • cfgPollingInterval :: Seconds

    How frequently the jobPoller should check for jobs whose jobRunAt field indicates that it's time for the job to be executed. Ref: Please read the section on how Odd Jobs works (architecture) to find out more.

  • cfgOnJobSuccess :: Job -> IO ()

    User-defined callback function that is called whenever a job succeeds.

  • cfgOnJobFailed :: forall a. [JobErrHandler a]

    User-defined error-handler that is called whenever a job fails (indicated by cfgJobRunner throwing an unhandled runtime exception). Please refer to JobErrHandler for documentation on how to use this.

  • cfgOnJobStart :: Job -> IO ()

    User-defined callback function that is called whenever a job starts execution.

  • cfgOnJobTimeout :: Job -> IO ()

    User-defined callback function that is called whenever a job times out. Also see cfgDefaultJobTimeout.

  • cfgPidFile :: Maybe FilePath

    File to store the PID of the job-runner process. This is used only when invoking the job-runner as an independent background daemon (the usual mode of deployment).

  • cfgLogger :: LogLevel -> LogEvent -> IO ()

    A "structured logging" function that you need to provide. The odd-jobs library does NOT use the standard logging interface provided by 'monad-logger' on purpose. Also look at cfgJobType and defaultLogStr

    Note: Please take a look at the section on structured logging to find out how to use this to log in JSON.

  • cfgJobType :: Job -> Text

    How to extract the "job type" from a Job. If you are overriding this, please consider overriding cfgJobTypeSql as well. Related: defaultJobType

  • cfgJobTypeSql :: Query

    How to extract the "job type" directly in SQL. There are many places, especially in the web/admin UI, where we need to know a job's type directly in SQL (because transferring the entire payload column to Haskell, then parsing it into JSON, and then applying the cfgJobType function to it would be too inefficient). Ref: defaultJobTypeSql and cfgJobType

  • cfgDefaultJobTimeout :: Seconds

    How long a job can run before it is considered to be "crashed" and picked up for execution again.

  • cfgJobToHtml :: [Job] -> IO [Html ()]

    How to convert a list of Jobs to a list of HTML fragments. This is used in the Web/Admin UI. This function accepts a list of jobs and returns a list of Html fragments so that, if you need to query another table to fetch some metadata (e.g. to convert a primary-key to a human-readable name), you can do it in one efficient query instead of introducing an N+1 SQL bug. Ref: defaultJobToHtml

  • cfgAllJobTypes :: AllJobTypes

    How to get a list of all known job-types? This is used by the Web/Admin UI to power the "filter by job-type" functionality. The default value for this is defaultDynamicJobTypes which does a SELECT DISTINCT payload ->> ... to get a list of job-types directly from the DB.
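
The interaction between cfgJobRunner and cfgDefaultMaxAttempts described above can be modelled in memory as follows. This is only an illustration, not the odd-jobs implementation: the names Outcome and runWithAttempts are hypothetical, the limit is assumed to count total attempts, and the real job-runner schedules retries through the jobs table rather than looping immediately.

```haskell
import Control.Exception (SomeException, try)

-- | Result of running an action under an attempt limit, together with the
-- number of attempts that were made (hypothetical type, for illustration).
data Outcome = Succeeded Int | PermanentlyFailed Int
  deriving (Eq, Show)

-- | Run the action until it completes without throwing, or until the
-- attempt limit is reached. Any exception counts as a failed attempt.
runWithAttempts :: Int -> IO () -> IO Outcome
runWithAttempts maxAttempts action = go 1
  where
    go n = do
      result <- try action :: IO (Either SomeException ())
      case result of
        Right () -> pure (Succeeded n)
        Left _
          | n >= maxAttempts -> pure (PermanentlyFailed n)
          | otherwise -> go (n + 1)
```

The point of the model is that the job function signals failure purely by throwing, so a cfgJobRunner that swallows its own exceptions would always look successful to the job-runner.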

data ConcurrencyControl Source #

Note: Please read the section on controlling concurrency in the implementation guide to understand the implications of each option given by the data-type.

Constructors

MaxConcurrentJobs Int

The maximum number of concurrent jobs that this instance of the job-runner can execute.

UnlimitedConcurrentJobs

Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number of them concurrently is going to max out some resource of the underlying machine, such as CPU, memory, disk IOPS, or network bandwidth.

DynamicConcurrency (IO Bool)

Use this to dynamically determine if the next job should be picked-up, or not. This is useful to write custom-logic to determine whether a limited resource is below a certain usage threshold (eg. CPU usage is below 80%). Caveat: This feature has not been tested in production, yet.
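
As a sketch of the kind of custom logic DynamicConcurrency is meant for, the following predicate reports whether the 1-minute load average is below a threshold. It assumes a Linux host where /proc/loadavg exists; parseLoad1 and loadBelow are hypothetical names, and answering False when the file cannot be read is a design choice of this sketch, not library behaviour.

```haskell
import Control.Exception (IOException, try)

-- | Parse the 1-minute load average, which is the first whitespace-separated
-- field of /proc/loadavg.
parseLoad1 :: String -> Maybe Double
parseLoad1 contents = case words contents of
  (w : _) | [(x, "")] <- reads w -> Just x
  _ -> Nothing

-- | True when the 1-minute load average is below the threshold. If the load
-- cannot be determined, answer False so that no further jobs are picked up.
loadBelow :: Double -> IO Bool
loadBelow threshold = do
  result <- try (readFile "/proc/loadavg") :: IO (Either IOException String)
  pure $ case result of
    Right contents | Just load <- parseLoad1 contents -> load < threshold
    _ -> False
```

A value such as DynamicConcurrency (loadBelow 4.0) could then be used for cfgConcurrencyControl; the threshold 4.0 is an arbitrary example.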

Creating/scheduling jobs

Ideally you'd want to create wrappers for createJob and scheduleJob in your application so that instead of being in IO they can be in your application's monad m (thus saving you from a liftIO every time you want to enqueue a job).
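
One way to write such wrappers is a small generic lifting helper. The sketch below uses only base; liftIO3 is a hypothetical name, chosen because createJob takes three arguments.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Lift a three-argument IO function (the shape of createJob) into any
-- monad that supports IO. Hypothetical helper, not part of odd-jobs.
liftIO3 :: MonadIO m => (a -> b -> c -> IO r) -> a -> b -> c -> m r
liftIO3 f a b c = liftIO (f a b c)
```

With this in place, a wrapper could be defined as createJobM = liftIO3 createJob, where createJobM is again a name of your own choosing. An application that keeps its Connection and TableName in a reader environment could go further and take only the payload as an argument.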

createJob :: ToJSON p => Connection -> TableName -> p -> IO Job Source #

Create a job for immediate execution.

Internally calls scheduleJob passing it the current time. Read scheduleJob for further documentation.

scheduleJob Source #

Arguments

:: ToJSON p 
=> Connection

DB connection to use. Note: This should ideally come out of your application's DB pool, not the cfgDbPool you used in the job-runner.

-> TableName

DB-table which holds your jobs

-> p

Job payload

-> UTCTime

when should the job be executed

-> IO Job 

Create a job for execution at the given time.

  • If time has already passed, jobEventListener is going to pick this up for execution immediately.
  • If time is in the future, jobPoller is going to pick this up with an error of +/- cfgPollingInterval seconds. Please do not expect very high accuracy of when the job is actually executed.
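
A small sketch of how the UTCTime argument might be computed, using the time package. The helper names minutesAfter and minutesFromNow are hypothetical.

```haskell
import Data.Time (UTCTime, addUTCTime, getCurrentTime)

-- | The instant that lies the given number of minutes after a reference time.
minutesAfter :: Integer -> UTCTime -> UTCTime
minutesAfter n = addUTCTime (fromInteger (n * 60))

-- | The instant that lies the given number of minutes after the current time.
minutesFromNow :: Integer -> IO UTCTime
minutesFromNow n = minutesAfter n <$> getCurrentTime
```

Assuming conn, tableName and payload are already in scope, minutesFromNow 10 >>= scheduleJob conn tableName payload would enqueue a job to run in roughly ten minutes, subject to the polling accuracy described above.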

Job and associated data-types

data Job Source #

Instances
Eq Job Source # 
Instance details

Defined in OddJobs.Types

Methods

(==) :: Job -> Job -> Bool #

(/=) :: Job -> Job -> Bool #

Show Job Source # 
Instance details

Defined in OddJobs.Types

Methods

showsPrec :: Int -> Job -> ShowS #

show :: Job -> String #

showList :: [Job] -> ShowS #

Generic Job Source # 
Instance details

Defined in OddJobs.Types

Associated Types

type Rep Job :: Type -> Type #

Methods

from :: Job -> Rep Job x #

to :: Rep Job x -> Job #

FromRow Job Source # 
Instance details

Defined in OddJobs.Types

type Rep Job Source # 
Instance details

Defined in OddJobs.Types

type JobId = Int Source #

data Status Source #

Constructors

Success

In the current version of odd-jobs you should not find any jobs having the Success status, because successful jobs are immediately deleted. However, in the future, we may keep them around for a certain time-period before removing them from the jobs table.

Queued

Jobs in Queued status may be picked up by the job-runner on the basis of the jobRunAt field.

Failed

Jobs in Failed status will not be retried by the job-runner.

Retry

Jobs in Retry status will be retried by the job-runner on the basis of the jobRunAt field.

Locked

Jobs in Locked status are currently being executed by a job-runner, which is identified by the jobLockedBy field. The start of job-execution is indicated by the jobLockedAt field.

Instances
Bounded Status Source # 
Instance details

Defined in OddJobs.Types

Enum Status Source # 
Instance details

Defined in OddJobs.Types

Eq Status Source # 
Instance details

Defined in OddJobs.Types

Methods

(==) :: Status -> Status -> Bool #

(/=) :: Status -> Status -> Bool #

Ord Status Source # 
Instance details

Defined in OddJobs.Types

Show Status Source # 
Instance details

Defined in OddJobs.Types

Generic Status Source # 
Instance details

Defined in OddJobs.Types

Associated Types

type Rep Status :: Type -> Type #

Methods

from :: Status -> Rep Status x #

to :: Rep Status x -> Status #

ToJSON Status Source # 
Instance details

Defined in OddJobs.Types

FromJSON Status Source # 
Instance details

Defined in OddJobs.Types

FromField Status Source # 
Instance details

Defined in OddJobs.Types

ToField Status Source # 
Instance details

Defined in OddJobs.Types

Methods

toField :: Status -> Action #

ToText Status Source # 
Instance details

Defined in OddJobs.Types

Methods

toText :: Status -> Text #

StringConv Text a => FromText (Either a Status) Source # 
Instance details

Defined in OddJobs.Types

Methods

fromText :: Text -> Either a Status #

type Rep Status Source # 
Instance details

Defined in OddJobs.Types

type Rep Status = D1 (MetaData "Status" "OddJobs.Types" "odd-jobs-0.2.2-IQT5Y8dLVtd1UwbOhFLeU6" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type))))

newtype JobRunnerName Source #

Constructors

JobRunnerName 
Instances
Eq JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

Show JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

Generic JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

Associated Types

type Rep JobRunnerName :: Type -> Type #

ToJSON JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

FromJSON JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

FromField JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

ToField JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

type Rep JobRunnerName Source # 
Instance details

Defined in OddJobs.Types

type Rep JobRunnerName = D1 (MetaData "JobRunnerName" "OddJobs.Types" "odd-jobs-0.2.2-IQT5Y8dLVtd1UwbOhFLeU6" True) (C1 (MetaCons "JobRunnerName" PrefixI True) (S1 (MetaSel (Just "unJobRunnerName") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

type TableName = Query Source #

An alias for the Query type. Since this type has an instance of IsString you do not need to do anything special to create a value of this type. Just ensure you have the OverloadedStrings extension enabled. For example:

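{-# LANGUAGE OverloadedStrings #-}

import OddJobs.Job (TableName)

-- The string literal is converted to a TableName via its IsString instance.
myJobsTable :: TableName
myJobsTable = "my_jobs"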

delaySeconds :: MonadIO m => Seconds -> m () Source #

Convenience wrapper on-top of threadDelay which takes Seconds as an argument, instead of micro-seconds.
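
For reference, the unit conversion involved looks like this. The sketch uses a plain Int for the number of seconds instead of the Seconds newtype, and both function names are hypothetical; it is not the library's definition.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | threadDelay expects microseconds, so one second is 1,000,000 units.
secondsToMicros :: Int -> Int
secondsToMicros s = s * 1000000

-- | Pause the current thread for the given number of seconds.
delayForSeconds :: MonadIO m => Int -> m ()
delayForSeconds = liftIO . threadDelay . secondsToMicros
```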

newtype Seconds Source #

Constructors

Seconds 

Fields

Instances
Eq Seconds Source # 
Instance details

Defined in OddJobs.Types

Methods

(==) :: Seconds -> Seconds -> Bool #

(/=) :: Seconds -> Seconds -> Bool #

Num Seconds Source # 
Instance details

Defined in OddJobs.Types

Ord Seconds Source # 
Instance details

Defined in OddJobs.Types

Read Seconds Source # 
Instance details

Defined in OddJobs.Types

Show Seconds Source # 
Instance details

Defined in OddJobs.Types

data JobErrHandler a Source #

Exception handler for jobs. This is conceptually very similar to how Handler and catches (from Exception) work in-tandem. Using cfgOnJobFailed you can install multiple exception handlers, where each handler is responsible for one type of exception. OddJobs will execute the correct exception handler on the basis of the type of runtime exception raised. For example:

cfgOnJobFailed =
  [ JobErrHandler $ \(e :: HttpException) job failMode -> ...
  , JobErrHandler $ \(e :: SqlException) job failMode -> ...
  , JobErrHandler $ \(e :: ) job failMode -> ...
  ]

Note: Please refer to the section on alerts and notifications in the implementation guide to understand how to use the machinery provided by JobErrHandler and cfgOnJobFailed.
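
The dispatch-by-exception-type behaviour described above mirrors catches from Control.Exception. The sketch below uses only base to show that mechanism; classify is a hypothetical function, and the exception types are standard-library ones chosen for illustration rather than anything odd-jobs defines.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception
  (ArithException, Handler (..), IOException, SomeException, catches)

-- | Run an action and report which handler, if any, dealt with its failure.
-- Handlers are tried in order and the first one whose exception type matches
-- wins, so the catch-all SomeException handler has to come last.
classify :: IO a -> IO String
classify action =
  (action >> pure "ok")
    `catches` [ Handler $ \(_ :: ArithException) -> pure "arithmetic"
              , Handler $ \(_ :: IOException) -> pure "io"
              , Handler $ \(_ :: SomeException) -> pure "other"
              ]
```

The type annotation on each lambda argument is what selects the handler, which is why the ScopedTypeVariables extension is needed here and in a cfgOnJobFailed list as well.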

Constructors

Exception e => JobErrHandler (e -> Job -> FailureMode -> IO a) 

data AllJobTypes Source #

The web/admin UI needs to know a "master list" of all job-types to be able to power the "filter by job-type" feature. This data-type helps in letting odd-jobs know how to get such a master-list. The function specified by this type is run once when the job-runner starts (and stored in an internal IORef). After that the list of job-types needs to be updated manually by pressing the appropriate "refresh" link in the admin/web UI.

Constructors

AJTFixed [Text]

A fixed-list of job-types. If you don't want to increase boilerplate, consider using defaultConstantJobTypes which will automatically generate the list of available job-types based on a sum-type that represents your job payload.

AJTSql (Connection -> IO [Text])

Construct the list of job-types dynamically by looking at the actual payloads in cfgTableName (using an SQL query).

AJTCustom (IO [Text])

A custom IO action for fetching the list of job-types.
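
To illustrate the idea of deriving a fixed list of job-types from a sum-type, here is a sketch that lists constructor names using Data.Data from base. This is a swapped-in technique for illustration and is not how defaultConstantJobTypes is implemented; MyJob, its constructors, and constructorNames are all hypothetical.

```haskell
{-# LANGUAGE DeriveDataTypeable #-}

import Data.Data (Data, dataTypeConstrs, dataTypeOf, showConstr)

-- | A hypothetical job payload type with one constructor per kind of job.
data MyJob
  = SendWelcomeEmail Int
  | GenerateInvoice Int
  | PurgeOldSessions
  deriving (Data)

-- | Names of all constructors of a type. The argument only fixes the type
-- and is never evaluated, so passing undefined is safe for derived instances.
constructorNames :: Data a => a -> [String]
constructorNames proxy = map showConstr (dataTypeConstrs (dataTypeOf proxy))

-- | Evaluates to ["SendWelcomeEmail","GenerateInvoice","PurgeOldSessions"].
myJobTypes :: [String]
myJobTypes = constructorNames (undefined :: MyJob)
```

To use such a list with AJTFixed the strings would have to be converted to Text, and they would only be meaningful if they match whatever cfgJobType extracts from your payloads.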

Structured logging

OddJobs uses "structured logging" for important events that occur during the life-cycle of a job-runner. This is useful if you're using JSON/XML for aggregating logs of your entire application to something like Kibana, AWS CloudWatch, GCP StackDriver Logging, etc.

If you're not interested in using structured logging, look at defaultLogStr to output plain-text logs (or you can write your own function, as well).

data LogEvent Source #

Constructors

LogJobStart !Job

Emitted when a job starts execution

LogJobSuccess !Job !NominalDiffTime

Emitted when a job succeeds along with the time taken for execution.

LogJobFailed !Job !SomeException !FailureMode !NominalDiffTime

Emitted when a job fails (but will be retried) along with the time taken for this attempt

LogJobTimeout !Job

Emitted when a job times out and is picked-up again for execution

LogPoll

Emitted whenever jobPoller polls the DB table

LogWebUIRequest

TODO

LogText !Text

Emitted whenever any other event occurs

Instances
Show LogEvent Source # 
Instance details

Defined in OddJobs.Types

Generic LogEvent Source # 
Instance details

Defined in OddJobs.Types

Associated Types

type Rep LogEvent :: Type -> Type #

Methods

from :: LogEvent -> Rep LogEvent x #

to :: Rep LogEvent x -> LogEvent #

type Rep LogEvent Source # 
Instance details

Defined in OddJobs.Types

Job-runner internals

jobMonitor :: forall m. HasJobRunner m => m () Source #

Spawns jobPoller and jobEventListener in separate threads and restarts them in the off-chance they happen to crash. Also responsible for implementing graceful shutdown, i.e. waiting for all jobs already being executed to finish execution before exiting the main thread.

jobEventListener :: HasJobRunner m => m () Source #

Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.

jobPoller :: HasJobRunner m => m () Source #

Executes jobPollingSql every cfgPollingInterval seconds to pick up jobs for execution. Uses UPDATE along with SELECT...FOR UPDATE to efficiently find a job that matches all of the following conditions:

  • jobRunAt should be in the past
  • one of the following conditions match:

    • jobStatus should be Queued or Retry
    • jobStatus should be Locked and jobLockedAt should be defaultLockTimeout seconds in the past, thus indicating that the job was picked up for execution but didn't complete on time (possibly because the thread/process executing it crashed without being able to update the DB)
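
The conditions above can be restated as a pure predicate over an in-memory row. This is only a model of the selection logic, not the SQL that odd-jobs runs: Status here is a local stand-in mirroring the documented constructors, JobRow and eligibleForPickup are hypothetical, times are plain seconds, and treating "in the past" as less-than-or-equal is an assumption.

```haskell
-- | Local stand-in for the Status type documented on this page.
data Status = Success | Queued | Failed | Retry | Locked
  deriving (Eq, Show)

-- | Simplified job row; times are seconds since an arbitrary epoch.
data JobRow = JobRow
  { rowStatus   :: Status
  , rowRunAt    :: Int
  , rowLockedAt :: Maybe Int
  }

-- | Would a poll at time @now@ pick this row up, given the lock timeout?
eligibleForPickup :: Int -> Int -> JobRow -> Bool
eligibleForPickup now lockTimeout row =
  rowRunAt row <= now && (waiting || lockExpired)
  where
    -- Jobs that are queued or awaiting a retry are always candidates.
    waiting = rowStatus row `elem` [Queued, Retry]
    -- A locked job is a candidate only once its lock is older than the timeout.
    lockExpired = case (rowStatus row, rowLockedAt row) of
      (Locked, Just lockedAt) -> lockedAt + lockTimeout <= now
      _ -> False
```

For example, with a timeout of 600 seconds a row locked at time 0 is not eligible at time 100, but becomes eligible again at time 700.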

type JobRunner = Job -> IO () Source #

class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where Source #

The documentation of odd-jobs currently promotes startJobRunner, which expects a fairly detailed Config record, as a top-level function for initiating a job-runner. However, internally, this Config record is used as an environment for a ReaderT, and almost all functions are written in this ReaderT monad, which implements an instance of the HasJobRunner type-class.

In future, this internal implementation detail will allow us to offer a type-class based interface as well (similar to what YesodJobQueue provides).

Database helpers

A bunch of functions that help you query cfgTableName and change the status of individual jobs. Most of these functions are in IO and you might want to write wrappers that lift them into your application's custom monad.

Note: When passing a Connection to these functions, it is recommended to not take a connection from cfgDbPool. Use your application's database pool instead.

unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #

TODO: First check in all job-runners if this job is still running, or not, and somehow send an uninterruptibleCancel to that thread.

jobDbColumns :: (IsString s, Semigroup s) => [s] Source #

If you are writing SQL queries where you want to return ALL columns from the jobs table, it is recommended that you do not issue a SELECT * or RETURNING *. List out specific DB columns using jobDbColumns and concatJobDbColumns instead. This will insulate you from runtime errors caused by the addition of new columns to cfgTableName in future versions of OddJobs.

concatJobDbColumns :: (IsString s, Semigroup s) => s Source #

All jobDbColumns joined together with commas. Useful for constructing SQL queries, eg:

query_ conn $ "SELECT " <> concatJobDbColumns <> " FROM jobs"
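
When assembling queries by concatenation, the whitespace around SQL keywords has to be supplied explicitly. The following base-only sketch shows the pattern with plain Strings; selectColumns and the column names in the usage note are hypothetical and not taken from odd-jobs.

```haskell
import Data.List (intercalate)

-- | Build a SELECT statement from column names and a table name, keeping a
-- space on both sides of the FROM keyword.
selectColumns :: [String] -> String -> String
selectColumns cols table =
  "SELECT " <> intercalate ", " cols <> " FROM " <> table
```

For instance, selectColumns ["id", "status"] "jobs" evaluates to "SELECT id, status FROM jobs".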

fetchAllJobTypes :: MonadIO m => Config -> m [Text] Source #

Used by the web/admin UI to fetch a "master list" of all known job-types. Ref: cfgAllJobTypes

fetchAllJobRunners :: MonadIO m => Config -> m [JobRunnerName] Source #

Used by the web/admin UI to fetch a "master list" of all known job-runners. There is a known issue with the way this has been implemented:

  • Since this looks at the jobLockedBy column of cfgTableName, it will discover only those job-runners that are actively executing at least one job at the time this function is executed.

JSON helpers