{-# LANGUAGE TypeApplications #-}
module Database.PostgreSQL.Consumers.Consumer
  ( ConsumerID
  , registerConsumer
  , unregisterConsumer
  ) where

import Control.Applicative
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Data.Int
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Prelude

import Database.PostgreSQL.Consumers.Config

-- | ID of a consumer.
--
-- A wrapper over 'Int64'; the SQL-related instances below all delegate
-- to the corresponding 'Int64' instances.
newtype ConsumerID = ConsumerID Int64
  deriving (Eq, Ord)

instance PQFormat ConsumerID where
  pqFormat = pqFormat @Int64

instance FromSQL ConsumerID where
  type PQBase ConsumerID = PQBase Int64
  fromSQL mbase = ConsumerID <$> fromSQL mbase

instance ToSQL ConsumerID where
  type PQDest ConsumerID = PQDest Int64
  toSQL (ConsumerID n) = toSQL n

-- | Shows the wrapped number only, without the constructor name.
instance Show ConsumerID where
  showsPrec p (ConsumerID n) = showsPrec p n

-- | Register consumer in the consumers table,
-- so that it can reserve jobs using acquired ID.
--
-- Inserts a row into 'ccConsumersTable' whose @name@ is the textual name
-- of 'ccJobsTable' and whose @last_activity@ is the current time, then
-- returns the @id@ produced by the @RETURNING@ clause.
registerConsumer
  :: (MonadBase IO m, MonadMask m, MonadTime m)
  => ConsumerConfig n idx job
  -> ConnectionSourceM m
  -> m ConsumerID
registerConsumer ConsumerConfig{..} cs = runDBT cs ts $ do
  now <- currentTime
  runSQL_ $ smconcat
    [ "INSERT INTO" <+> raw ccConsumersTable
      -- Values are bound as query parameters with '<?>' rather than being
      -- spliced into the SQL text.
    , "(name, last_activity) VALUES (" <?> unRawSQL ccJobsTable <> ", " <?> now <> ")"
    , "RETURNING id"
    ]
  fetchOne runIdentity
  where
    -- Default transaction settings with 'tsAutoTransaction' switched off.
    ts = def
      { tsAutoTransaction = False
      }

-- | Unregister consumer with a given ID.
--
-- First clears @reserved_by@ on every row of 'ccJobsTable' reserved by this
-- consumer, then deletes the consumer's row from 'ccConsumersTable'
-- (matched on both @id@ and the jobs table name).
unregisterConsumer
  :: (MonadBase IO m, MonadMask m)
  => ConsumerConfig n idx job
  -> ConnectionSourceM m
  -> ConsumerID
  -> m ()
unregisterConsumer ConsumerConfig{..} cs wid = runDBT cs ts $ do
  -- Free tasks manually in case there is no
  -- foreign key constraint on reserved_by.
  runSQL_ $ smconcat
    [ "UPDATE" <+> raw ccJobsTable
    , " SET reserved_by = NULL"
    , " WHERE reserved_by =" <?> wid
    ]
  runSQL_ $ smconcat
    [ "DELETE FROM " <+> raw ccConsumersTable
    , "WHERE id =" <?> wid
    , " AND name =" <?> unRawSQL ccJobsTable
    ]
  where
    -- Default transaction settings, extended with a restart predicate that
    -- fires when the query error code is 'DeadlockDetected'; the second
    -- argument of the predicate is ignored.
    ts = def
      { tsRestartPredicate = Just . RestartPredicate
          $ \e _ -> qeErrorCode e == DeadlockDetected
      }