{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-| This module contains an at-least-once persistent job processing queue backed by Redis. It depends upon Redis not losing data once it has acknowledged it, and guaranteeing the atomicity that is specified for commands like EVAL (ie, that if you do several things within an EVAL, they will all happen or none will happen). Nothing has been tested with Redis clusters (and it likely will not work). An example use is the following (see the repository for a slightly expanded version; also, the test cases in the repository are also good examples): > data PrintJob = Print deriving (Generic, Show) > data State = State (MVar Int) > instance ToJSON PrintJob > instance FromJSON PrintJob > > instance Job State PrintJob where > job (State mvar) Print = > do v <- takeMVar mvar > putMVar mvar (v + 1) > putStrLn $ "A(" ++ show v ++ ")" > return Success > > main = do mvar <- newMVar 0 > hworker <- create "printer" (State mvar) > forkIO (worker hworker) > forkIO (monitor hworker) > forkIO (forever $ queue hworker Print >> threadDelay 1000000) > forever (threadDelay 1000000) -} module System.Hworker ( -- * Types Result(..) , Job(..) , Hworker , HworkerConfig(..) , ExceptionBehavior(..) , RedisConnection(..) , defaultHworkerConfig -- * Managing Workers , create , createWith , destroy , worker , monitor -- * Queuing Jobs , queue -- * Inspecting Workers , jobs , failed , broken -- * Debugging Utilities , debugger ) where import Control.Arrow (second) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Control.Exception (SomeException, catch) import Control.Monad (forM, forever, void, when) import Data.Aeson (FromJSON, ToJSON) import qualified Data.Aeson as A import Data.Aeson.Helpers import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Lazy as LB import Data.Either (isRight) import Data.Maybe (fromJust, mapMaybe) import Data.Monoid ((<>)) import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T import Data.Time.Calendar (Day (..)) import Data.Time.Clock (NominalDiffTime, UTCTime (..), diffUTCTime, getCurrentTime) import qualified Data.UUID as UUID import qualified Data.UUID.V4 as UUID import qualified Database.Redis as R import GHC.Generics (Generic) -- | Jobs can return 'Success', 'Retry' (with a message), or 'Failure' -- (with a message). Jobs that return 'Failure' are stored in the -- 'failed' queue and are not re-run. Jobs that return 'Retry' are re-run. data Result = Success | Retry Text | Failure Text deriving (Generic, Show) instance ToJSON Result instance FromJSON Result -- | Each Worker that you create will be responsible for one type of -- job, defined by a 'Job' instance. -- -- The job can do many different things (as the value can be a -- variant), but be careful not to break deserialization if you add -- new things it can do. -- -- The job will take some state (passed as the `s` parameter), which -- does not vary based on the job, and the actual job data -- structure. The data structure (the `t` parameter) will be stored -- and copied a few times in Redis while in the lifecycle, so -- generally it is a good idea for it to be relatively small (and have -- it be able to look up data that it needs while the job in running). -- -- Finally, while deriving FromJSON and ToJSON instances automatically -- might seem like a good idea, you will most likely be better off -- defining them manually, so you can make sure they are backwards -- compatible if you change them, as any jobs that can't be -- deserialized will not be run (and will end up in the 'broken' -- queue). This will only happen if the queue is non-empty when you -- replce the running application version, but this is obviously -- possible and could be likely depending on your use. class (FromJSON t, ToJSON t, Show t) => Job s t | s -> t where job :: s -> t -> IO Result data JobData t = JobData UTCTime t -- | What should happen when an unexpected exception is thrown in a -- job - it can be treated as either a 'Failure' (the default) or a -- 'Retry' (if you know the only exceptions are triggered by -- intermittent problems). data ExceptionBehavior = RetryOnException | FailOnException hwlog :: Show a => Hworker s t -> a -> IO () hwlog hw a = hworkerLogger hw (hworkerName hw, a) -- | The worker data type - it is parametrized be the worker -- state (the `s`) and the job type (the `t`). data Hworker s t = Hworker { hworkerName :: ByteString , hworkerState :: s , hworkerConnection :: R.Connection , hworkerExceptionBehavior :: ExceptionBehavior , hworkerLogger :: forall a. Show a => a -> IO () , hworkerJobTimeout :: NominalDiffTime , hworkerFailedQueueSize :: Int , hworkerDebug :: Bool } -- | When configuring a worker, you can tell it to use an existing -- redis connection pool (which you may have for the rest of your -- application). Otherwise, you can specify connection info. By -- default, hworker tries to connect to localhost, which may not be -- true for your production application. data RedisConnection = RedisConnectInfo R.ConnectInfo | RedisConnection R.Connection -- | The main configuration for workers. -- -- Each pool of workers should have a unique `hwconfigName`, as the -- queues are set up by that name, and if you have different types of -- data written in, they will likely be unable to be deserialized (and -- thus could end up in the 'broken' queue). -- -- The 'hwconfigLogger' defaults to writing to stdout, so you will -- likely want to replace that with something appropriate (like from a -- logging package). -- -- The `hwconfigTimeout` is really important. It determines the length -- of time after a job is started before the 'monitor' will decide -- that the job must have died and will restart it. If it is shorter -- than the length of time that a normal job takes to complete, the -- jobs _will_ be run multiple times. This is _semantically_ okay, as -- this is an at-least-once processor, but obviously won't be -- desirable. It defaults to 120 seconds. -- -- The 'hwconfigExceptionBehavior' controls what happens when an -- exception is thrown within a job. -- -- 'hwconfigFailedQueueSize' controls how many 'failed' jobs will be -- kept. It defaults to 1000. data HworkerConfig s = HworkerConfig { hwconfigName :: Text , hwconfigState :: s , hwconfigRedisConnectInfo :: RedisConnection , hwconfigExceptionBehavior :: ExceptionBehavior , hwconfigLogger :: forall a. Show a => a -> IO () , hwconfigTimeout :: NominalDiffTime , hwconfigFailedQueueSize :: Int , hwconfigDebug :: Bool } -- | The default worker config - it needs a name and a state (as those -- will always be unique). defaultHworkerConfig :: Text -> s -> HworkerConfig s defaultHworkerConfig name state = HworkerConfig name state (RedisConnectInfo R.defaultConnectInfo) FailOnException print 120 1000 False -- | Create a new worker with the default 'HworkerConfig'. -- -- Note that you must create at least one 'worker' and 'monitor' for -- the queue to actually process jobs (and for it to retry ones that -- time-out). create :: Job s t => Text -> s -> IO (Hworker s t) create name state = createWith (defaultHworkerConfig name state) -- | Create a new worker with a specified 'HworkerConfig'. -- -- Note that you must create at least one 'worker' and 'monitor' for -- the queue to actually process jobs (and for it to retry ones that -- time-out). createWith :: Job s t => HworkerConfig s -> IO (Hworker s t) createWith HworkerConfig{..} = do conn <- case hwconfigRedisConnectInfo of RedisConnectInfo c -> R.connect c RedisConnection c -> return c return $ Hworker (T.encodeUtf8 hwconfigName) hwconfigState conn hwconfigExceptionBehavior hwconfigLogger hwconfigTimeout hwconfigFailedQueueSize hwconfigDebug -- | Destroy a worker. This will delete all the queues, clearing out -- all existing 'jobs', the 'broken' and 'failed' queues. There is no need -- to do this in normal applications (and most likely, you won't want to). destroy :: Job s t => Hworker s t -> IO () destroy hw = void $ R.runRedis (hworkerConnection hw) $ R.del [ jobQueue hw , progressQueue hw , brokenQueue hw , failedQueue hw ] jobQueue :: Hworker s t -> ByteString jobQueue hw = "hworker-jobs-" <> hworkerName hw progressQueue :: Hworker s t -> ByteString progressQueue hw = "hworker-progress-" <> hworkerName hw brokenQueue :: Hworker s t -> ByteString brokenQueue hw = "hworker-broken-" <> hworkerName hw failedQueue :: Hworker s t -> ByteString failedQueue hw = "hworker-failed-" <> hworkerName hw -- | Adds a job to the queue. Returns whether the operation succeeded. queue :: Job s t => Hworker s t -> t -> IO Bool queue hw j = do job_id <- UUID.toString <$> UUID.nextRandom isRight <$> R.runRedis (hworkerConnection hw) (R.lpush (jobQueue hw) [LB.toStrict $ A.encode (job_id, j)]) -- | Creates a new worker thread. This is blocking, so you will want to -- 'forkIO' this into a thread. You can have any number of these (and -- on any number of servers); the more there are, the faster jobs will -- be processed. worker :: Job s t => Hworker s t -> IO () worker hw = do now <- getCurrentTime r <- R.runRedis (hworkerConnection hw) $ R.eval "local job = redis.call('rpop',KEYS[1])\n\ \if job ~= nil then\n\ \ redis.call('hset', KEYS[2], job, ARGV[1])\n\ \ return job\n\ \else\n\ \ return nil\n\ \end" [jobQueue hw, progressQueue hw] [LB.toStrict $ A.encode now] case r of Left err -> hwlog hw err >> delayAndRun Right Nothing -> delayAndRun Right (Just t) -> do when (hworkerDebug hw) $ hwlog hw ("WORKER RUNNING", t) case decodeValue (LB.fromStrict t) of Nothing -> do hwlog hw ("BROKEN JOB", t) now <- getCurrentTime withNil hw (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ \if del == 1 then\n\ \ redis.call('hset', KEYS[2], ARGV[1], ARGV[2])\n\ \end\n\ \return nil" [progressQueue hw, brokenQueue hw] [t, LB.toStrict $ A.encode now]) delayAndRun Just (_ :: String, j) -> do result <- runJob (job (hworkerState hw) j) case result of Success -> do when (hworkerDebug hw) $ hwlog hw ("JOB COMPLETE", t) delete_res <- R.runRedis (hworkerConnection hw) (R.hdel (progressQueue hw) [t]) case delete_res of Left err -> hwlog hw err >> delayAndRun Right 1 -> justRun Right n -> do hwlog hw ("Job done: did not delete 1, deleted " <> show n) delayAndRun Retry msg -> do hwlog hw ("Retry: " <> msg) withNil hw (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ \if del == 1 then\n\ \ redis.call('lpush', KEYS[2], ARGV[1])\n\ \end\n\ \return nil" [progressQueue hw, jobQueue hw] [t]) delayAndRun Failure msg -> do hwlog hw ("Failure: " <> msg) withNil hw (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ \if del == 1 then\n\ \ redis.call('lpush', KEYS[2], ARGV[1])\n\ \ redis.call('ltrim', KEYS[2], 0, ARGV[2])\n\ \end\n\ \return nil" [progressQueue hw, failedQueue hw] [t, B8.pack (show (hworkerFailedQueueSize hw - 1))]) void $ R.runRedis (hworkerConnection hw) (R.hdel (progressQueue hw) [t]) delayAndRun where delayAndRun = threadDelay 10000 >> worker hw justRun = worker hw runJob v = do x <- newEmptyMVar jt <- forkIO (catch (v >>= putMVar x . Right) (\(e::SomeException) -> putMVar x (Left e))) res <- takeMVar x case res of Left e -> let b = case hworkerExceptionBehavior hw of RetryOnException -> Retry FailOnException -> Failure in return (b ("Exception raised: " <> (T.pack . show) e)) Right r -> return r -- | Start a monitor. Like 'worker', this is blocking, so should be -- started in a thread. This is responsible for retrying jobs that -- time out (which can happen if the processing thread is killed, for -- example). You need to have at least one of these running to have -- the retry happen, but it is safe to have any number running. monitor :: Job s t => Hworker s t -> IO () monitor hw = forever $ do now <- getCurrentTime withList hw (R.hkeys (progressQueue hw)) (\jobs -> void $ forM jobs $ \job -> withMaybe hw (R.hget (progressQueue hw) job) (\start -> when (diffUTCTime now (fromJust $ decodeValue (LB.fromStrict start)) > hworkerJobTimeout hw) $ do n <- withInt hw (R.eval "local del = redis.call('hdel', KEYS[2], ARGV[1])\n\ \if del == 1 then\ \ redis.call('rpush', KEYS[1], ARGV[1])\n\ \end\n\ \return del" [jobQueue hw, progressQueue hw] [job]) when (hworkerDebug hw) $ hwlog hw ("MONITOR RV", n) when (hworkerDebug hw && n == 1) $ hwlog hw ("MONITOR REQUEUED", job))) -- NOTE(dbp 2015-07-25): We check every 1/10th of timeout. threadDelay (floor $ 100000 * hworkerJobTimeout hw) -- | Returns the jobs that could not be deserialized, most likely -- because you changed the 'ToJSON'/'FromJSON' instances for you job -- in a way that resulted in old jobs not being able to be converted -- back from json. Another reason for jobs to end up here (and much -- worse) is if you point two instances of 'Hworker', with different -- job types, at the same queue (ie, you re-use the name). Then -- anytime a worker from one queue gets a job from the other it would -- think it is broken. broken :: Hworker s t -> IO [(ByteString, UTCTime)] broken hw = do r <- R.runRedis (hworkerConnection hw) (R.hgetall (brokenQueue hw)) case r of Left err -> hwlog hw err >> return [] Right xs -> return (map (second parseTime) xs) where parseTime = fromJust . decodeValue . LB.fromStrict jobsFromQueue :: Job s t => Hworker s t -> ByteString -> IO [t] jobsFromQueue hw queue = do r <- R.runRedis (hworkerConnection hw) (R.lrange queue 0 (-1)) case r of Left err -> hwlog hw err >> return [] Right [] -> return [] Right xs -> return $ mapMaybe (fmap (\(_::String, x) -> x) . decodeValue . LB.fromStrict) xs -- | Returns all pending jobs. jobs :: Job s t => Hworker s t -> IO [t] jobs hw = jobsFromQueue hw (jobQueue hw) -- | Returns all failed jobs. This is capped at the most recent -- 'hworkerconfigFailedQueueSize' jobs that returned 'Failure' (or -- threw an exception when 'hworkerconfigExceptionBehavior' is -- 'FailOnException'). failed :: Job s t => Hworker s t -> IO [t] failed hw = jobsFromQueue hw (failedQueue hw) -- | Logs the contents of the jobqueue and the inprogress queue at -- `microseconds` intervals. debugger :: Job s t => Int -> Hworker s t -> IO () debugger microseconds hw = forever $ do withList hw (R.hkeys (progressQueue hw)) (\running -> withList hw (R.lrange (jobQueue hw) 0 (-1)) (\queued -> hwlog hw ("DEBUG", queued, running))) threadDelay microseconds -- Redis helpers follow withList hw a f = do r <- R.runRedis (hworkerConnection hw) a case r of Left err -> hwlog hw err Right [] -> return () Right xs -> f xs withMaybe hw a f = do r <- R.runRedis (hworkerConnection hw) a case r of Left err -> hwlog hw err Right Nothing -> return () Right (Just v) -> f v withNil hw a = do r <- R.runRedis (hworkerConnection hw) a case r of Left err -> hwlog hw err Right (Just ("" :: ByteString)) -> return () Right _ -> return () withInt :: Hworker s t -> R.Redis (Either R.Reply Integer) -> IO Integer withInt hw a = do r <- R.runRedis (hworkerConnection hw) a case r of Left err -> hwlog hw err >> return (-1) Right n -> return n withIgnore :: Hworker s t -> R.Redis (Either R.Reply a) -> IO () withIgnore hw a = do r <- R.runRedis (hworkerConnection hw) a case r of Left err -> hwlog hw err Right _ -> return ()