module Network.MicrosoftAzure.ServiceBus.Topic(
sendTopicBS,
sendTopicLBS,
sendTopicBodySrc,
destructiveRead,
peekLockTopic
)where
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import Data.Int
import Network.MicrosoftAzure.ACS
import Network.MicrosoftAzure.ServiceBus.SBTypes
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Network.HTTP.Conduit hiding (requestBodySource)
import Network.HTTP.Client.Conduit hiding (httpLbs)
import Network.HTTP.Types.Method (methodDelete, methodPost,methodPut)
import Network.HTTP.Types.Header
import Network.HTTP.Types.Method
import qualified Data.CaseInsensitive as CI
import Network.Connection (TLSSettings (..))
import Data.Aeson
import Network(withSocketsDo)
-- | Issue an HTTP POST carrying @body@ to the @messages@ resource of the
-- given topic.
--
-- The value obtained from 'acsToken' is sent as the only request header.
-- The response is discarded; any exception raised while building or
-- performing the request propagates to the caller.
sendTopicRequest :: String -> RequestBody -> SBContext -> IO ()
sendTopicRequest topicName body (SBContext baseUrl manager aContext) = do
    authHeader <- acsToken manager aContext
    baseReq <- parseUrl messagesUrl
    let postReq = baseReq
            { method = methodPost
            , requestHeaders = [authHeader]
            , requestBody = body
            }
    -- The response body is not needed; only the side effect matters.
    _ <- withSocketsDo (httpLbs postReq manager)
    return ()
  where
    -- Target URL: <baseUrl>/<topicName>/messages
    messagesUrl = concat [baseUrl, "/", topicName, "/messages"]
-- | Issue an HTTP DELETE against the @messages/head@ resource of a
-- subscription and return the raw response body.
--
-- @timeout@ is rendered with 'show' and sent verbatim as the @timeout@
-- query parameter. The value obtained from 'acsToken' is sent as the only
-- request header.
destructiveReadRequest :: String -> String -> Int -> SBContext -> IO L.ByteString
destructiveReadRequest topic subsc timeout (SBContext baseUrl manager aContext) = do
    authHeader <- acsToken manager aContext
    baseReq <- parseUrl headUrl
    let deleteReq = baseReq
            { method = methodDelete
            , requestHeaders = [authHeader]
            }
    fmap responseBody (withSocketsDo (httpLbs deleteReq manager))
  where
    -- Target URL: <baseUrl>/<topic>/Subscriptions/<subsc>/messages/head?timeout=<n>
    headUrl = concat
        [ baseUrl
        , "/"
        , topic
        , "/Subscriptions/"
        , subsc
        , "/messages/head?timeout="
        , show timeout
        ]
-- | Send a strict 'C.ByteString' as a message to the named topic.
--
-- The content is wrapped in 'RequestBodyBS' and handed to
-- 'sendTopicRequest'.
sendTopicBS :: String -> C.ByteString -> SBContext -> IO ()
sendTopicBS topicName = sendTopicRequest topicName . RequestBodyBS
-- | Send a lazy 'L.ByteString' as a message to the named topic.
--
-- The content is wrapped in 'RequestBodyLBS' and handed to
-- 'sendTopicRequest'.
sendTopicLBS :: String -> L.ByteString -> SBContext -> IO ()
sendTopicLBS topicName = sendTopicRequest topicName . RequestBodyLBS
-- | Send a message to the named topic whose body is produced by a conduit
-- 'Source'.
--
-- @len@ and @bodysrc@ are passed to 'requestBodySource' to build the
-- request body, which is then handed to 'sendTopicRequest'.
sendTopicBodySrc :: String -> Int64 -> Source IO C.ByteString -> SBContext -> IO ()
sendTopicBodySrc topicName len bodysrc =
    sendTopicRequest topicName streamedBody
  where
    streamedBody = requestBodySource len bodysrc
-- | Read a message from the head of a subscription via
-- 'destructiveReadRequest' (an HTTP DELETE) and return the response body.
--
-- The supplied @timeout@ is reduced modulo 55 before being sent, so the
-- value that reaches the service always lies in the range 0..54 for
-- non-negative input (e.g. 55 becomes 0 and 60 becomes 5).
destructiveRead :: String -> String -> Int -> SBContext -> IO (L.ByteString)
destructiveRead topic subsc timeout =
    destructiveReadRequest topic subsc effectiveTimeout
  where
    effectiveTimeout = timeout `mod` 55
-- | Issue an HTTP POST against the @messages/head@ resource of a
-- subscription.
--
-- Returns a pair of the 'LockedMsgInfo' obtained by applying 'getQLI' to
-- the response, and the raw response body. @timeout@ is rendered with
-- 'show' and sent unchanged as the @timeout@ query parameter. The value
-- obtained from 'acsToken' is sent as the only request header.
peekLockTopic :: String -> String -> Int -> SBContext -> IO (LockedMsgInfo,L.ByteString)
peekLockTopic topic subscr timeout (SBContext baseUrl manager aContext) = do
    authHeader <- acsToken manager aContext
    baseReq <- parseUrl headUrl
    let lockReq = baseReq
            { method = methodPost
            , requestHeaders = [authHeader]
            }
    response <- withSocketsDo (httpLbs lockReq manager)
    return (getQLI response, responseBody response)
  where
    -- Target URL: <baseUrl>/<topic>/Subscriptions/<subscr>/messages/head?timeout=<n>
    headUrl = concat
        [ baseUrl
        , "/"
        , topic
        , "/Subscriptions/"
        , subscr
        , "/messages/head?timeout="
        , show timeout
        ]