module Database.PostgreSQL.Consumers.Components (
runConsumer
, runConsumerWithIdleSignal
, spawnListener
, spawnMonitor
, spawnDispatcher
) where
import Control.Applicative
import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Exception (AsyncException(ThreadKilled))
import Control.Monad
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Function
import Data.Int
import Data.Maybe
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Log
import Prelude
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.Thread.Lifted as T
import qualified Data.Foldable as F
import qualified Data.Map.Strict as M
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils
runConsumer
:: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
, FromSQL idx, ToSQL idx )
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> m (m ())
runConsumer :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job -> ConnectionSourceM m -> m (m ())
runConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs = forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs forall a. Maybe a
Nothing
runConsumerWithIdleSignal
:: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
, FromSQL idx, ToSQL idx )
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> TMVar Bool
-> m (m ())
runConsumerWithIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> TMVar Bool -> m (m ())
runConsumerWithIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs TMVar Bool
idleSignal = forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs (forall a. a -> Maybe a
Just TMVar Bool
idleSignal)
runConsumerWithMaybeIdleSignal
:: ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m, Eq idx, Show idx
, FromSQL idx, ToSQL idx )
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> Maybe (TMVar Bool)
-> m (m ())
runConsumerWithMaybeIdleSignal :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Eq idx, Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ())
runConsumerWithMaybeIdleSignal ConsumerConfig m idx job
cc ConnectionSourceM m
cs Maybe (TMVar Bool)
mIdleSignal
| forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccMaxRunningJobs ConsumerConfig m idx job
cc forall a. Ord a => a -> a -> Bool
< Int
1 = do
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"ccMaxRunningJobs < 1, not starting the consumer"
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
| Bool
otherwise = do
MVar ()
semaphore <- forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar ()
TVar (Map ThreadId idx)
runningJobsInfo <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (TVar a)
newTVarIO forall k a. Map k a
M.empty
TVar Int
runningJobs <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (TVar a)
newTVarIO Int
0
Either DBException ()
skipLockedTest :: Either DBException () <-
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
defaultTransactionSettings forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ SQL
"SELECT TRUE FOR UPDATE SKIP LOCKED"
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => String -> a
error String
"PostgreSQL version with support for SKIP LOCKED is required") forall (f :: * -> *) a. Applicative f => a -> f a
pure Either DBException ()
skipLockedTest
ConsumerID
cid <- forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m, MonadTime m) =>
ConsumerConfig n idx job -> ConnectionSourceM m -> m ConsumerID
registerConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs
forall (m :: * -> *) a. MonadLog m => [Pair] -> m a -> m a
localData [Key
"consumer_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show ConsumerID
cid] forall a b. (a -> b) -> a -> b
$ do
ThreadId
listener <- forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore
ThreadId
monitor <- forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"monitor" forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
ThreadId
dispatcher <- forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"dispatcher" forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig m idx job
cc
ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal
forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadLog m => Text -> m a -> m a
localDomain Text
"finalizer" forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
listener
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
dispatcher
forall {m :: * -> *} {a} {k} {a}.
(MonadBase IO m, MonadLog m, Num a, Eq a, Eq k, Eq a, Show a) =>
TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs
forall (m :: * -> *). MonadBase IO m => ThreadId -> m ()
stopExecution ThreadId
monitor
forall (m :: * -> *) (n :: * -> *) idx job.
(MonadBase IO m, MonadMask m) =>
ConsumerConfig n idx job
-> ConnectionSourceM m -> ConsumerID -> m ()
unregisterConsumer ConsumerConfig m idx job
cc ConnectionSourceM m
cs ConsumerID
cid
where
waitForRunningJobs :: TVar (Map k a) -> TVar a -> m ()
waitForRunningJobs TVar (Map k a)
runningJobsInfo TVar a
runningJobs = do
Map k a
initialJobs <- forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> IO a
readTVarIO TVar (Map k a)
runningJobsInfo
(forall a. (a -> a) -> a
`fix` Map k a
initialJobs) forall a b. (a -> b) -> a -> b
$ \Map k a -> m ()
loop Map k a
jobsInfo -> do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> Bool
M.null Map k a
jobsInfo) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Waiting for running jobs" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
Key
"job_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall {k}. Map k a -> [String]
showJobsInfo Map k a
jobsInfo
]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
a
jobs <- forall a. TVar a -> STM a
readTVar TVar a
runningJobs
if a
jobs forall a. Eq a => a -> a -> Bool
== a
0
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
else do
Map k a
newJobsInfo <- forall a. TVar a -> STM a
readTVar TVar (Map k a)
runningJobsInfo
if (Map k a
newJobsInfo forall a. Eq a => a -> a -> Bool
== Map k a
jobsInfo)
then forall a. STM a
retry
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Map k a -> m ()
loop Map k a
newJobsInfo
where
showJobsInfo :: Map k a -> [String]
showJobsInfo = forall a b k. (a -> b -> b) -> b -> Map k a -> b
M.foldr (\a
idx [String]
acc -> forall a. Show a => a -> String
show a
idx forall a. a -> [a] -> [a]
: [String]
acc) []
spawnListener
:: (MonadBaseControl IO m, MonadMask m)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> MVar ()
-> m ThreadId
spawnListener :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadMask m) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> MVar () -> m ThreadId
spawnListener ConsumerConfig m idx job
cc ConnectionSourceM m
cs MVar ()
semaphore =
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"listener" forall a b. (a -> b) -> a -> b
$
case forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccNotificationChannel ConsumerConfig m idx job
cc of
Just Channel
chan ->
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
noTs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (forall (m :: * -> *). MonadDB m => Channel -> m ()
listen Channel
chan) (forall (m :: * -> *). MonadDB m => Channel -> m ()
unlisten Channel
chan)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadDB m => Int -> m (Maybe Notification)
getNotification forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m Bool
signalDispatcher
Maybe Channel
Nothing -> forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationTimeout ConsumerConfig m idx job
cc
m Bool
signalDispatcher
where
signalDispatcher :: m Bool
signalDispatcher = do
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m Bool
tryPutMVar MVar ()
semaphore ()
noTs :: TransactionSettings
noTs = TransactionSettings
defaultTransactionSettings {
tsAutoTransaction :: Bool
tsAutoTransaction = Bool
False
}
spawnMonitor
:: forall m idx job. (MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, FromSQL idx, ToSQL idx)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> m ThreadId
spawnMonitor :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, FromSQL idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m -> ConsumerID -> m ThreadId
spawnMonitor ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} ConnectionSourceM m
cs ConsumerID
cid = forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"monitor" forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
Bool
ok <- forall (m :: * -> *). (MonadDB m, MonadThrow m) => SQL -> m Bool
runSQL01 forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
, SQL
"SET last_activity = " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
, SQL
"WHERE id =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
, SQL
" AND name =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
]
if Bool
ok
then forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ Text
"Activity of the consumer updated"
else do
forall (m :: * -> *). MonadLog m => Text -> m ()
logInfo_ forall a b. (a -> b) -> a -> b
$ Text
"Consumer is not registered"
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AsyncException
ThreadKilled
(Int
inactiveConsumers, [idx]
freedJobs) <- forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"SELECT id::bigint"
, SQL
"FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
, SQL
"WHERE last_activity +" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int32 -> Interval
iminutes Int32
1 forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"<= " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
, SQL
" AND name =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> RawSQL () -> Text
unRawSQL RawSQL ()
ccJobsTable
, SQL
"FOR UPDATE SKIP LOCKED"
]
forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany (forall a. Identity a -> a
runIdentity @Int64) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
[] -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
0, [])
[Int64]
inactive -> do
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"SELECT" forall m. (IsString m, Monoid m) => m -> m -> m
<+> forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
, SQL
"FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
, SQL
"WHERE reserved_by = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [Int64]
inactive forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
, SQL
"FOR UPDATE SKIP LOCKED"
]
[job]
stuckJobs <- forall (m :: * -> *) row t.
(MonadDB m, FromRow row) =>
(row -> t) -> m [t]
fetchMany row -> job
ccJobFetcher
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [job]
stuckJobs) forall a b. (a -> b) -> a -> b
$ do
[(idx, Result)]
results <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [job]
stuckJobs forall a b. (a -> b) -> a -> b
$ \job
job -> do
Action
action <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ SomeException -> job -> m Action
ccOnException (forall e. Exception e => e -> SomeException
toException AsyncException
ThreadKilled) job
job
forall (f :: * -> *) a. Applicative f => a -> f a
pure (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"DELETE FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccConsumersTable
, SQL
"WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [Int64]
inactive forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (t :: * -> *) a. Foldable t => t a -> Int
length [Int64]
inactive, forall a b. (a -> b) -> [a] -> [b]
map job -> idx
ccJobIndex [job]
stuckJobs)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
inactiveConsumers forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Unregistered inactive consumers" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
Key
"inactive_consumers" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
inactiveConsumers
]
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [idx]
freedJobs) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Freed locked jobs" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
Key
"freed_jobs" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a b. (a -> b) -> [a] -> [b]
map forall a. Show a => a -> String
show [idx]
freedJobs
]
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadBase IO m => Int -> m ()
threadDelay forall a b. (a -> b) -> a -> b
$ Int
30 forall a. Num a => a -> a -> a
* Int
1000000
spawnDispatcher
:: forall m idx job. ( MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m
, Show idx, ToSQL idx )
=> ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (M.Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher :: forall (m :: * -> *) idx job.
(MonadBaseControl IO m, MonadLog m, MonadMask m, MonadTime m,
Show idx, ToSQL idx) =>
ConsumerConfig m idx job
-> ConnectionSourceM m
-> ConsumerID
-> MVar ()
-> TVar (Map ThreadId idx)
-> TVar Int
-> Maybe (TMVar Bool)
-> m ThreadId
spawnDispatcher ConsumerConfig{Int
[SQL]
Maybe Channel
RawSQL ()
job -> idx
job -> m Result
row -> job
SomeException -> job -> m Action
ccOnException :: SomeException -> job -> m Action
ccProcessJob :: job -> m Result
ccMaxRunningJobs :: Int
ccNotificationTimeout :: Int
ccNotificationChannel :: Maybe Channel
ccJobIndex :: job -> idx
ccJobFetcher :: row -> job
ccJobSelectors :: [SQL]
ccConsumersTable :: RawSQL ()
ccJobsTable :: RawSQL ()
ccOnException :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> SomeException -> job -> m Action
ccProcessJob :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> m Result
ccJobIndex :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> job -> idx
ccJobFetcher :: ()
ccJobSelectors :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> [SQL]
ccConsumersTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccJobsTable :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> RawSQL ()
ccNotificationTimeout :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
ccNotificationChannel :: forall (m :: * -> *) idx job.
ConsumerConfig m idx job -> Maybe Channel
ccMaxRunningJobs :: forall (m :: * -> *) idx job. ConsumerConfig m idx job -> Int
..} ConnectionSourceM m
cs ConsumerID
cid MVar ()
semaphore
TVar (Map ThreadId idx)
runningJobsInfo TVar Int
runningJobs Maybe (TMVar Bool)
mIdleSignal =
forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"dispatcher" forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
takeMVar MVar ()
semaphore
Bool
someJobWasProcessed <- Int -> m Bool
loop Int
1
if Bool
someJobWasProcessed
then forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
False
else forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
True
where
setIdle :: forall m' . (MonadBaseControl IO m') => Bool -> m' ()
setIdle :: forall (m' :: * -> *). MonadBaseControl IO m' => Bool -> m' ()
setIdle Bool
isIdle = case Maybe (TMVar Bool)
mIdleSignal of
Maybe (TMVar Bool)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just TMVar Bool
idleSignal -> forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
Maybe Bool
_ <- forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar Bool
idleSignal
forall a. TMVar a -> a -> STM ()
putTMVar TMVar Bool
idleSignal Bool
isIdle
loop :: Int -> m Bool
loop :: Int -> m Bool
loop Int
limit = do
([job]
batch, Int
batchSize) <- Int -> m ([job], Int)
reserveJobs Int
limit
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize forall a. Ord a => a -> a -> Bool
> Int
0) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logInfo Text
"Processing batch" forall a b. (a -> b) -> a -> b
$ [Pair] -> Value
object [
Key
"batch_size" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
batchSize
]
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> do
forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (forall a. Num a => a -> a -> a
+Int
batchSize)
let subtractJobs :: m ()
subtractJobs = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
runningJobs (forall a. Num a => a -> a -> a
subtract Int
batchSize)
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *).
MonadBaseControl IO m =>
String -> m () -> m ThreadId
forkP String
"batch processor"
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`finally` m ()
subtractJobs) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> m a
restore forall a b. (a -> b) -> a -> b
$ do
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM job -> m (job, m (Result Result))
startJob [job]
batch forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (job, m (Result Result)) -> m (idx, Result)
joinJob forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [(idx, Result)] -> m ()
updateJobs
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
batchSize forall a. Eq a => a -> a -> Bool
== Int
limit) forall a b. (a -> b) -> a -> b
$ do
Int
maxBatchSize <- forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
Int
jobs <- forall a. TVar a -> STM a
readTVar TVar Int
runningJobs
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
jobs forall a. Ord a => a -> a -> Bool
>= Int
ccMaxRunningJobs) forall a. STM a
retry
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int
ccMaxRunningJobs forall a. Num a => a -> a -> a
- Int
jobs
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Int -> m Bool
loop forall a b. (a -> b) -> a -> b
$ forall a. Ord a => a -> a -> a
min Int
maxBatchSize (Int
2forall a. Num a => a -> a -> a
*Int
limit)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
batchSize forall a. Ord a => a -> a -> Bool
> Int
0)
reserveJobs :: Int -> m ([job], Int)
reserveJobs :: Int -> m ([job], Int)
reserveJobs Int
limit = forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
Int
n <- forall (m :: * -> *). MonadDB m => SQL -> m Int
runSQL forall a b. (a -> b) -> a -> b
$ forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
, SQL
" reserved_by =" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> ConsumerID
cid
, SQL
", attempts = CASE"
, SQL
" WHEN finished_at IS NULL THEN attempts + 1"
, SQL
" ELSE 1"
, SQL
" END"
, SQL
"WHERE id IN (" forall a. Semigroup a => a -> a -> a
<> UTCTime -> SQL
reservedJobs UTCTime
now forall a. Semigroup a => a -> a -> a
<> SQL
")"
, SQL
"RETURNING" forall m. (IsString m, Monoid m) => m -> m -> m
<+> forall m. Monoid m => m -> [m] -> m
mintercalate SQL
", " [SQL]
ccJobSelectors
]
(, Int
n) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => t a -> [a]
F.toList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap row -> job
ccJobFetcher forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) row.
(MonadDB m, MonadThrow m, FromRow row) =>
m (QueryResult row)
queryResult
where
reservedJobs :: UTCTime -> SQL
reservedJobs :: UTCTime -> SQL
reservedJobs UTCTime
now = forall m. (IsString m, Monoid m) => [m] -> m
smconcat [
SQL
"SELECT id FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
ccJobsTable
, SQL
"WHERE"
, SQL
" reserved_by IS NULL"
, SQL
" AND run_at IS NOT NULL"
, SQL
" AND run_at <= " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
, SQL
" ORDER BY run_at"
, SQL
"LIMIT" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Int
limit
, SQL
"FOR UPDATE SKIP LOCKED"
]
startJob :: job -> m (job, m (T.Result Result))
startJob :: job -> m (job, m (Result Result))
startJob job
job = do
(ThreadId
_, m (Result Result)
joinFork) <- forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (ThreadId, m (Result a))
T.fork forall a b. (a -> b) -> a -> b
$ do
ThreadId
tid <- forall (m :: * -> *). MonadBase IO m => m ThreadId
myThreadId
forall (m :: * -> *) a c b. MonadMask m => m a -> m c -> m b -> m b
bracket_ (ThreadId -> m ()
registerJob ThreadId
tid) (ThreadId -> m ()
unregisterJob ThreadId
tid) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> m a
restore forall a b. (a -> b) -> a -> b
$ do
job -> m Result
ccProcessJob job
job
forall (m :: * -> *) a. Monad m => a -> m a
return (job
job, m (Result Result)
joinFork)
where
registerJob :: ThreadId -> m ()
registerJob ThreadId
tid = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ThreadId
tid forall a b. (a -> b) -> a -> b
$ job -> idx
ccJobIndex job
job
unregisterJob :: ThreadId -> m ()
unregisterJob ThreadId
tid = forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map ThreadId idx)
runningJobsInfo forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => k -> Map k a -> Map k a
M.delete ThreadId
tid
joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
joinJob :: (job, m (Result Result)) -> m (idx, Result)
joinJob (job
job, m (Result Result)
joinFork) = m (Result Result)
joinFork forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Result Result
eres -> case Result Result
eres of
Right Result
result -> forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Result
result)
Left SomeException
ex -> do
Action
action <- SomeException -> job -> m Action
ccOnException SomeException
ex job
job
forall (m :: * -> *) a. (MonadLog m, ToJSON a) => Text -> a -> m ()
logAttention Text
"Unexpected exception caught while processing job" forall a b. (a -> b) -> a -> b
$
[Pair] -> Value
object [
Key
"job_id" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show (job -> idx
ccJobIndex job
job)
, Key
"exception" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show SomeException
ex
, Key
"action" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= forall a. Show a => a -> String
show Action
action
]
forall (m :: * -> *) a. Monad m => a -> m a
return (job -> idx
ccJobIndex job
job, Action -> Result
Failed Action
action)
updateJobs :: [(idx, Result)] -> m ()
updateJobs :: [(idx, Result)] -> m ()
updateJobs [(idx, Result)]
results = forall (m :: * -> *) a.
(MonadBase IO m, MonadMask m) =>
ConnectionSourceM m -> TransactionSettings -> DBT m a -> m a
runDBT ConnectionSourceM m
cs TransactionSettings
ts forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- forall (m :: * -> *). MonadTime m => m UTCTime
currentTime
forall (m :: * -> *). MonadDB m => SQL -> m ()
runSQL_ forall a b. (a -> b) -> a -> b
$ forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
ccJobsTable [(idx, Result)]
results UTCTime
now
updateJobsQuery
:: (Show idx, ToSQL idx)
=> RawSQL ()
-> [(idx, Result)]
-> UTCTime
-> SQL
updateJobsQuery :: forall idx.
(Show idx, ToSQL idx) =>
RawSQL () -> [(idx, Result)] -> UTCTime -> SQL
updateJobsQuery RawSQL ()
jobsTable [(idx, Result)]
results UTCTime
now = forall m. (IsString m, Monoid m) => [m] -> m
smconcat
[ SQL
"WITH removed AS ("
, SQL
" DELETE FROM" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable
, SQL
" WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
deletes forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
, SQL
")"
, SQL
"UPDATE" forall m. (IsString m, Monoid m) => m -> m -> m
<+> RawSQL () -> SQL
raw RawSQL ()
jobsTable forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
"SET"
, SQL
" reserved_by = NULL"
, SQL
", run_at = CASE"
, SQL
" WHEN FALSE THEN run_at"
, forall m. (IsString m, Monoid m) => [m] -> m
smconcat forall a b. (a -> b) -> a -> b
$ forall k a b. (k -> a -> b -> b) -> b -> Map k a -> b
M.foldrWithKey Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL [] Map (Either Interval UTCTime) [idx]
retries
, SQL
" ELSE NULL"
, SQL
" END"
, SQL
", finished_at = CASE"
, SQL
" WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
successes forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now
, SQL
" ELSE NULL"
, SQL
" END"
, SQL
"WHERE id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 (forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst [(idx, Result)]
updates) forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
")"
]
where
retryToSQL :: Either Interval UTCTime -> [idx] -> [SQL] -> [SQL]
retryToSQL (Left Interval
int) [idx]
ids =
(SQL
"WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
ids forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN " forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
now forall a. Semigroup a => a -> a -> a
<> SQL
" +" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> Interval
int forall a. a -> [a] -> [a]
:)
retryToSQL (Right UTCTime
time) [idx]
ids =
(SQL
"WHEN id = ANY(" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> forall a. [a] -> Array1 a
Array1 [idx]
ids forall m. (IsString m, Monoid m) => m -> m -> m
<+> SQL
") THEN" forall t. (Show t, ToSQL t) => SQL -> t -> SQL
<?> UTCTime
time forall a. a -> [a] -> [a]
:)
retries :: Map (Either Interval UTCTime) [idx]
retries = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}.
(a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step forall k a. Map k a
M.empty forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall {a}. (a, Result) -> (a, Action)
getAction [(idx, Result)]
updates
where
getAction :: (a, Result) -> (a, Action)
getAction (a
idx, Result
result) = case Result
result of
Ok Action
action -> (a
idx, Action
action)
Failed Action
action -> (a
idx, Action
action)
step :: (a, Action)
-> Map (Either Interval UTCTime) [a]
-> Map (Either Interval UTCTime) [a]
step (a
idx, Action
action) Map (Either Interval UTCTime) [a]
iretries = case Action
action of
Action
MarkProcessed -> Map (Either Interval UTCTime) [a]
iretries
RerunAfter Interval
int -> forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith forall a. [a] -> [a] -> [a]
(++) (forall a b. a -> Either a b
Left Interval
int) [a
idx] Map (Either Interval UTCTime) [a]
iretries
RerunAt UTCTime
time -> forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a
M.insertWith forall a. [a] -> [a] -> [a]
(++) (forall a b. b -> Either a b
Right UTCTime
time) [a
idx] Map (Either Interval UTCTime) [a]
iretries
Action
Remove -> forall a. HasCallStack => String -> a
error String
"updateJobs: Remove should've been filtered out"
successes :: [idx]
successes = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}. (a, Result) -> [a] -> [a]
step [] [(idx, Result)]
updates
where
step :: (a, Result) -> [a] -> [a]
step (a
idx, Ok Action
_) [a]
acc = a
idx forall a. a -> [a] -> [a]
: [a]
acc
step (a
_, Failed Action
_) [a]
acc = [a]
acc
([idx]
deletes, [(idx, Result)]
updates) = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall {a}.
(a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step ([], []) [(idx, Result)]
results
where
step :: (a, Result) -> ([a], [(a, Result)]) -> ([a], [(a, Result)])
step job :: (a, Result)
job@(a
idx, Result
result) ([a]
ideletes, [(a, Result)]
iupdates) = case Result
result of
Ok Action
Remove -> (a
idx forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
Failed Action
Remove -> (a
idx forall a. a -> [a] -> [a]
: [a]
ideletes, [(a, Result)]
iupdates)
Result
_ -> ([a]
ideletes, (a, Result)
job forall a. a -> [a] -> [a]
: [(a, Result)]
iupdates)
ts :: TransactionSettings
ts :: TransactionSettings
ts = TransactionSettings
defaultTransactionSettings {
tsRestartPredicate :: Maybe RestartPredicate
tsRestartPredicate = forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e. Exception e => (e -> Integer -> Bool) -> RestartPredicate
RestartPredicate
forall a b. (a -> b) -> a -> b
$ \DetailedQueryError
e Integer
_ -> DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e forall a. Eq a => a -> a -> Bool
== ErrorCode
DeadlockDetected
Bool -> Bool -> Bool
|| DetailedQueryError -> ErrorCode
qeErrorCode DetailedQueryError
e forall a. Eq a => a -> a -> Bool
== ErrorCode
SerializationFailure
}
atomically :: MonadBase IO m => STM a -> m a
atomically :: forall (m :: * -> *) a. MonadBase IO m => STM a -> m a
atomically = forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
STM.atomically