module Database.PostgreSQL.Consumers.Components (
    runConsumer
  , runConsumerWithIdleSignal
  , spawnListener
  , spawnMonitor
  , spawnDispatcher
  ) where

import Control.Applicative
import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Exception (AsyncException(ThreadKilled))
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Function
import Data.Int
import Data.Maybe
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Log
import Prelude
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.Thread.Lifted as T
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M

import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils

-- | Run the consumer. The purpose of the returned monadic
-- action is to wait for currently processed jobs and clean up.
-- This function is best used in conjunction with 'finalize' to
-- seamlessly handle the finalization.
runConsumer
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> m (m ())
runConsumer :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs = forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs forall a. Maybe a
Nothing

runConsumerWithIdleSignal
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> TMVar Bool
  -> m (m ())
runConsumerWithIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> TMVar Bool -> m (m ())
runConsumerWithIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs TMVar Bool
idleSignal = forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs (forall a. a -> Maybe a
Just TMVar Bool
idleSignal)

-- | Run the consumer and also signal whenever the consumer is waiting for
-- getNotification or threadDelay.
runConsumerWithMaybeIdleSignal
  :: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
     , FromSQL idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> Maybe (TMVar Bool)
  -> m (m ())
runConsumerWithMaybeIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs Maybe (TMVar Bool)
mIdleSignal
  | forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs ConsumerConfig m idx job
cc forall a. Ord a => a -> a -> Bool
< Int
1 = do
      forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"ccMaxRunningJobs < 1, not starting the consumer"
      forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
  | Bool
otherwise = do
      MVar ()
semaphore <- forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar ()
      TVar (Map ThreadId idx)
runningJobsInfo <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (TVar a)
newTVarIO forall k a. Map k a
M.empty
      TVar Int
runningJobs <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (TVar a)
newTVarIO Int
0

      Either DBException ()
skipLockedTest :: Either DBException () <-
        forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
defaultTransactionSettings forall a b. (a -> b) -> a -> b
$
        forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ SQL
"SELECT TRUE FOR UPDATE SKIP LOCKED"
      -- If we can't lock rows using 'skip locked' throw an exception
      forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => String -> a
error String
"PostgreSQL version with support for SKIP LOCKED is required") forall (f :: * -> *) a. Applicative f => a -> f a
pure Either DBException ()
skipLockedTest

      ConsumerID
cid <- forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m, MonadTime m) =>
ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
registerConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs
      forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData [Key
"consumer_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show ConsumerID
cid] forall a b. (a -> b) -> a -> b
$ do
        ThreadId
listener <- forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore
        ThreadId
monitor <- forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"monitor" forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
        ThreadId
dispatcher <- forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"dispatcher" forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig m idx job
cc
          ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal
        forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"finalizer" forall a b. (a -> b) -> a -> b
$ do
          forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
listener
          forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
dispatcher
          forall {m :: * -> *} {a} {k} {a}.
(MonadBase IO m, MonadLog m, Num a, Eq a, Eq k, Eq a, Show a) =>
TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs
          forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
monitor
          forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m) =>
ConsumerConfig n idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
unregisterConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
  where
    waitForRunningJobs :: TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map k a)
runningJobsInfo TVar a
runningJobs = do
      Map k a
initialJobs <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> IO a
readTVarIO TVar (Map k a)
runningJobsInfo
      (forall a. (a -> a) -> a
`fix` Map k a
initialJobs) forall a b. (a -> b) -> a -> b
$ \Map k a -> m ()
loop Map k a
jobsInfo -> do
        -- If jobs are still running, display info about them.
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> Bool
M.null Map k a
jobsInfo) forall a b. (a -> b) -> a -> b
$ do
          forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Waiting for running jobs" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
              Key
"job_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall {k}. Map k a -> [String]
showJobsInfo Map k a
jobsInfo
            ]
        forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
          a
jobs <- forall a. TVar a -> STM a
readTVar TVar a
runningJobs
          if a
jobs forall a. Eq a => a -> a -> Bool
== a
0
            then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
            else do
              Map k a
newJobsInfo <- forall a. TVar a -> STM a
readTVar TVar (Map k a)
runningJobsInfo
              -- If jobs info didn't change, wait for it to change.
              -- Otherwise loop so it either displays the new info
              -- or exits if there are no jobs running anymore.
              if (Map k a
newJobsInfo forall a. Eq a => a -> a -> Bool
== Map k a
jobsInfo)
                then forall a. STM a
retry
                else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Map k a -> m ()
loop Map k a
newJobsInfo
      where
        showJobsInfo :: Map k a -> [String]
showJobsInfo = forall a b k. (a -> b -> b) -> b -> Map k a -> b
M.foldr (\a
idx [String]
acc -> forall a. Show a => a -> String
show a
idx forall a. a -> [a] -> [a]
: [String]
acc) []

-- | Spawn a thread that generates signals for the
-- dispatcher to probe the database for incoming jobs.
spawnListener
  :: (MonadBaseControl IO m, MonadMask m)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> MVar ()
  -> m ThreadId
spawnListener :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore =
  forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"listener" forall a b. (a -> b) -> a -> b
$
  case forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationChannel ConsumerConfig m idx job
cc of
    Just Channel
chan ->
      forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
noTs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (forall (m :: * -> *). MonadDB m => Channel -> m ()
listen Channel
chan) (forall (m :: * -> *). MonadDB m => Channel -> m ()
unlisten Channel
chan)
      forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
      -- If there are many notifications, we need to collect them
      -- as soon as possible, because they are stored in memory by
      -- libpq. They are also not squashed, so we perform the
      -- squashing ourselves with the help of MVar ().
      forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadDB m => Int -> m (Maybe Notification)
getNotification forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m Bool
signalDispatcher
    Maybe Channel
Nothing -> forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
      forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
      m Bool
signalDispatcher
  where
    signalDispatcher :: m Bool
signalDispatcher = do
      forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m Bool
tryPutMVar MVar ()
semaphore ()

    noTs :: TransactionSettings
noTs = TransactionSettings
defaultTransactionSettings {
      tsAutoTransaction :: Bool
tsAutoTransaction = Bool
False
    }

-- | Spawn a thread that monitors working consumers
-- for activity and periodically updates its own.
spawnMonitor
  :: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
                        Show idx, FromSQL idx, ToSQL idx)
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ThreadId
spawnMonitor :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} ConnectionSourceM m
cs ConsumerID
cid = forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"monitor" forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
  forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Update last_activity of the consumer.
    Bool
ok <- forall (m :: * -> *). (MonadDB m, MonadThrow m) => SQL -> m Bool
runSQL01 forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
        SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
      , SQL
"SET last_activity = " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
      , SQL
"WHERE id =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
      , SQL
"  AND name =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
      ]
    if Bool
ok
      then forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"Activity of the consumer updated"
      else do
        forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ forall a b. (a -> b) -> a -> b
$ Text
"Consumer is not registered"
        forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
ThreadKilled
  (Int
inactiveConsumers, [idx]
freedJobs) <- forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
    -- Reserve all inactive (assumed dead) consumers and get their ids. We don't
    -- delete them here, because if the coresponding reserved_by column in the
    -- jobs table has an IMMEDIATE foreign key with the ON DELETE SET NULL
    -- property, we will not be able to determine stuck jobs in the next step.
    forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
      [ SQL
"SELECT id::bigint"
      , SQL
"FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
      , SQL
"WHERE last_activity +" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int32 -> Interval
iminutes Int32
1 forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"<= " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
      , SQL
"  AND name =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
      , SQL
"FOR UPDATE SKIP LOCKED"
      ]
    forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany (forall a. Identity a -> a
runIdentity @Int64) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      [] -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
0, [])
      [Int64]
inactive -> do
        -- Fetch all stuck jobs and run ccOnException on them to determine
        -- actions. This is necessary e.g. to be able to apply exponential
        -- backoff to them correctly.
        forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
          [ SQL
"SELECT" forall m. (IsString m, Monoid m) => m -> m -> m
<+> forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
          , SQL
"FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
          , SQL
"WHERE reserved_by = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [Int64]
inactive forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
          , SQL
"FOR UPDATE SKIP LOCKED"
          ]
        [job]
stuckJobs <- forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany row -> job
ccJobFetcher
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [job]
stuckJobs) forall a b. (a -> b) -> a -> b
$ do
          [(idx, Result)]
results <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [job]
stuckJobs forall a b. (a -> b) -> a -> b
$ \job
job -> do
            Action
action <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ SomeException -> job -> m Action
ccOnException (forall e. Exception e => e -> SomeException
toException AsyncException
ThreadKilled) job
job
            forall (f :: * -> *) a. Applicative f => a -> f a
pure (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)
          forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now
        forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
          [ SQL
"DELETE FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
          , SQL
"WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [Int64]
inactive forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
          ]
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
inactive, forall a b. (a -> b) -> [a] -> [b]
map job -> idx
ccJobIndex [job]
stuckJobs)
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
inactiveConsumers forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
    forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Unregistered inactive consumers" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
        Key
"inactive_consumers" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
inactiveConsumers
      ]
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [idx]
freedJobs) forall a b. (a -> b) -> a -> b
$ do
    forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Freed locked jobs" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
        Key
"freed_jobs" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a b. (a -> b) -> [a] -> [b]
map forall a. Show a => a -> String
show [idx]
freedJobs
      ]
  forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay forall a b. (a -> b) -> a -> b
$ Int
30 forall a. Num a => a -> a -> a
* Int
1000000 -- wait 30 seconds

-- | Spawn a thread that reserves and processes jobs.
spawnDispatcher
  :: forall m idx job. ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m
                       , Show idx, ToSQL idx )
  => ConsumerConfig m idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> MVar ()
  -> TVar (M.Map ThreadId idx)
  -> TVar Int
  -> Maybe (TMVar Bool)
  -> m ThreadId
spawnDispatcher :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
 Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore
  TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal =
  forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"dispatcher" forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
    forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
takeMVar MVar ()
semaphore
    Bool
someJobWasProcessed <- Int -> m Bool
loop Int
1
    if Bool
someJobWasProcessed
      then forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
False
      else forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
True
  where
    setIdle :: forall m' . (MonadBaseControl IO m') => Bool -> m' ()
    setIdle :: forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
isIdle = case Maybe (TMVar Bool)
mIdleSignal of
      Maybe (TMVar Bool)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just TMVar Bool
idleSignal -> forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
        Maybe Bool
_ <- forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar Bool
idleSignal
        forall a. TMVar a -> a -> STM ()
putTMVar TMVar Bool
idleSignal Bool
isIdle

    loop :: Int -> m Bool
    loop :: Int -> m Bool
loop Int
limit = do
      ([job]
batch, Int
batchSize) <- Int -> m ([job], Int)
reserveJobs Int
limit
      forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
        forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Processing batch" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
            Key
"batch_size" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
batchSize
          ]
        -- Update runningJobs before forking so that we can
        -- adjust maxBatchSize appropriately later. We also
        -- need to mask asynchronous exceptions here as we
        -- rely on correct value of runningJobs to perform
        -- graceful termination.
        forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> do
          forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (forall a. Num a => a -> a -> a
+Int
batchSize)
          let subtractJobs :: m ()
subtractJobs = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
                forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (forall a. Num a => a -> a -> a
subtract Int
batchSize)
          forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"batch processor"
            forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` m ()
subtractJobs) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> m a
restore forall a b. (a -> b) -> a -> b
$ do
            forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM job -> m (job, m (Result Result))
startJob [job]
batch forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (job, m (Result Result)) -> m (idx, Result)
joinJob forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [(idx, Result)] -> m ()
updateJobs

        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize forall a. Eq a => a -> a -> Bool
== Int
limit) forall a b. (a -> b) -> a -> b
$ do
          Int
maxBatchSize <- forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
            Int
jobs <- forall a. TVar a -> STM a
readTVar TVar Int
runningJobs
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
jobs forall a. Ord a => a -> a -> Bool
>= Int
ccMaxRunningJobs) forall a. STM a
retry
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int
ccMaxRunningJobs forall a. Num a => a -> a -> a
- Int
jobs
          forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Int -> m Bool
loop forall a b. (a -> b) -> a -> b
$ forall a. Ord a => a -> a -> a
min Int
maxBatchSize (Int
2forall a. Num a => a -> a -> a
*Int
limit)

      forall (m :: * -> *) a. Monad m => a -> m a
return (Int
batchSize forall a. Ord a => a -> a -> Bool
> Int
0)

    reserveJobs :: Int -> m ([job], Int)
    reserveJobs :: Int -> m ([job], Int)
reserveJobs Int
limit = forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      Int
n <- forall (m :: * -> *). MonadDB m => SQL -> m Int
runSQL forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
          SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
        , SQL
"  reserved_by =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
        , SQL
", attempts = CASE"
        , SQL
"    WHEN finished_at IS NULL THEN attempts + 1"
        , SQL
"    ELSE 1"
        , SQL
"  END"
        , SQL
"WHERE id IN (" forall a. Semigroup a => a -> a -> a
<> UTCTime -> SQL
reservedJobs UTCTime
now forall a. Semigroup a => a -> a -> a
<> SQL
")"
        , SQL
"RETURNING" forall m. (IsString m, Monoid m) => m -> m -> m
<+> forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
        ]
      -- Decode lazily as we want the transaction to be as short as possible.
      (, Int
n) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap row -> job
ccJobFetcher forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) row.
(MonadDB m, MonadThrow m, FromRow row) =>
m (QueryResult row)
queryResult
      where
        reservedJobs :: UTCTime -> SQL
        reservedJobs :: UTCTime -> SQL
reservedJobs UTCTime
now = forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
            SQL
"SELECT id FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
          , SQL
"WHERE"
          , SQL
"       reserved_by IS NULL"
          , SQL
"       AND run_at IS NOT NULL"
          , SQL
"       AND run_at <= " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
          , SQL
"       ORDER BY run_at"
          , SQL
"LIMIT" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int
limit
          , SQL
"FOR UPDATE SKIP LOCKED"
          ]

    -- | Spawn each job in a separate thread.
    startJob :: job -> m (job, m (T.Result Result))
    startJob :: job -> m (job, m (Result Result))
startJob job
job = do
      (ThreadId
_, m (Result Result)
joinFork) <- forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (ThreadId, m (Result a))
T.fork forall a b. (a -> b) -> a -> b
$ do
        ThreadId
tid <- forall (m :: * -> *). MonadBase IO m => m ThreadId
myThreadId
        forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (ThreadId -> m ()
registerJob ThreadId
tid) (ThreadId -> m ()
unregisterJob ThreadId
tid) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> m a
restore forall a b. (a -> b) -> a -> b
$ do
          job -> m Result
ccProcessJob job
job
      forall (m :: * -> *) a. Monad m => a -> m a
return (job
job, m (Result Result)
joinFork)
      where
        registerJob :: ThreadId -> m ()
registerJob ThreadId
tid = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
          forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ThreadId
tid forall a b. (a -> b) -> a -> b
$ job -> idx
ccJobIndex job
job
        unregisterJob :: ThreadId -> m ()
unregisterJob ThreadId
tid = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
           forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => k -> Map k a -> Map k a
M.delete ThreadId
tid

    -- | Wait for all the jobs and collect their results.
    joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
    joinJob :: (job, m (Result Result)) -> m (idx, Result)
joinJob (job
job, m (Result Result)
joinFork) = m (Result Result)
joinFork forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Result Result
eres -> case Result Result
eres of
      Right Result
result -> forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Result
result)
      Left SomeException
ex -> do
        Action
action <- SomeException -> job -> m Action
ccOnException SomeException
ex job
job
        forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logAttention Text
"Unexpected exception caught while processing job" forall a b. (a -> b) -> a -> b
$
          [Pair] -> Value
object [
            Key
"job_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show (job -> idx
ccJobIndex job
job)
          , Key
"exception" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show SomeException
ex
          , Key
"action" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show Action
action
          ]
        forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)

    -- | Update status of the jobs.
    updateJobs :: [(idx, Result)] -> m ()
    updateJobs :: [(idx, Result)] -> m ()
updateJobs [(idx, Result)]
results = forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
      UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
      forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now

----------------------------------------

-- | Generate a single SQL query for updating all given jobs.
updateJobsQuery
  :: (Show idx, ToSQL idx)
  => RawSQL ()
  -> [(idx, Result)]
  -> UTCTime
  -> SQL
updateJobsQuery :: forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
jobsTable [(idx, Result)]
results UTCTime
now = forall m. (IsString m, Monoid m) => [m] -> m
smconcat
  [ SQL
"WITH removed AS ("
  , SQL
"  DELETE FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable
  , SQL
"  WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
deletes forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
  , SQL
")"
  , SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
  , SQL
"  reserved_by = NULL"
  , SQL
", run_at = CASE"
  , SQL
"    WHEN FALSE THEN run_at"
  ,      forall m. (IsString m, Monoid m) => [m] -> m
smconcat forall a b. (a -> b) -> a -> b
$ forall k a b. (k -> a -> b -> b) -> b -> Map k a -> b
M.foldrWithKey Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL [] Map (Either Interval UTCTime) [idx]
retries
  , SQL
"    ELSE NULL" -- processed
  , SQL
"  END"
  , SQL
", finished_at = CASE"
  , SQL
"    WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
successes forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
  , SQL
"    ELSE NULL"
  , SQL
"  END"
  , SQL
"WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 (forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst [(idx, Result)]
updates) forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
  ]
  where
    retryToSQL :: Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL (Left Interval
int) [idx]
ids =
      (SQL
"WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
ids forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now forall a. Semigroup a => a -> a -> a
<> SQL
" +" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Interval
int forall a. a -> [a] -> [a]
:)
    retryToSQL (Right UTCTime
time) [idx]
ids =
      (SQL
"WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
ids forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
time forall a. a -> [a] -> [a]
:)

    retries :: Map (Either Interval UTCTime) [idx]
retries = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}.
(a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step forall k a. Map k a
M.empty forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall {a}. (a, Result) -> (a, Action)
getAction [(idx, Result)]
updates
      where
        getAction :: (a, Result) -> (a, Action)
getAction (a
idx, Result
result) = case Result
result of
          Ok     Action
action -> (a
idx, Action
action)
          Failed Action
action -> (a
idx, Action
action)

        step :: (a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step (a
idx, Action
action) Map (Either Interval UTCTime) [a]
iretries = case Action
action of
          Action
MarkProcessed  -> Map (Either Interval UTCTime) [a]
iretries
          RerunAfter Interval
int -> forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith forall a. [a] -> [a] -> [a]
(++) (forall a b. a -> Either a b
Left Interval
int) [a
idx] Map (Either Interval UTCTime) [a]
iretries
          RerunAt UTCTime
time   -> forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith forall a. [a] -> [a] -> [a]
(++) (forall a b. b -> Either a b
Right UTCTime
time) [a
idx] Map (Either Interval UTCTime) [a]
iretries
          Action
Remove         -> forall a. HasCallStack => String -> a
error String
"updateJobs: Remove should've been filtered out"

    successes :: [idx]
successes = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}. (a, Result) -> [a] -> [a]
step [] [(idx, Result)]
updates
      where
        step :: (a, Result) -> [a] -> [a]
step (a
idx, Ok     Action
_) [a]
acc = a
idx forall a. a -> [a] -> [a]
: [a]
acc
        step (a
_,   Failed Action
_) [a]
acc =       [a]
acc

    ([idx]
deletes, [(idx, Result)]
updates) = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}.
(a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step ([], []) [(idx, Result)]
results
      where
        step :: (a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step job :: (a, Result)
job@(a
idx, Result
result) ([a]
ideletes, [(a, Result)]
iupdates) = case Result
result of
          Ok     Action
Remove -> (a
idx forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
          Failed Action
Remove -> (a
idx forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
          Result
_             -> ([a]
ideletes, (a, Result)
job forall a. a -> [a] -> [a]
: [(a, Result)]
iupdates)


ts :: TransactionSettings
ts :: TransactionSettings
ts = TransactionSettings
defaultTransactionSettings {
  -- PostgreSQL doesn't seem to handle very high amount of
  -- concurrent transactions that modify multiple rows in
  -- the same table well (see updateJobs) and sometimes (very
  -- rarely though) ends up in a deadlock. It doesn't matter
  -- much though, we just restart the transaction in such case.
  tsRestartPredicate :: Maybe RestartPredicate
tsRestartPredicate = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => (e -> Integer -> Bool) -> RestartPredicate
RestartPredicate
  forall a b. (a -> b) -> a -> b
$ \DetailedQueryError
e Integer
_ -> DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e forall a. Eq a => a -> a -> Bool
== ErrorCode
DeadlockDetected
         Bool -> Bool -> Bool
|| DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e forall a. Eq a => a -> a -> Bool
== ErrorCode
SerializationFailure
}

atomically :: MonadBase IO m => STM a -> m a
atomically :: forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically = forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
STM.atomically