{-# LANGUAGE TypeApplications #-}
module Database.PostgreSQL.Consumers.Consumer (
    ConsumerID
  , registerConsumer
  , unregisterConsumer
  ) where

import Control.Applicative
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Data.Int
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Prelude

import Database.PostgreSQL.Consumers.Config

-- | ID of a consumer.
--
-- A thin wrapper around 'Int64'. Equality and ordering are derived, so
-- they follow the wrapped number.
newtype ConsumerID = ConsumerID Int64
  deriving (Eq, Ord)

-- | A 'ConsumerID' uses the same format string as the 'Int64' it wraps.
instance PQFormat ConsumerID where
  pqFormat = pqFormat @Int64
-- | Decoding is delegated to the 'Int64' instance; the result is then
-- wrapped in the 'ConsumerID' constructor.
instance FromSQL ConsumerID where
  type PQBase ConsumerID = PQBase Int64
  fromSQL mbase = ConsumerID <$> fromSQL mbase
-- | Encoding unwraps the 'ConsumerID' and delegates to the 'Int64'
-- instance.
instance ToSQL ConsumerID where
  type PQDest ConsumerID = PQDest Int64
  toSQL (ConsumerID n) = toSQL n

-- | Shows only the wrapped number (no @ConsumerID@ constructor prefix),
-- passing the precedence through to the 'Int64' instance.
instance Show ConsumerID where
  showsPrec p (ConsumerID n) = showsPrec p n

-- | Register consumer in the consumers table,
-- so that it can reserve jobs using acquired ID.
--
-- Inserts one row into 'ccConsumersTable' whose @name@ is the textual
-- name of 'ccJobsTable' and whose @last_activity@ is the current time,
-- then returns the @id@ reported by the @RETURNING@ clause.
--
-- The queries run through 'runDBT' with 'tsAutoTransaction' switched off.
registerConsumer
  :: (MonadBase IO m, MonadMask m, MonadTime m)
  => ConsumerConfig n idx job
  -> ConnectionSourceM m
  -> m ConsumerID
registerConsumer ConsumerConfig{..} cs = runDBT cs ts $ do
  now <- currentTime
  runSQL_ $ smconcat [
      "INSERT INTO" <+> raw ccConsumersTable
    , "(name, last_activity) VALUES (" <?> unRawSQL ccJobsTable <> ", " <?> now <> ")"
    , "RETURNING id"
    ]
  fetchOne runIdentity
  where
    -- Default settings, except that automatic transactions are disabled.
    ts = defaultTransactionSettings {
      tsAutoTransaction = False
    }

-- | Unregister consumer with a given ID.
--
-- Performs two statements through 'runDBT':
--
-- 1. clears @reserved_by@ on every row of 'ccJobsTable' currently
--    reserved by this consumer;
-- 2. deletes the row of 'ccConsumersTable' whose @id@ is the given ID
--    and whose @name@ is the textual name of 'ccJobsTable'.
--
-- The transaction settings carry a restart predicate that matches query
-- errors whose code is 'DeadlockDetected'.
unregisterConsumer
  :: (MonadBase IO m, MonadMask m)
  => ConsumerConfig n idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ()
unregisterConsumer ConsumerConfig{..} cs wid = runDBT cs ts $ do
  -- Free tasks manually in case there is no
  -- foreign key constraint on reserved_by.
  runSQL_ $ smconcat [
      "UPDATE" <+> raw ccJobsTable
    , "   SET reserved_by = NULL"
    , " WHERE reserved_by =" <?> wid
    ]
  runSQL_ $ smconcat [
      "DELETE FROM " <+> raw ccConsumersTable
    , "WHERE id =" <?> wid
    , "  AND name =" <?> unRawSQL ccJobsTable
    ]
  where
    -- Default settings plus a restart predicate that fires only for
    -- deadlock errors; the attempt counter is ignored.
    ts = defaultTransactionSettings {
      tsRestartPredicate = Just . RestartPredicate
      $ \e _ -> qeErrorCode e == DeadlockDetected
    }