module Faktory.Worker
( WorkerHalt(..)
, runWorker
, runWorkerEnv
, jobArg
)
where
import Faktory.Prelude
import Control.Concurrent (killThread)
import Data.Aeson
import Data.Aeson.Casing
import qualified Data.Text as T
import Faktory.Client
import Faktory.Job (Job, JobId, jobArg, jobJid, jobReserveForMicroseconds)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
import System.Timeout (timeout)
data WorkerHalt = WorkerHalt
deriving stock (WorkerHalt -> WorkerHalt -> Bool
(WorkerHalt -> WorkerHalt -> Bool)
-> (WorkerHalt -> WorkerHalt -> Bool) -> Eq WorkerHalt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkerHalt -> WorkerHalt -> Bool
$c/= :: WorkerHalt -> WorkerHalt -> Bool
== :: WorkerHalt -> WorkerHalt -> Bool
$c== :: WorkerHalt -> WorkerHalt -> Bool
Eq, Int -> WorkerHalt -> ShowS
[WorkerHalt] -> ShowS
WorkerHalt -> String
(Int -> WorkerHalt -> ShowS)
-> (WorkerHalt -> String)
-> ([WorkerHalt] -> ShowS)
-> Show WorkerHalt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerHalt] -> ShowS
$cshowList :: [WorkerHalt] -> ShowS
show :: WorkerHalt -> String
$cshow :: WorkerHalt -> String
showsPrec :: Int -> WorkerHalt -> ShowS
$cshowsPrec :: Int -> WorkerHalt -> ShowS
Show)
deriving anyclass Show WorkerHalt
Typeable WorkerHalt
Typeable WorkerHalt
-> Show WorkerHalt
-> (WorkerHalt -> SomeException)
-> (SomeException -> Maybe WorkerHalt)
-> (WorkerHalt -> String)
-> Exception WorkerHalt
SomeException -> Maybe WorkerHalt
WorkerHalt -> String
WorkerHalt -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: WorkerHalt -> String
$cdisplayException :: WorkerHalt -> String
fromException :: SomeException -> Maybe WorkerHalt
$cfromException :: SomeException -> Maybe WorkerHalt
toException :: WorkerHalt -> SomeException
$ctoException :: WorkerHalt -> SomeException
$cp2Exception :: Show WorkerHalt
$cp1Exception :: Typeable WorkerHalt
Exception
newtype BeatPayload = BeatPayload
{ BeatPayload -> WorkerId
_bpWid :: WorkerId
}
deriving stock (forall x. BeatPayload -> Rep BeatPayload x)
-> (forall x. Rep BeatPayload x -> BeatPayload)
-> Generic BeatPayload
forall x. Rep BeatPayload x -> BeatPayload
forall x. BeatPayload -> Rep BeatPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep BeatPayload x -> BeatPayload
$cfrom :: forall x. BeatPayload -> Rep BeatPayload x
Generic
instance ToJSON BeatPayload where
toJSON :: BeatPayload -> Value
toJSON = Options -> BeatPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> BeatPayload -> Value)
-> Options -> BeatPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
toEncoding :: BeatPayload -> Encoding
toEncoding = Options -> BeatPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> BeatPayload -> Encoding)
-> Options -> BeatPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
newtype AckPayload = AckPayload
{ AckPayload -> String
_apJid :: JobId
}
deriving stock (forall x. AckPayload -> Rep AckPayload x)
-> (forall x. Rep AckPayload x -> AckPayload) -> Generic AckPayload
forall x. Rep AckPayload x -> AckPayload
forall x. AckPayload -> Rep AckPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep AckPayload x -> AckPayload
$cfrom :: forall x. AckPayload -> Rep AckPayload x
Generic
instance ToJSON AckPayload where
toJSON :: AckPayload -> Value
toJSON = Options -> AckPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> AckPayload -> Value) -> Options -> AckPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
toEncoding :: AckPayload -> Encoding
toEncoding = Options -> AckPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> AckPayload -> Encoding)
-> Options -> AckPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
data FailPayload = FailPayload
{ FailPayload -> Text
_fpMessage :: Text
, FailPayload -> String
_fpErrtype :: String
, FailPayload -> String
_fpJid :: JobId
, FailPayload -> [String]
_fpBacktrace :: [String]
}
deriving stock (forall x. FailPayload -> Rep FailPayload x)
-> (forall x. Rep FailPayload x -> FailPayload)
-> Generic FailPayload
forall x. Rep FailPayload x -> FailPayload
forall x. FailPayload -> Rep FailPayload x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep FailPayload x -> FailPayload
$cfrom :: forall x. FailPayload -> Rep FailPayload x
Generic
instance ToJSON FailPayload where
toJSON :: FailPayload -> Value
toJSON = Options -> FailPayload -> Value
forall a.
(Generic a, GToJSON' Value Zero (Rep a)) =>
Options -> a -> Value
genericToJSON (Options -> FailPayload -> Value)
-> Options -> FailPayload -> Value
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
toEncoding :: FailPayload -> Encoding
toEncoding = Options -> FailPayload -> Encoding
forall a.
(Generic a, GToJSON' Encoding Zero (Rep a)) =>
Options -> a -> Encoding
genericToEncoding (Options -> FailPayload -> Encoding)
-> Options -> FailPayload -> Encoding
forall a b. (a -> b) -> a -> b
$ ShowS -> Options
aesonPrefix ShowS
snakeCase
runWorker
:: (HasCallStack, FromJSON args)
=> Settings
-> WorkerSettings
-> (Job args -> IO ())
-> IO ()
runWorker :: Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f = do
WorkerId
workerId <- IO WorkerId
-> (WorkerId -> IO WorkerId) -> Maybe WorkerId -> IO WorkerId
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO WorkerId
randomWorkerId WorkerId -> IO WorkerId
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe WorkerId -> IO WorkerId) -> Maybe WorkerId -> IO WorkerId
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Maybe WorkerId
settingsId WorkerSettings
workerSettings
Client
client <- HasCallStack => Settings -> Maybe WorkerId -> IO Client
Settings -> Maybe WorkerId -> IO Client
newClient Settings
settings (Maybe WorkerId -> IO Client) -> Maybe WorkerId -> IO Client
forall a b. (a -> b) -> a -> b
$ WorkerId -> Maybe WorkerId
forall a. a -> Maybe a
Just WorkerId
workerId
ThreadId
beatThreadId <- IO () -> IO ThreadId
forkIOWithThrowToParent (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Client
-> Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
forall arg.
(HasCallStack, FromJSON arg) =>
Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f)
IO () -> (WorkerHalt -> IO ()) -> IO ()
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` (\(WorkerHalt
_ex :: WorkerHalt) -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` (ThreadId -> IO ()
killThread ThreadId
beatThreadId IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Client -> IO ()
closeClient Client
client)
runWorkerEnv :: FromJSON args => (Job args -> IO ()) -> IO ()
runWorkerEnv :: (Job args -> IO ()) -> IO ()
runWorkerEnv Job args -> IO ()
f = do
Settings
settings <- IO Settings
envSettings
WorkerSettings
workerSettings <- IO WorkerSettings
envWorkerSettings
Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
forall args.
(HasCallStack, FromJSON args) =>
Settings -> WorkerSettings -> (Job args -> IO ()) -> IO ()
runWorker Settings
settings WorkerSettings
workerSettings Job args -> IO ()
f
processorLoop
:: (HasCallStack, FromJSON arg)
=> Client
-> Settings
-> WorkerSettings
-> (Job arg -> IO ())
-> IO ()
processorLoop :: Client -> Settings -> WorkerSettings -> (Job arg -> IO ()) -> IO ()
processorLoop Client
client Settings
settings WorkerSettings
workerSettings Job arg -> IO ()
f = do
let
namespace :: Namespace
namespace = ConnectionInfo -> Namespace
connectionInfoNamespace (ConnectionInfo -> Namespace) -> ConnectionInfo -> Namespace
forall a b. (a -> b) -> a -> b
$ Settings -> ConnectionInfo
settingsConnection Settings
settings
processAndAck :: Job arg -> IO ()
processAndAck Job arg
job = do
Maybe ()
mResult <- Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Job arg -> Int
forall arg. Job arg -> Int
jobReserveForMicroseconds Job arg
job) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ Job arg -> IO ()
f Job arg
job
case Maybe ()
mResult of
Maybe ()
Nothing -> Settings -> String -> IO ()
settingsLogError Settings
settings String
"Job reservation period expired."
Just () -> Client -> Job arg -> IO ()
forall args. HasCallStack => Client -> Job args -> IO ()
ackJob Client
client Job arg
job
Either String (Maybe (Job arg))
emJob <- Client -> Queue -> IO (Either String (Maybe (Job arg)))
forall args.
FromJSON args =>
Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client (Queue -> IO (Either String (Maybe (Job arg))))
-> Queue -> IO (Either String (Maybe (Job arg)))
forall a b. (a -> b) -> a -> b
$ Namespace -> Queue -> Queue
namespaceQueue Namespace
namespace (Queue -> Queue) -> Queue -> Queue
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Queue
settingsQueue
WorkerSettings
workerSettings
case Either String (Maybe (Job arg))
emJob of
Left String
err -> Settings -> String -> IO ()
settingsLogError Settings
settings (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Invalid Job: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
err
Right Maybe (Job arg)
Nothing -> Int -> IO ()
threadDelaySeconds (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ WorkerSettings -> Int
settingsIdleDelay WorkerSettings
workerSettings
Right (Just Job arg
job) ->
Job arg -> IO ()
processAndAck Job arg
job
IO () -> [Handler IO ()] -> IO ()
forall (m :: * -> *) a.
(MonadCatch m, MonadThrow m) =>
m a -> [Handler m a] -> m a
`catches` [ (WorkerHalt -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((WorkerHalt -> IO ()) -> Handler IO ())
-> (WorkerHalt -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(WorkerHalt
ex :: WorkerHalt) -> WorkerHalt -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw WorkerHalt
ex
, (SomeException -> IO ()) -> Handler IO ()
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((SomeException -> IO ()) -> Handler IO ())
-> (SomeException -> IO ()) -> Handler IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: SomeException) ->
Client -> Job arg -> Text -> IO ()
forall args. HasCallStack => Client -> Job args -> Text -> IO ()
failJob Client
client Job arg
job (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
ex
]
heartBeat :: Client -> WorkerId -> IO ()
heartBeat :: Client -> WorkerId -> IO ()
heartBeat Client
client WorkerId
workerId = do
Int -> IO ()
threadDelaySeconds Int
25
Client -> ByteString -> [ByteString] -> IO ()
command_ Client
client ByteString
"BEAT" [BeatPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (BeatPayload -> ByteString) -> BeatPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ WorkerId -> BeatPayload
BeatPayload WorkerId
workerId]
fetchJob
:: FromJSON args => Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob :: Client -> Queue -> IO (Either String (Maybe (Job args)))
fetchJob Client
client Queue
queue = Client
-> ByteString
-> [ByteString]
-> IO (Either String (Maybe (Job args)))
forall a.
FromJSON a =>
Client
-> ByteString -> [ByteString] -> IO (Either String (Maybe a))
commandJSON Client
client ByteString
"FETCH" [Queue -> ByteString
queueArg Queue
queue]
ackJob :: HasCallStack => Client -> Job args -> IO ()
ackJob :: Client -> Job args -> IO ()
ackJob Client
client Job args
job = HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"ACK" [AckPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (AckPayload -> ByteString) -> AckPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ String -> AckPayload
AckPayload (String -> AckPayload) -> String -> AckPayload
forall a b. (a -> b) -> a -> b
$ Job args -> String
forall arg. Job arg -> String
jobJid Job args
job]
failJob :: HasCallStack => Client -> Job args -> Text -> IO ()
failJob :: Client -> Job args -> Text -> IO ()
failJob Client
client Job args
job Text
message =
HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
Client -> ByteString -> [ByteString] -> IO ()
commandOK Client
client ByteString
"FAIL" [FailPayload -> ByteString
forall a. ToJSON a => a -> ByteString
encode (FailPayload -> ByteString) -> FailPayload -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> String -> String -> [String] -> FailPayload
FailPayload Text
message String
"" (Job args -> String
forall arg. Job arg -> String
jobJid Job args
job) []]