module Faktory.Job
( Job
, JobId
, JobOptions
, perform
, retry
, once
, reserveFor
, queue
, jobtype
, at
, in_
, custom
, buildJob
, newJob
, jobJid
, jobArg
, jobOptions
, jobRetriesRemaining
, jobReserveForMicroseconds
) where
import Faktory.Prelude
import Data.Aeson
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import Data.Semigroup (Last (..))
import Data.Time (UTCTime)
import Faktory.Client (Client (..))
import Faktory.Connection (ConnectionInfo (..))
import Faktory.JobFailure
import Faktory.JobOptions
import Faktory.Producer (Producer (..), pushJob)
import Faktory.Settings (Namespace, Settings (..))
import GHC.Stack
import System.Random
data Job arg = Job
{ forall arg. Job arg -> JobId
jobJid :: JobId
, forall arg. Job arg -> Maybe UTCTime
jobAt :: Maybe UTCTime
, forall arg. Job arg -> NonEmpty arg
jobArgs :: NonEmpty arg
, forall arg. Job arg -> JobOptions
jobOptions :: JobOptions
, forall arg. Job arg -> Maybe JobFailure
jobFailure :: Maybe JobFailure
}
perform
:: (HasCallStack, ToJSON arg) => JobOptions -> Producer -> arg -> IO JobId
perform :: forall arg.
(HasCallStack, ToJSON arg) =>
JobOptions -> Producer -> arg -> IO JobId
perform JobOptions
options Producer
producer arg
arg = do
Job arg
job <- forall arg. JobOptions -> Producer -> arg -> IO (Job arg)
buildJob JobOptions
options Producer
producer arg
arg
forall arg. Job arg -> JobId
jobJid Job arg
job forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall a. (HasCallStack, ToJSON a) => Producer -> a -> IO ()
pushJob Producer
producer Job arg
job
applyOptions :: Namespace -> JobOptions -> Job arg -> IO (Job arg)
applyOptions :: forall arg. Namespace -> JobOptions -> Job arg -> IO (Job arg)
applyOptions Namespace
namespace JobOptions
options Job arg
job = do
Maybe UTCTime
scheduledAt <- JobOptions -> IO (Maybe UTCTime)
getAtFromSchedule JobOptions
options
let namespacedOptions :: JobOptions
namespacedOptions = Namespace -> JobOptions -> JobOptions
namespaceQueue Namespace
namespace forall a b. (a -> b) -> a -> b
$ forall arg. Job arg -> JobOptions
jobOptions Job arg
job forall a. Semigroup a => a -> a -> a
<> JobOptions
options
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Job arg
job {jobAt :: Maybe UTCTime
jobAt = Maybe UTCTime
scheduledAt, jobOptions :: JobOptions
jobOptions = JobOptions
namespacedOptions}
buildJob :: JobOptions -> Producer -> arg -> IO (Job arg)
buildJob :: forall arg. JobOptions -> Producer -> arg -> IO (Job arg)
buildJob JobOptions
options Producer
producer arg
arg =
forall arg. Namespace -> JobOptions -> Job arg -> IO (Job arg)
applyOptions Namespace
namespace (JobOptions -> JobOptions
applyDefaults JobOptions
options)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall arg. arg -> IO (Job arg)
newJob arg
arg
where
namespace :: Namespace
namespace =
ConnectionInfo -> Namespace
connectionInfoNamespace forall a b. (a -> b) -> a -> b
$
Settings -> ConnectionInfo
settingsConnection forall a b. (a -> b) -> a -> b
$
Client -> Settings
clientSettings forall a b. (a -> b) -> a -> b
$
Producer -> Client
producerClient Producer
producer
applyDefaults :: JobOptions -> JobOptions
applyDefaults =
forall a. Monoid a => a -> a -> a
mappend forall a b. (a -> b) -> a -> b
$
Settings -> JobOptions
settingsDefaultJobOptions forall a b. (a -> b) -> a -> b
$
Client -> Settings
clientSettings forall a b. (a -> b) -> a -> b
$
Producer -> Client
producerClient
Producer
producer
newJob :: arg -> IO (Job arg)
newJob :: forall arg. arg -> IO (Job arg)
newJob arg
arg = do
JobId
jobId <- forall a. Int -> [a] -> [a]
take Int
12 forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a g. (Random a, RandomGen g) => (a, a) -> g -> [a]
randomRs (Char
'a', Char
'z') forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *). MonadIO m => m StdGen
newStdGen
forall (f :: * -> *) a. Applicative f => a -> f a
pure
Job
{ jobJid :: JobId
jobJid = JobId
jobId
, jobAt :: Maybe UTCTime
jobAt = forall a. Maybe a
Nothing
, jobArgs :: NonEmpty arg
jobArgs = forall (f :: * -> *) a. Applicative f => a -> f a
pure arg
arg
, jobOptions :: JobOptions
jobOptions = JobId -> JobOptions
jobtype JobId
"Default"
, jobFailure :: Maybe JobFailure
jobFailure = forall a. Maybe a
Nothing
}
jobArg :: Job arg -> arg
jobArg :: forall arg. Job arg -> arg
jobArg Job {JobId
Maybe UTCTime
Maybe JobFailure
NonEmpty arg
JobOptions
jobFailure :: Maybe JobFailure
jobOptions :: JobOptions
jobArgs :: NonEmpty arg
jobAt :: Maybe UTCTime
jobJid :: JobId
jobFailure :: forall arg. Job arg -> Maybe JobFailure
jobArgs :: forall arg. Job arg -> NonEmpty arg
jobAt :: forall arg. Job arg -> Maybe UTCTime
jobOptions :: forall arg. Job arg -> JobOptions
jobJid :: forall arg. Job arg -> JobId
..} = forall a. NonEmpty a -> a
NE.head NonEmpty arg
jobArgs
jobRetriesRemaining :: Job arg -> Int
jobRetriesRemaining :: forall arg. Job arg -> Int
jobRetriesRemaining Job arg
job = forall a. Ord a => a -> a -> a
max Int
0 forall a b. (a -> b) -> a -> b
$ Int
enqueuedRetry forall a. Num a => a -> a -> a
- Int
attemptCount
where
enqueuedRetry :: Int
enqueuedRetry = forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
faktoryDefaultRetry forall a. Last a -> a
getLast forall a b. (a -> b) -> a -> b
$ JobOptions -> Maybe (Last Int)
joRetry forall a b. (a -> b) -> a -> b
$ forall arg. Job arg -> JobOptions
jobOptions Job arg
job
attemptCount :: Int
attemptCount = forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
0 ((forall a. Num a => a -> a -> a
+ Int
1) forall b c a. (b -> c) -> (a -> b) -> a -> c
. JobFailure -> Int
jfRetryCount) forall a b. (a -> b) -> a -> b
$ forall arg. Job arg -> Maybe JobFailure
jobFailure Job arg
job
jobReserveForMicroseconds :: Job arg -> Int
jobReserveForMicroseconds :: forall arg. Job arg -> Int
jobReserveForMicroseconds =
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
faktoryDefaultReserveFor (Int -> Int
secondToMicrosecond forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (Integral a, Num b) => a -> b
fromIntegral forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Last a -> a
getLast)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. JobOptions -> Maybe (Last Natural)
joReserveFor
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall arg. Job arg -> JobOptions
jobOptions
instance ToJSON args => ToJSON (Job args) where
toJSON :: Job args -> Value
toJSON = [Pair] -> Value
object forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a arg. (KeyValue a, ToJSON arg) => Job arg -> [a]
toPairs
toEncoding :: Job args -> Encoding
toEncoding = Series -> Encoding
pairs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Monoid a => [a] -> a
mconcat forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a arg. (KeyValue a, ToJSON arg) => Job arg -> [a]
toPairs
toPairs :: (KeyValue a, ToJSON arg) => Job arg -> [a]
toPairs :: forall a arg. (KeyValue a, ToJSON arg) => Job arg -> [a]
toPairs Job {JobId
Maybe UTCTime
Maybe JobFailure
NonEmpty arg
JobOptions
jobFailure :: Maybe JobFailure
jobOptions :: JobOptions
jobArgs :: NonEmpty arg
jobAt :: Maybe UTCTime
jobJid :: JobId
jobFailure :: forall arg. Job arg -> Maybe JobFailure
jobArgs :: forall arg. Job arg -> NonEmpty arg
jobAt :: forall arg. Job arg -> Maybe UTCTime
jobOptions :: forall arg. Job arg -> JobOptions
jobJid :: forall arg. Job arg -> JobId
..} =
[ Key
"jid" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobId
jobJid
, Key
"at" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Maybe UTCTime
jobAt
, Key
"args" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= NonEmpty arg
jobArgs
, Key
"jobtype" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobOptions -> Maybe (Last JobId)
joJobtype JobOptions
jobOptions
, Key
"retry" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobOptions -> Maybe (Last Int)
joRetry JobOptions
jobOptions
, Key
"queue" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobOptions -> Maybe (Last Queue)
joQueue JobOptions
jobOptions
, Key
"custom" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobOptions -> Maybe Custom
joCustom JobOptions
jobOptions
, Key
"reserve_for" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= JobOptions -> Maybe (Last Natural)
joReserveFor JobOptions
jobOptions
]
instance FromJSON args => FromJSON (Job args) where
parseJSON :: Value -> Parser (Job args)
parseJSON = forall a. JobId -> (Object -> Parser a) -> Value -> Parser a
withObject JobId
"Job" forall a b. (a -> b) -> a -> b
$ \Object
o ->
forall arg.
JobId
-> Maybe UTCTime
-> NonEmpty arg
-> JobOptions
-> Maybe JobFailure
-> Job arg
Job
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
o forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"jid"
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
o forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"at"
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
o forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"args"
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. FromJSON a => Value -> Parser a
parseJSON (Object -> Value
Object Object
o)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
o forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"failure"
type JobId = String
faktoryDefaultRetry :: Int
faktoryDefaultRetry :: Int
faktoryDefaultRetry = Int
25
faktoryDefaultReserveFor :: Int
faktoryDefaultReserveFor :: Int
faktoryDefaultReserveFor = Int -> Int
secondToMicrosecond Int
1800
secondToMicrosecond :: Int -> Int
secondToMicrosecond :: Int -> Int
secondToMicrosecond Int
n = Int
n forall a. Num a => a -> a -> a
* (Int
10 forall a b. (Num a, Integral b) => a -> b -> a
^ (Int
6 :: Int))