module Web.WindowsAzure.ServiceBus.Queue(
QLockedMsgInfo,
enQueueBS,
enQueueLBS,
enQueueBodySrc,
deQueue,
peekLockQueue,
unlockMessage,
deleteMessage,
renewLock
)where
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import Data.Int
import Web.WindowsAzure.ACS
import Web.WindowsAzure.ServiceBus.SBTypes
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Network.HTTP.Conduit hiding (requestBodySource)
import Network.HTTP.Client.Conduit hiding (httpLbs)
import Network.HTTP.Types.Method (methodDelete, methodPost,methodPut)
import Network.HTTP.Types.Header
import Network.HTTP.Types.Method
import qualified Data.CaseInsensitive as CI
import Network.Connection (TLSSettings (..))
import Data.Aeson
import Network(withSocketsDo)
data QLockedMsgInfo = QLockedMsgInfo String BrokerProperties
deriving (Show)
enQueueRequest :: String -> RequestBody -> SBContext -> IO ()
enQueueRequest queueName body (SBContext baseUrl manager aContext) = do
token <- acsToken manager aContext
reqInit <- parseUrl (baseUrl ++ "/" ++ queueName ++ "/messages")
withSocketsDo $ httpLbs (reqInit { method = methodPost,
requestHeaders = [token],
requestBody = body
}) manager
return ()
deQueueRequest :: String -> Int -> SBContext -> IO L.ByteString
deQueueRequest queueName timeout (SBContext baseUrl manager aContext) = do
token <- acsToken manager aContext
reqInit <- parseUrl (baseUrl ++ "/" ++ queueName ++ "/messages/head?timeout=" ++ (show timeout))
res <-withSocketsDo $ httpLbs ( reqInit { method = methodDelete,
requestHeaders = [token]
}) manager
return $ responseBody res
enQueueBS :: String -> C.ByteString -> SBContext -> IO ()
enQueueBS queueName content context =
enQueueRequest queueName (RequestBodyBS content) context
enQueueLBS :: String -> L.ByteString -> SBContext -> IO ()
enQueueLBS queueName content context =
enQueueRequest queueName (RequestBodyLBS content) context
enQueueBodySrc :: String -> Int64 -> Source IO C.ByteString -> SBContext -> IO ()
enQueueBodySrc queueName len bodysrc context = enQueueRequest queueName (requestBodySource len bodysrc) context
deQueue :: String -> Int -> SBContext -> IO (L.ByteString)
deQueue queueName timeout context = deQueueRequest queueName (timeout `mod` 55) context
peekLockQueue :: String -> Int -> SBContext -> IO (QLockedMsgInfo,L.ByteString)
peekLockQueue qName timeout (SBContext baseUrl manager aContext) = do
token <- acsToken manager aContext
reqInit <- parseUrl (baseUrl ++ "/" ++ qName ++ "/messages/head?timeout=" ++ (show timeout))
res <-withSocketsDo $ httpLbs (reqInit { method = methodPost,
requestHeaders = [token]
}) manager
return $ (getQLI res,responseBody res)
getQLI :: Response L.ByteString -> QLockedMsgInfo
getQLI res = QLockedMsgInfo loc bp
where
loc = case lookup hLocation (responseHeaders res) of
Nothing -> error "Expected Location Header in the response!"
Just x -> C.unpack x
bp = case lookup (CI.mk . C.pack $ "BrokerProperties") (responseHeaders res) of
Nothing -> emptyBP
Just bs -> case decode $ L.fromChunks [bs] of
Nothing -> emptyBP
Just b -> b
unlockMessage :: String -> QLockedMsgInfo -> SBContext -> IO ()
unlockMessage queueName (QLockedMsgInfo url brokerProps) (SBContext baseUrl manager acsContext) = do
token <- acsToken manager acsContext
reqInit <- parseUrl url
res <-withSocketsDo $ httpLbs (reqInit { method = methodPut,
requestHeaders = [token]
}) manager
return ()
deleteMessage :: String -> QLockedMsgInfo -> SBContext -> IO ()
deleteMessage queueName (QLockedMsgInfo url brokerProps) (SBContext baseUrl manager acsContext) = do
token <- acsToken manager acsContext
reqInit <- parseUrl url
res <-withSocketsDo $ httpLbs (reqInit { method = methodDelete,
requestHeaders = [token]
}) manager
return ()
renewLock :: String -> QLockedMsgInfo -> SBContext -> IO ()
renewLock queueName (QLockedMsgInfo url brokerProps) (SBContext baseUrl manager acsContext) = do
token <- acsToken manager acsContext
reqInit <- parseUrl url
res <-withSocketsDo $ httpLbs (reqInit { method = methodPost,
requestHeaders = [token]
}) manager
return ()