module Antiope.SQS
( QueueUrl(..)
, SQSError(..)
, readQueue
, drainQueue
, ackMessage
, ackMessages
, mBody
) where
import Antiope.Messages (QueueUrl (QueueUrl), SQSError (DeleteMessageBatchError))
import Control.Lens
import Control.Monad (join)
import Control.Monad.Loops (unfoldWhileM)
import Data.Maybe (catMaybes)
import Data.Text (pack)
import Network.AWS (MonadAWS)
import Network.AWS.SQS
import qualified Network.AWS as AWS
readQueue :: MonadAWS m
=> QueueUrl
-> m [Message]
readQueue (QueueUrl queueUrl) = do
resp <- AWS.send $ receiveMessage queueUrl & rmWaitTimeSeconds ?~ 10 & rmMaxNumberOfMessages ?~ 10
return $ resp ^. rmrsMessages
drainQueue :: MonadAWS m
=> QueueUrl
-> m [Message]
drainQueue queueUrl = join <$> unfoldWhileM (not . null) (readQueue queueUrl)
ackMessage :: MonadAWS m
=> QueueUrl
-> Message
-> m (Either SQSError ())
ackMessage q msg = ackMessages q [msg]
ackMessages :: MonadAWS m
=> QueueUrl
-> [Message]
-> m (Either SQSError ())
ackMessages (QueueUrl queueUrl) msgs = do
let receipts = catMaybes $ msgs ^.. each . mReceiptHandle
let dmbres = (\(r, i) -> deleteMessageBatchRequestEntry (pack (show i)) r) <$> zip receipts ([0..] :: [Int])
resp <- AWS.send $ deleteMessageBatch queueUrl & dmbEntries .~ dmbres
if resp ^. dmbrsResponseStatus == 200
then case resp ^. dmbrsFailed of
[] -> return $ Right ()
_ -> return $ Left DeleteMessageBatchError
else return $ Left DeleteMessageBatchError