Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- startJobRunner :: Config -> IO ()
- data Config = Config {
- cfgTableName :: TableName
- cfgJobRunner :: Job -> IO ()
- cfgDefaultMaxAttempts :: Int
- cfgConcurrencyControl :: ConcurrencyControl
- cfgDbPool :: Pool Connection
- cfgPollingInterval :: Seconds
- cfgOnJobSuccess :: Job -> IO ()
- cfgOnJobFailed :: Job -> IO ()
- cfgOnJobPermanentlyFailed :: Job -> IO ()
- cfgOnJobStart :: Job -> IO ()
- cfgOnJobTimeout :: Job -> IO ()
- cfgPidFile :: Maybe FilePath
- cfgLogger :: LogLevel -> LogEvent -> IO ()
- cfgJobToText :: Job -> Text
- cfgJobType :: Job -> Text
- data ConcurrencyControl
- defaultConfig :: (LogLevel -> LogEvent -> IO ()) -> TableName -> Pool Connection -> ConcurrencyControl -> (Job -> IO ()) -> Config
- defaultJobToText :: (Job -> Text) -> Job -> Text
- defaultJobType :: Job -> Text
- defaultTimedLogger :: TimedFastLogger -> (LogLevel -> LogEvent -> LogStr) -> LogLevel -> LogEvent -> IO ()
- defaultLogStr :: (Job -> Text) -> LogLevel -> LogEvent -> LogStr
- defaultPollingInterval :: Seconds
- defaultLockTimeout :: Seconds
- withConnectionPool :: MonadUnliftIO m => Either ByteString ConnectInfo -> (Pool Connection -> m a) -> m a
- createJob :: ToJSON p => Connection -> TableName -> p -> IO Job
- scheduleJob :: ToJSON p => Connection -> TableName -> p -> UTCTime -> IO Job
- data Job = Job {
- jobId :: JobId
- jobCreatedAt :: UTCTime
- jobUpdatedAt :: UTCTime
- jobRunAt :: UTCTime
- jobStatus :: Status
- jobPayload :: Value
- jobLastError :: Maybe Value
- jobAttempts :: Int
- jobLockedAt :: Maybe UTCTime
- jobLockedBy :: Maybe Text
- type JobId = Int
- data Status
- type TableName = Query
- delaySeconds :: MonadIO m => Seconds -> m ()
- newtype Seconds = Seconds {}
- data LogEvent
- data LogLevel
- jobMonitor :: forall m. HasJobRunner m => m ()
- jobEventListener :: HasJobRunner m => m ()
- jobPoller :: HasJobRunner m => m ()
- jobPollingSql :: TableName -> Query
- type JobRunner = Job -> IO ()
- class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
- getPollingInterval :: m Seconds
- onJobSuccess :: Job -> m ()
- onJobFailed :: Job -> m ()
- onJobPermanentlyFailed :: Job -> m ()
- getJobRunner :: m (Job -> IO ())
- getDbPool :: m (Pool Connection)
- getTableName :: m TableName
- onJobStart :: Job -> m ()
- getDefaultMaxAttempts :: m Int
- onJobTimeout :: Job -> m ()
- getRunnerEnv :: m RunnerEnv
- getConcurrencyControl :: m ConcurrencyControl
- getPidFile :: m (Maybe FilePath)
- getJobToText :: m (Job -> Text)
- log :: LogLevel -> LogEvent -> m ()
- findJobById :: HasJobRunner m => JobId -> m (Maybe Job)
- findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job)
- saveJob :: HasJobRunner m => Job -> m Job
- saveJobIO :: Connection -> TableName -> Job -> IO Job
- jobDbColumns :: (IsString s, Semigroup s) => [s]
- concatJobDbColumns :: (IsString s, Semigroup s) => s
- eitherParsePayload :: FromJSON a => Job -> Either String a
- throwParsePayload :: FromJSON a => Job -> IO a
- eitherParsePayloadWith :: (Value -> Parser a) -> Job -> Either String a
- throwParsePayloadWith :: (Value -> Parser a) -> Job -> IO a
Starting the job-runner
startJobRunner :: Config -> IO () Source #
Configuring the job-runner
While odd-jobs is highly configurable and the Config data-type might seem daunting at first, it is not necessary to tweak every single configuration parameter by hand. Start with the sensible defaults provided by the configuration helpers, then tweak individual config parameters on a case-by-case basis.
Config | |
|
data ConcurrencyControl Source #
MaxConcurrentJobs Int | The maximum number of concurrent jobs that this instance of the job-runner can execute. TODO: Link-off to tutorial. |
UnlimitedConcurrentJobs | Not recommended: Please do not use this in production unless you know what you're doing. No machine can support unlimited concurrency. If your jobs are doing anything worthwhile, running a sufficiently large number of them concurrently is going to max out some resource of the underlying machine, such as CPU, memory, disk IOPS, or network bandwidth. |
DynamicConcurrency (IO Bool) | Use this to dynamically determine whether the next job should be picked up. This is useful for writing custom logic that checks whether a limited resource is below a certain usage threshold (e.g. CPU usage is below 80%). Caveat: This feature has not been tested in production yet. TODO: Link-off to tutorial. |
Instances
Show ConcurrencyControl Source # | |
Defined in OddJobs.Job showsPrec :: Int -> ConcurrencyControl -> ShowS # show :: ConcurrencyControl -> String # showList :: [ConcurrencyControl] -> ShowS # |
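As an illustration of DynamicConcurrency, the sketch below builds a ConcurrencyControl whose IO Bool action reads a usage gauge and allows the next job only while usage is below a threshold. The gauge (an IORef Int updated elsewhere), the helper names belowThreshold and dynamicControl, and the 80% limit are all assumptions made for this example; they are not part of the library.

```haskell
import Data.IORef (IORef, readIORef)
import OddJobs.Job (ConcurrencyControl (..))

-- | Hypothetical check: is the current usage strictly below the limit?
belowThreshold :: Int -> Int -> Bool
belowThreshold limit current = current < limit

-- | Hypothetical helper: the gauge is assumed to hold a usage percentage
-- that some other part of the application keeps up to date.
dynamicControl :: IORef Int -> ConcurrencyControl
dynamicControl gauge =
  DynamicConcurrency (belowThreshold 80 <$> readIORef gauge)
```

Keeping the threshold check pure makes it easy to test separately from whatever measures the resource.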
Configuration helpers
defaultConfig Source #
:: (LogLevel -> LogEvent -> IO ()) | "Structured logging" function. Ref: |
-> TableName | DB table which holds your jobs. Ref: |
-> Pool Connection | DB connection-pool to be used by job-runner. Ref: |
-> ConcurrencyControl | Concurrency configuration. Ref: |
-> (Job -> IO ()) | The actual "job runner" which contains your application code. Ref: |
-> Config |
This function gives you a Config with a bunch of sensible defaults already applied. It requires only the bare minimum arguments that this library cannot assume on your behalf.
It makes a few important assumptions about your jobPayload JSON, which are documented in defaultJobType.
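A minimal sketch of how defaultConfig might be used: build the default Config from the five required arguments, then override a single field with record-update syntax. The logger, the job runner, the table name "jobs", and the connection string are placeholders invented for this example. It also assumes that the Left argument of withConnectionPool is a libpq-style connection string, which this page does not state.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.Pool (Pool)
import Database.PostgreSQL.Simple (Connection)
import OddJobs.Job
  ( ConcurrencyControl (..), Config (..), Job (..), LogEvent, LogLevel
  , defaultConfig, startJobRunner, withConnectionPool )

-- | Hypothetical logger that discards every event.
quietLogger :: LogLevel -> LogEvent -> IO ()
quietLogger _ _ = pure ()

-- | Hypothetical job runner; real application code would go here.
runJob :: Job -> IO ()
runJob job = putStrLn ("running job " <> show (jobId job))

-- | Start from the defaults and override only what needs changing.
mkConfig :: Pool Connection -> Config
mkConfig pool =
  (defaultConfig quietLogger "jobs" pool (MaxConcurrentJobs 10) runJob)
    { cfgDefaultMaxAttempts = 5 }

main :: IO ()
main =
  -- Assumption: the ByteString is a libpq connection string (placeholder value).
  withConnectionPool (Left "dbname=jobs_dev") $ \pool ->
    startJobRunner (mkConfig pool)
```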
defaultJobToText :: (Job -> Text) -> Job -> Text Source #
Used only by defaultLogStr now. TODO: Is this even required anymore? Should this be removed?
defaultJobType :: Job -> Text Source #
This makes two important assumptions. First, this assumes that jobs in your app are represented by a sum-type. For example:
data MyJob = SendWelcomeEmail Int | SendPasswordResetEmail Text | SetupSampleData Int
Second, it assumes that the JSON representation of this sum-type is "tagged". For example, the following...
let pload = SendWelcomeEmail 10
...when converted to JSON, would look like...
{"tag":"SendWelcomeEmail", "contents":10}
It uses this assumption to extract the "job type" from a Value (which would be SendWelcomeEmail in the example given above). This is used in logging and the admin UI.
Even if this assumption is violated, the job-runner should continue to function. It's just that you won't get very useful log messages.
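One way to obtain the tagged representation described above is aeson's Generic-derived instances with their default options, which encode a sum-type as an object with "tag" and "contents" keys. The sketch below assumes you are using aeson's defaults; the names payload and expected are introduced only for illustration.

```haskell
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
import Data.Aeson (FromJSON, ToJSON, Value, object, toJSON, (.=))
import Data.Text (Text)
import GHC.Generics (Generic)

data MyJob
  = SendWelcomeEmail Int
  | SendPasswordResetEmail Text
  | SetupSampleData Int
  deriving (Eq, Show, Generic)

-- Empty instance bodies pick up aeson's Generic defaults.
instance ToJSON MyJob
instance FromJSON MyJob

-- | The payload as it would be handed to the JSON encoder.
payload :: Value
payload = toJSON (SendWelcomeEmail 10)

-- | The same value written out by hand, for comparison.
expected :: Value
expected = object ["tag" .= ("SendWelcomeEmail" :: Text), "contents" .= (10 :: Int)]
```

If you customise aeson's Options (for example a different sumEncoding), the "tag" key may no longer be present and the extracted job type will be less useful.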
defaultTimedLogger :: TimedFastLogger -> (LogLevel -> LogEvent -> LogStr) -> LogLevel -> LogEvent -> IO () Source #
TODO: Should the library be doing this?
defaultPollingInterval :: Seconds Source #
As the name says. Ref: cfgPollingInterval
defaultLockTimeout :: Seconds Source #
TODO: Make this configurable for the job-runner, why is this still hard-coded?
withConnectionPool :: MonadUnliftIO m => Either ByteString ConnectInfo -> (Pool Connection -> m a) -> m a Source #
Convenience function to create a DB connection-pool with some sensible defaults. Please see the source-code of this function to understand what it's doing. TODO: link-off to tutorial.
Creating/scheduling jobs
Ideally, you'd want to create wrappers for createJob and scheduleJob in your application so that, instead of being in IO, they can be in your application's monad m (thus saving you from a liftIO every time you want to enqueue a job).
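A wrapper of this kind could look like the sketch below. The name enqueue is hypothetical, and the example assumes the application keeps a Pool Connection (from the resource-pool package) from which a connection is borrowed for each call. A real application would more likely read the pool and table name from its own environment than take them as arguments.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (ToJSON)
import Data.Pool (Pool, withResource)
import Database.PostgreSQL.Simple (Connection)
import OddJobs.Job (Job, TableName, createJob)

-- | Hypothetical wrapper: enqueue a job from any monad that can run IO.
enqueue :: (MonadIO m, ToJSON p) => Pool Connection -> TableName -> p -> m Job
enqueue pool tname payload =
  -- Borrow a connection from the pool only for the duration of the insert.
  liftIO $ withResource pool $ \conn -> createJob conn tname payload
```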
createJob :: ToJSON p => Connection -> TableName -> p -> IO Job Source #
Create a job for immediate execution.
Internally calls scheduleJob, passing it the current time. Read scheduleJob for further documentation.
scheduleJob Source #
:: ToJSON p | |
=> Connection | DB connection to use. Note: This should
ideally come out of your application's DB pool,
not the |
-> TableName | DB-table which holds your jobs |
-> p | Job payload |
-> UTCTime | when should the job be executed |
-> IO Job |
Create a job for execution at the given time.
- If the time has already passed, jobEventListener is going to pick this up for execution immediately.
- If the time is in the future, jobPoller is going to pick this up with an error of +/- cfgPollingInterval seconds. Please do not expect very high accuracy of when the job is actually executed.
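For example, scheduling a job relative to the current time might be sketched as follows. The helper name scheduleIn is hypothetical; it only computes a UTCTime with addUTCTime and passes it to scheduleJob.

```haskell
import Data.Aeson (ToJSON)
import Data.Time (NominalDiffTime, addUTCTime, getCurrentTime)
import Database.PostgreSQL.Simple (Connection)
import OddJobs.Job (Job, TableName, scheduleJob)

-- | Hypothetical helper: schedule a job to run `delay` seconds from now.
scheduleIn :: ToJSON p => Connection -> TableName -> p -> NominalDiffTime -> IO Job
scheduleIn conn tname payload delay = do
  now <- getCurrentTime
  scheduleJob conn tname payload (addUTCTime delay now)
```

A call such as scheduleIn conn "jobs" payload 3600 would request execution roughly an hour from now, subject to the polling accuracy described above.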
Job and associated data-types
Job | |
|
Instances
Enum Status Source # | |
Defined in OddJobs.Job | |
Eq Status Source # | |
Ord Status Source # | |
Show Status Source # | |
Generic Status Source # | |
ToJSON Status Source # | |
Defined in OddJobs.Web | |
FromJSON Status Source # | |
FromField Status Source # | |
Defined in OddJobs.Job | |
ToField Status Source # | |
Defined in OddJobs.Job | |
ToText Status Source # | |
Defined in OddJobs.Job | |
StringConv Text a => FromText (Either a Status) Source # | |
type Rep Status Source # | |
Defined in OddJobs.Job type Rep Status = D1 (MetaData "Status" "OddJobs.Job" "odd-jobs-0.1.0-AG1ucQCmc3LHSWSLszrvJU" False) ((C1 (MetaCons "Success" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Queued" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Failed" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "Retry" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Locked" PrefixI False) (U1 :: Type -> Type)))) |
type TableName = Query Source #
An alias for the Query type. Since this type has an instance of IsString, you do not need to do anything special to create a value for this type. Just ensure you have the OverloadedStrings extension enabled. For example:
{-# LANGUAGE OverloadedStrings #-}
myJobsTable :: TableName
myJobsTable = "my_jobs"
delaySeconds :: MonadIO m => Seconds -> m () Source #
Convenience wrapper on top of threadDelay which takes Seconds as an argument, instead of microseconds.
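The idea behind such a wrapper can be sketched with base alone. The Secs newtype below is a local stand-in for the library's Seconds (whose field is not shown on this page), and toMicroseconds and delaySecs are hypothetical names; this is not the library's implementation.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- | Local stand-in for a seconds newtype; not the library's type.
newtype Secs = Secs Int

-- | threadDelay expects microseconds, so scale by one million.
toMicroseconds :: Secs -> Int
toMicroseconds (Secs s) = s * 1000000

-- | Sleep for the given number of seconds in any MonadIO.
delaySecs :: MonadIO m => Secs -> m ()
delaySecs = liftIO . threadDelay . toMicroseconds
```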
Structured logging
TODO: Complete the prose here
LogJobStart !Job | Emitted when a job starts execution |
LogJobSuccess !Job !NominalDiffTime | Emitted when a job succeeds along with the time taken for execution. |
LogJobFailed !Job !NominalDiffTime | Emitted when a job fails (but will be retried) along with the time taken for this attempt |
LogJobPermanentlyFailed !Job !NominalDiffTime | Emitted when a job fails permanently (and will no longer be retried) along with the time taken for this attempt (i.e. final attempt) |
LogJobTimeout !Job | Emitted when a job times out and is picked-up again for execution |
LogPoll | Emitted whenever |
LogText !Text | Emitted whenever any other event occurs |
Instances
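A structured-logging function is simply a value of type LogLevel -> LogEvent -> IO (), so it can pattern-match on the constructors listed above. The sketch below assumes exactly those seven constructors with the fields shown; describeEvent and simpleLogger are hypothetical names, and the log level is ignored for brevity.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.IO as TIO
import OddJobs.Job (Job (..), LogEvent (..), LogLevel)

-- | Render an event as a single line of text.
describeEvent :: LogEvent -> Text
describeEvent ev = case ev of
  LogJobStart job -> "started job " <> idText job
  LogJobSuccess job t -> "job " <> idText job <> " succeeded in " <> T.pack (show t)
  LogJobFailed job t -> "job " <> idText job <> " failed after " <> T.pack (show t)
  LogJobPermanentlyFailed job t ->
    "job " <> idText job <> " permanently failed after " <> T.pack (show t)
  LogJobTimeout job -> "job " <> idText job <> " timed out"
  LogPoll -> "polling"
  LogText msg -> msg
  where
    idText = T.pack . show . jobId

-- | Hypothetical logger that prints every event to stdout.
simpleLogger :: LogLevel -> LogEvent -> IO ()
simpleLogger _level ev = TIO.putStrLn (describeEvent ev)
```

Because the events arrive as data rather than pre-formatted strings, the same function could just as well emit JSON or forward metrics.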
Job-runner internals
jobMonitor :: forall m. HasJobRunner m => m () Source #
Spawns jobPoller and jobEventListener in separate threads and restarts them on the off-chance they happen to crash. Also responsible for implementing graceful shutdown, i.e. waiting for all jobs already being executed to finish execution before exiting the main thread.
jobEventListener :: HasJobRunner m => m () Source #
Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created jobs.
jobPoller :: HasJobRunner m => m () Source #
Executes jobPollingSql every cfgPollingInterval seconds to pick up jobs for execution. Uses UPDATE along with SELECT...FOR UPDATE to efficiently find a job that matches all of the following conditions:
- jobRunAt should be in the past
- one of the following conditions match:
  - jobStatus should be Queued or Retry
  - jobStatus should be Locked and jobLockedAt should be defaultLockTimeout seconds in the past, thus indicating that the job was picked up for execution, but didn't complete on time (possibly because the thread/process executing it crashed without being able to update the DB)
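The eligibility conditions listed above can be restated as a pure predicate. This sketch mirrors the logic only: the real check is performed in SQL by jobPollingSql. JobRow and Status are local stand-ins for the library's types, and the lockTimeout value is a placeholder rather than the actual defaultLockTimeout.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

-- | Local stand-in for the library's Status type.
data Status = Success | Queued | Failed | Retry | Locked
  deriving (Eq, Show)

-- | Local stand-in carrying only the fields the conditions mention.
data JobRow = JobRow
  { rowRunAt :: UTCTime
  , rowStatus :: Status
  , rowLockedAt :: Maybe UTCTime
  }

-- | Placeholder lock timeout in seconds, chosen for illustration only.
lockTimeout :: NominalDiffTime
lockTimeout = 600

-- | True when the job is due and is either waiting or holds a stale lock.
isEligible :: UTCTime -> JobRow -> Bool
isEligible now job = rowRunAt job <= now && (waiting || staleLock)
  where
    waiting = rowStatus job `elem` [Queued, Retry]
    staleLock =
      rowStatus job == Locked
        && maybe False (\t -> addUTCTime lockTimeout t <= now) (rowLockedAt job)

-- | A job queued for 2020-01-01, checked one day later.
exampleEligible :: Bool
exampleEligible = isEligible (day 2) (JobRow (day 1) Queued Nothing)
  where
    day d = UTCTime (fromGregorian 2020 1 d) 0
```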
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where Source #
The documentation of odd-jobs currently promotes startJobRunner, which expects a fairly detailed Config record, as a top-level function for initiating a job-runner. However, internally, this Config record is used as an environment for a ReaderT, and almost all functions are written in this ReaderT monad, which implements an instance of the HasJobRunner type-class.
**In future,** this internal implementation detail will allow us to offer a type-class based interface as well (similar to what YesodJobQueue provides).
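The general pattern described here, a record used as a ReaderT environment with a type-class exposing its fields, can be sketched in miniature as follows. Env and HasEnv are simplified stand-ins invented for this example (two fields instead of the full Config); this is not the library's actual code.

```haskell
{-# LANGUAGE FlexibleInstances #-}
import Control.Monad.Trans.Reader (ReaderT, asks, runReaderT)
import Data.Functor.Identity (runIdentity)

-- | Stand-in for the Config record, reduced to two fields.
data Env = Env
  { envMaxAttempts :: Int
  , envTable :: String
  }

-- | Stand-in for HasJobRunner, reduced to two accessors.
class Monad m => HasEnv m where
  getMaxAttempts :: m Int
  getTable :: m String

-- | Any ReaderT over Env satisfies the class by reading the record.
instance Monad m => HasEnv (ReaderT Env m) where
  getMaxAttempts = asks envMaxAttempts
  getTable = asks envTable

-- | Code written against the class never mentions ReaderT or Env.
describe :: HasEnv m => m String
describe = do
  n <- getMaxAttempts
  t <- getTable
  pure (t ++ ": up to " ++ show n ++ " attempts")

-- | Run the class-polymorphic code with a concrete environment.
describePure :: Env -> String
describePure env = runIdentity (runReaderT describe env)
```

Because describe depends only on the class, an application could supply its own monad with a HasEnv instance instead of the ReaderT, which is the kind of type-class based interface the paragraph above anticipates.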
getPollingInterval :: m Seconds Source #
onJobSuccess :: Job -> m () Source #
onJobFailed :: Job -> m () Source #
onJobPermanentlyFailed :: Job -> m () Source #
getJobRunner :: m (Job -> IO ()) Source #
getDbPool :: m (Pool Connection) Source #
getTableName :: m TableName Source #
onJobStart :: Job -> m () Source #
getDefaultMaxAttempts :: m Int Source #
onJobTimeout :: Job -> m () Source #
getRunnerEnv :: m RunnerEnv Source #
getConcurrencyControl :: m ConcurrencyControl Source #
getPidFile :: m (Maybe FilePath) Source #
getJobToText :: m (Job -> Text) Source #
Database helpers
findJobById :: HasJobRunner m => JobId -> m (Maybe Job) Source #
findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job) Source #
jobDbColumns :: (IsString s, Semigroup s) => [s] Source #
concatJobDbColumns :: (IsString s, Semigroup s) => s Source #