module Database.PostgreSQL.Simple.Queue
(
PayloadId (..)
, State (..)
, Payload (..)
, enqueueDB
, dequeueDB
, withPayloadDB
, getCountDB
, enqueue
, tryDequeue
, dequeue
, withPayload
, getCount
) where
import Control.Monad
import Control.Monad.Catch
import Data.Aeson
import Data.Function
import Data.Int
import Data.Text (Text)
import Data.Time
import Database.PostgreSQL.Simple (Connection, Only (..))
import qualified Database.PostgreSQL.Simple as Simple
import Database.PostgreSQL.Simple.FromField
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.Notification
import Database.PostgreSQL.Simple.SqlQQ
import Database.PostgreSQL.Simple.ToField
import Database.PostgreSQL.Simple.ToRow
import Database.PostgreSQL.Simple.Transaction
import Database.PostgreSQL.Transact
import Data.Monoid
import Data.String
import Control.Monad.IO.Class
import Data.Maybe
newtype PayloadId = PayloadId { unPayloadId :: Int64 }
deriving (Eq, Show, FromField, ToField)
instance FromRow PayloadId where
fromRow = fromOnly <$> fromRow
instance ToRow PayloadId where
toRow = toRow . Only
data Payload = Payload
{ pId :: PayloadId
, pValue :: Value
, pState :: State
, pAttempts :: Int
, pCreatedAt :: UTCTime
, pModifiedAt :: UTCTime
} deriving (Show, Eq)
instance FromRow Payload where
fromRow = Payload <$> field <*> field <*> field <*> field <*> field <*> field
data State = Enqueued | Dequeued
deriving (Show, Eq, Ord, Enum, Bounded)
instance ToField State where
toField = toField . \case
Enqueued -> "enqueued" :: Text
Dequeued -> "dequeued"
instance FromField State where
fromField f y = do
n <- typename f
if n == "state_t" then case y of
Nothing -> returnError UnexpectedNull f "state can't be NULL"
Just y' -> case y' of
"enqueued" -> return Enqueued
"dequeued" -> return Dequeued
x -> returnError ConversionFailed f (show x)
else
returnError Incompatible f $
"Expect type name to be state but it was " ++ show n
withSchema :: String -> Simple.Query -> Simple.Query
withSchema schemaName q = "SET search_path TO " <> fromString schemaName <> "; " <> q
notifyName :: IsString s => String -> s
notifyName schemaName = fromString $ schemaName <> "_enqueue"
enqueueDB :: String -> Value -> DB PayloadId
enqueueDB schemaName value = enqueueWithDB schemaName value 0
enqueueWithDB :: String -> Value -> Int -> DB PayloadId
enqueueWithDB schemaName value attempts =
fmap head $ query (withSchema schemaName $ [sql|
NOTIFY |] <> " " <> notifyName schemaName <> ";" <> [sql|
INSERT INTO payloads (attempts, value)
VALUES (?, ?)
RETURNING id;|]
)
$ (attempts, value)
retryDB :: String -> Value -> Int -> DB PayloadId
retryDB schemaName value attempts = enqueueWithDB schemaName value $ attempts + 1
dequeueDB :: String -> DB (Maybe Payload)
dequeueDB schemaName = fmap listToMaybe $ query_ $ withSchema schemaName
[sql| UPDATE payloads
SET state='dequeued'
WHERE id in
( SELECT id
FROM payloads
WHERE state='enqueued'
ORDER BY modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, value, state, attempts, created_at, modified_at
|]
withPayloadDB :: String
-> Int
-> (Payload -> IO a)
-> DB (Either SomeException (Maybe a))
withPayloadDB schemaName retryCount f
= query_
( withSchema schemaName
$ [sql|
SELECT id, value, state, attempts, created_at, modified_at
FROM payloads
WHERE state='enqueued'
ORDER BY modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
|]
)
>>= \case
[] -> return $ return Nothing
[payload@Payload {..}] -> do
execute [sql| UPDATE payloads SET state='dequeued' WHERE id = ? |] pId
handle (\e -> when (pAttempts < retryCount)
(void $ retryDB schemaName pValue pAttempts)
>> return (Left e)
)
$ Right . Just <$> liftIO (f payload)
xs -> return
$ Left
$ toException
$ userError
$ "LIMIT is 1 but got more than one row: "
++ show xs
getCountDB :: String -> DB Int64
getCountDB schemaName = fmap (fromOnly . head) $ query_ $ withSchema schemaName
[sql| SELECT count(*)
FROM payloads
WHERE state='enqueued'
|]
enqueue :: String -> Connection -> Value -> IO PayloadId
enqueue schemaName conn value = runDBT (enqueueDB schemaName value) ReadCommitted conn
notifyPayload :: String -> Connection -> IO ()
notifyPayload schemaName conn = do
Notification {..} <- getNotification conn
unless (notificationChannel == notifyName schemaName) $ notifyPayload schemaName conn
tryDequeue :: String -> Connection -> IO (Maybe Payload)
tryDequeue schemaName conn = runDBT (dequeueDB schemaName) ReadCommitted conn
dequeue :: String -> Connection -> IO Payload
dequeue schemaName conn = bracket_
(Simple.execute_ conn $ "LISTEN " <> notifyName schemaName)
(Simple.execute_ conn $ "UNLISTEN " <> notifyName schemaName)
$ fix $ \continue -> do
m <- tryDequeue schemaName conn
case m of
Nothing -> do
notifyPayload schemaName conn
continue
Just x -> return x
withPayload :: String
-> Connection
-> Int
-> (Payload -> IO a)
-> IO (Either SomeException a)
withPayload schemaName conn retryCount f = bracket_
(Simple.execute_ conn $ "LISTEN " <> notifyName schemaName)
(Simple.execute_ conn $ "UNLISTEN " <> notifyName schemaName)
$ fix
$ \continue -> runDBT (withPayloadDB schemaName retryCount f) ReadCommitted conn
>>= \case
Left x -> return $ Left x
Right Nothing -> do
notifyPayload schemaName conn
continue
Right (Just x) -> return $ Right x
getCount :: String -> Connection -> IO Int64
getCount schemaName = runDBT (getCountDB schemaName) ReadCommitted