Safe Haskell | None |
---|---|
Language | Haskell2010 |
This module contains an at-least-once persistent job processing queue backed by Redis. It depends upon Redis not losing data once it has acknowledged it, and upon the atomicity guarantees specified for commands like EVAL (i.e., if you do several things within an EVAL, either all of them happen or none of them do). Nothing has been tested with Redis clusters (and it likely will not work).
An example use is the following (see the repository for a slightly expanded version; the test cases in the repository are also good examples):

    {-# LANGUAGE DeriveGeneric     #-}
    {-# LANGUAGE OverloadedStrings #-}

    import Control.Concurrent      (forkIO, threadDelay)
    import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
    import Control.Monad           (forever)
    import Data.Aeson              (FromJSON, ToJSON)
    import GHC.Generics            (Generic)
    import System.Hworker

    data PrintJob = Print deriving (Generic, Show)
    data State = State (MVar Int)

    instance ToJSON PrintJob
    instance FromJSON PrintJob

    instance Job State PrintJob where
      job (State mvar) Print =
        do v <- takeMVar mvar
           putMVar mvar (v + 1)
           putStrLn $ "A(" ++ show v ++ ")"
           return Success

    main = do mvar <- newMVar 0
              hworker <- create "printer" (State mvar)
              forkIO (worker hworker)
              forkIO (monitor hworker)
              forkIO (forever $ queue hworker Print >> threadDelay 1000000)
              forever (threadDelay 1000000)
- data Result
- class (FromJSON t, ToJSON t, Show t) => Job s t | s -> t where
- data Hworker s t
- data HworkerConfig s = HworkerConfig
  - hwconfigName :: Text
  - hwconfigState :: s
  - hwconfigRedisConnectInfo :: RedisConnection
  - hwconfigExceptionBehavior :: ExceptionBehavior
  - hwconfigLogger :: forall a. Show a => a -> IO ()
  - hwconfigTimeout :: NominalDiffTime
  - hwconfigFailedQueueSize :: Int
  - hwconfigDebug :: Bool
- data ExceptionBehavior
- data RedisConnection
- defaultHworkerConfig :: Text -> s -> HworkerConfig s
- create :: Job s t => Text -> s -> IO (Hworker s t)
- createWith :: Job s t => HworkerConfig s -> IO (Hworker s t)
- destroy :: Job s t => Hworker s t -> IO ()
- worker :: Job s t => Hworker s t -> IO ()
- monitor :: Job s t => Hworker s t -> IO ()
- queue :: Job s t => Hworker s t -> t -> IO Bool
- jobs :: Job s t => Hworker s t -> IO [t]
- failed :: Job s t => Hworker s t -> IO [t]
- broken :: Hworker s t -> IO [(ByteString, UTCTime)]
- debugger :: Job s t => Int -> Hworker s t -> IO ()
Types
class (FromJSON t, ToJSON t, Show t) => Job s t | s -> t where
Each Worker that you create will be responsible for one type of job, defined by a Job instance.

The job can do many different things (as the value can be a variant), but be careful not to break deserialization if you add new things it can do.

The job will take some state (passed as the s parameter), which does not vary based on the job, and the actual job data structure. The data structure (the t parameter) will be stored and copied a few times in Redis during its lifecycle, so it is generally a good idea to keep it relatively small (and have it look up any data it needs while the job is running).

Finally, while deriving FromJSON and ToJSON instances automatically might seem like a good idea, you will most likely be better off defining them manually, so you can make sure they stay backwards compatible if you change them, as any jobs that cannot be deserialized will not be run (and will end up in the broken queue). This can only happen if the queue is non-empty when you replace the running application version, but that is obviously possible and may be likely depending on your use.
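For instance, hand-written instances can tag the JSON with the constructor name, so that payloads queued by an older version of the application still parse after the type grows a new case. This is only a sketch with a hypothetical EmailJob payload (not part of this library); the field names are illustrative.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson

-- Hypothetical job payload: it starts with one constructor and later gains another.
data EmailJob
  = SendWelcome String       -- recipient address
  | SendDigest String Int    -- recipient address, number of items
  deriving (Show)

instance ToJSON EmailJob where
  toJSON (SendWelcome to)  = object ["type" .= ("welcome" :: String), "to" .= to]
  toJSON (SendDigest to n) = object ["type" .= ("digest" :: String), "to" .= to, "items" .= n]

instance FromJSON EmailJob where
  parseJSON = withObject "EmailJob" $ \o -> do
    ty <- o .: "type"
    case (ty :: String) of
      "welcome" -> SendWelcome <$> o .: "to"
      "digest"  -> SendDigest  <$> o .: "to" <*> o .: "items"
      _         -> fail ("unknown EmailJob type: " ++ ty)
```

Adding a new tag (like "digest" above) is backwards compatible because the parsers for existing tags are untouched; removing or renaming a tag is what strands old jobs in the broken queue.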
data Hworker s t

The worker data type. It is parameterized by the worker state (the s) and the job type (the t).
data HworkerConfig s
The main configuration for workers.
Each pool of workers should have a unique hwconfigName, as the queues are set up by that name, and if you have different types of data written in, they will likely be unable to be deserialized (and thus could end up in the broken queue).

The hwconfigLogger defaults to writing to stdout, so you will likely want to replace it with something appropriate (like a function from a logging package).

The hwconfigTimeout is really important. It determines the length of time after a job is started before the monitor will decide that the job must have died and will restart it. If it is shorter than the length of time that a normal job takes to complete, jobs _will_ be run multiple times. This is _semantically_ okay, as this is an at-least-once processor, but it obviously won't be desirable. It defaults to 120 seconds.

The hwconfigExceptionBehavior controls what happens when an exception is thrown within a job.

hwconfigFailedQueueSize controls how many failed jobs will be kept. It defaults to 1000.
data ExceptionBehavior
data RedisConnection

When configuring a worker, you can tell it to use an existing Redis connection pool (which you may already have for the rest of your application). Otherwise, you can specify connection info. By default, hworker tries to connect to localhost, which is probably not what you want for a production application.
defaultHworkerConfig :: Text -> s -> HworkerConfig s

The default worker config. It needs a name and a state, as those will always be unique to your application.
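As a sketch of how a custom configuration fits together (reusing the State and PrintJob types from the example at the top of this page; the queue name and override values are arbitrary), you can start from defaultHworkerConfig and override fields with record-update syntax, then hand the result to createWith:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import System.Hworker

-- Sketch: State and PrintJob are the types from the example above;
-- "printer" and the override values are illustrative choices.
makePrinter :: State -> IO (Hworker State PrintJob)
makePrinter st =
  createWith ((defaultHworkerConfig "printer" st)
                { hwconfigTimeout         = 300   -- seconds; comfortably longer than any normal job
                , hwconfigFailedQueueSize = 5000  -- keep more failed jobs around for inspection
                })
```

Any field you do not override keeps its default, so this is usually less error-prone than building an HworkerConfig from scratch.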
Managing Workers
create :: Job s t => Text -> s -> IO (Hworker s t)

Create a new worker with the default HworkerConfig.

Note that you must create at least one worker and monitor for the queue to actually process jobs (and for it to retry ones that time out).
createWith :: Job s t => HworkerConfig s -> IO (Hworker s t)

Create a new worker with a specified HworkerConfig.

Note that you must create at least one worker and monitor for the queue to actually process jobs (and for it to retry ones that time out).
worker :: Job s t => Hworker s t -> IO ()

Creates a new worker thread. This is blocking, so you will want to forkIO this into a thread. You can have any number of these (and on any number of servers); the more there are, the faster jobs will be processed.
monitor :: Job s t => Hworker s t -> IO ()

Start a monitor. Like worker, this is blocking, so it should be started in a thread. It is responsible for retrying jobs that time out (which can happen if the processing thread is killed, for example). You need to have at least one of these running for retries to happen, but it is safe to have any number running.
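For example, a small sketch of wiring these together (the number of worker threads is an arbitrary choice, and hw stands for any handle returned by create or createWith):

```haskell
import Control.Concurrent (forkIO)
import Control.Monad      (replicateM_, void)
import System.Hworker

-- Sketch: four worker threads and one monitor, all against the same queue.
startProcessing :: Job s t => Hworker s t -> IO ()
startProcessing hw = do
  replicateM_ 4 (forkIO (worker hw))  -- more workers (here or on other machines) => more throughput
  void (forkIO (monitor hw))          -- at least one monitor is needed to retry timed-out jobs
```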
Queuing Jobs
queue :: Job s t => Hworker s t -> t -> IO Bool
Adds a job to the queue. Returns whether the operation succeeded.
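The returned Bool only says whether the enqueue reached Redis, so callers will usually want to check it; a minimal sketch (the warning text is illustrative):

```haskell
import Control.Monad (unless)
import System.Hworker

-- Sketch: enqueue a job and warn when Redis did not accept it.
enqueueOrWarn :: Job s t => Hworker s t -> t -> IO ()
enqueueOrWarn hw j = do
  ok <- queue hw j
  unless ok $ putStrLn ("failed to enqueue job: " ++ show j)
```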
Inspecting Workers
failed :: Job s t => Hworker s t -> IO [t]

Returns all failed jobs. This is capped at the most recent hwconfigFailedQueueSize jobs that returned Failure (or threw an exception when hwconfigExceptionBehavior is FailOnException).
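One way to use this, sketched below, is to push failed jobs back onto the queue once whatever made them fail has been addressed; whether blind re-queuing is appropriate depends entirely on your jobs:

```haskell
import System.Hworker

-- Sketch: re-enqueue everything currently reported by 'failed'.
retryFailed :: Job s t => Hworker s t -> IO ()
retryFailed hw = do
  js <- failed hw
  mapM_ (queue hw) js
```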
broken :: Hworker s t -> IO [(ByteString, UTCTime)]

Returns the jobs that could not be deserialized, most likely because you changed the 'ToJSON'/'FromJSON' instances for your job in a way that left old jobs unable to be converted back from JSON. Another (and much worse) way for jobs to end up here is if you point two instances of Hworker, with different job types, at the same queue (i.e., you re-use the name): any time a worker from one queue picks up a job from the other, it will think the job is broken.
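To see what has ended up there, you can dump the raw payloads along with the times they were set aside; a minimal sketch, assuming the strict ByteString type:

```haskell
import qualified Data.ByteString.Char8 as B8
import System.Hworker

-- Sketch: print each payload that failed to deserialize and when it was noticed.
dumpBroken :: Hworker s t -> IO ()
dumpBroken hw = do
  bs <- broken hw
  mapM_ (\(payload, t) -> putStrLn (show t ++ ": " ++ B8.unpack payload)) bs
```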