module Faktory.Job
( Job
, JobId
, JobOptions
, perform
, retry
, once
, queue
, jobtype
, at
, in_
, newJob
, jobJid
, jobArg
) where
import Faktory.Prelude
import Data.Aeson
import Data.Aeson.Casing
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import Data.Time
import Faktory.Client (Client(..))
import Faktory.Producer (Producer(..), pushJob)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
import System.Random
-- | A job carrying one or more arguments of type @arg@.
--
-- Values are created by 'newJob' and then adjusted through 'JobOptions'.
data Job arg = Job
  { jobJid :: JobId
  -- ^ Identifier of the job
  , jobJobtype :: String
  -- ^ Job type name; 'newJob' initialises it to @\"Default\"@
  , jobArgs :: NonEmpty arg
  -- ^ Arguments of the job; never empty
  , jobRetry :: Maybe Int
  -- ^ Retry count, when one was set with 'retry' or 'once'
  , jobQueue :: Maybe Queue
  -- ^ Target queue, when one was set with 'queue'
  , jobAt :: Maybe UTCTime
  -- ^ Scheduled time, when one was set with 'at' or 'in_'
  }
  deriving stock (Generic)
-- | A single modification that can be applied to a 'Job'.
data JobUpdate
  = SetRetry Int
  -- ^ Overwrite the retry count
  | SetQueue Queue
  -- ^ Overwrite the queue
  | SetJobtype String
  -- ^ Overwrite the job type
  | SetAt UTCTime
  -- ^ Schedule the job at an absolute time
  | SetIn NominalDiffTime
  -- ^ Schedule the job relative to the time the update is applied
-- | An ordered collection of 'JobUpdate's.
--
-- The 'Semigroup' and 'Monoid' instances are those of the underlying list, so
-- @a <> b@ holds the updates of @a@ followed by those of @b@, and 'mempty'
-- holds no updates.
newtype JobOptions = JobOptions [JobUpdate]
  deriving newtype (Semigroup, Monoid)
-- | Build a job for @arg@, apply @options@ to it and push it with @producer@.
--
-- The namespace handed to 'applyOptions' is read from the connection settings
-- of the producer's client. The result is the 'JobId' of the pushed job.
perform
  :: (HasCallStack, ToJSON arg) => JobOptions -> Producer -> arg -> IO JobId
perform options producer arg = do
  job <- applyOptions namespace options =<< newJob arg
  -- The push runs first; '<$' then replaces its unit result with the id.
  jobJid job <$ pushJob producer job
 where
  namespace =
    connectionInfoNamespace
      . settingsConnection
      . clientSettings
      $ producerClient producer
-- | Apply every update held by the 'JobOptions' to a job, first to last.
--
-- Because updates are applied in list order, a later update to a field
-- overwrites an earlier one. Queues are passed through 'namespaceQueue' with
-- the given namespace before being stored.
applyOptions :: Namespace -> JobOptions -> Job arg -> IO (Job arg)
applyOptions namespace (JobOptions patches) = go patches
 where
  -- Thread the job through the remaining updates.
  go :: [JobUpdate] -> Job a -> IO (Job a)
  go [] job = pure job
  go (patch : rest) job = go rest =<< applyPatch patch job

  -- Apply one update; only 'SetIn' needs IO, to read the current time.
  applyPatch :: JobUpdate -> Job a -> IO (Job a)
  applyPatch patch job = case patch of
    SetRetry n -> pure job { jobRetry = Just n }
    SetQueue q -> pure job { jobQueue = Just $ namespaceQueue namespace q }
    SetJobtype jt -> pure job { jobJobtype = jt }
    SetAt time -> pure job { jobAt = Just time }
    SetIn diff -> do
      now <- getCurrentTime
      pure job { jobAt = Just $ addUTCTime diff now }
-- | Option that sets the job's retry count to @n@.
retry :: Int -> JobOptions
retry n = JobOptions [SetRetry n]
-- | Option equivalent to @'retry' (-1)@.
--
-- NOTE(review): presumably -1 is the server's convention for "do not retry";
-- confirm against the Faktory job payload documentation.
once :: JobOptions
once = retry (-1)
-- | Option that sets the queue the job is pushed to.
--
-- When the option is applied, the queue is first passed through
-- 'namespaceQueue'.
queue :: Queue -> JobOptions
queue q = JobOptions [SetQueue q]
-- | Option that sets the job type, replacing the @\"Default\"@ given by
-- 'newJob'.
jobtype :: String -> JobOptions
jobtype jt = JobOptions [SetJobtype jt]
-- | Option that schedules the job at the given absolute time.
at :: UTCTime -> JobOptions
at t = JobOptions [SetAt t]
-- | Option that schedules the job the given duration after the moment the
-- option is applied (the current time is read at that point).
in_ :: NominalDiffTime -> JobOptions
in_ i = JobOptions [SetIn i]
-- | Create a job holding the single argument @arg@.
--
-- The job id is 12 characters drawn at random from @\'a\'@ to @\'z\'@ using
-- a fresh generator from 'newStdGen'. The job type is @\"Default\"@ and the
-- retry, queue and schedule fields are left unset.
newJob :: arg -> IO (Job arg)
newJob arg = do
  jobId <- take 12 . randomRs ('a', 'z') <$> newStdGen

  pure Job
    { jobJid = jobId
    , jobJobtype = "Default"
    , jobArgs = pure arg
    , jobRetry = Nothing
    , jobQueue = Nothing
    , jobAt = Nothing
    }
-- | The first argument of a job.
--
-- Total, because the arguments are stored in a 'NonEmpty'.
jobArg :: Job arg -> arg
jobArg = NE.head . jobArgs
-- | Generic encoding with field names transformed by @aesonPrefix snakeCase@.
--
-- NOTE(review): presumably this drops the @job@ prefix and snake-cases the
-- rest (e.g. @jobJobtype@ becomes @jobtype@); confirm against
-- "Data.Aeson.Casing".
instance ToJSON args => ToJSON (Job args) where
  toJSON = genericToJSON $ aesonPrefix snakeCase
  toEncoding = genericToEncoding $ aesonPrefix snakeCase
-- | Generic decoding using the same @aesonPrefix snakeCase@ options as the
-- 'ToJSON' instance, so both directions agree on field names.
instance FromJSON args => FromJSON (Job args) where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase
-- | Identifier of a job; a plain alias for 'String'.
type JobId = String