module Database.PostgreSQL.Consumers.Consumer (
ConsumerID
, registerConsumer
, unregisterConsumer
) where
import Control.Applicative
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Time
import Data.Int
import Data.Monoid
import Data.Monoid.Utils
import Database.PostgreSQL.PQTypes
import Prelude
import Database.PostgreSQL.Consumers.Config
newtype ConsumerID = ConsumerID Int64
deriving (Eq, Ord, PQFormat)
instance FromSQL ConsumerID where
type PQBase ConsumerID = PQBase Int64
fromSQL mbase = ConsumerID <$> fromSQL mbase
instance ToSQL ConsumerID where
type PQDest ConsumerID = PQDest Int64
toSQL (ConsumerID n) = toSQL n
instance Show ConsumerID where
showsPrec p (ConsumerID n) = showsPrec p n
registerConsumer
:: (MonadBase IO m, MonadMask m, MonadTime m)
=> ConsumerConfig n idx job
-> ConnectionSourceM m
-> m ConsumerID
registerConsumer ConsumerConfig{..} cs = runDBT cs ts $ do
now <- currentTime
runSQL_ $ smconcat [
"INSERT INTO" <+> raw ccConsumersTable
, "(name, last_activity) VALUES (" <?> unRawSQL ccJobsTable <> ", " <?> now <> ")"
, "RETURNING id"
]
fetchOne runIdentity
where
ts = def {
tsAutoTransaction = False
}
unregisterConsumer
:: (MonadBase IO m, MonadMask m)
=> ConsumerConfig n idx job
-> ConnectionSourceM m
-> ConsumerID
-> m ()
unregisterConsumer ConsumerConfig{..} cs wid = runDBT cs ts $ do
runSQL_ $ smconcat [
"UPDATE" <+> raw ccJobsTable
, " SET reserved_by = NULL"
, " WHERE reserved_by =" <?> wid
]
runSQL_ $ smconcat [
"DELETE FROM " <+> raw ccConsumersTable
, "WHERE id =" <?> wid
, " AND name =" <?> unRawSQL ccJobsTable
]
where
ts = def {
tsRestartPredicate = Just . RestartPredicate
$ \e _ -> qeErrorCode e == DeadlockDetected
}