-- | Client interface for talking to an AMQP broker: connections, channels,
-- exchanges, queues, message publishing and consumption, and transactions.
module Network.AMQP (
-- * Connection
Connection,
openConnection,
openConnection',
closeConnection,
addConnectionClosedHandler,
-- * Channel
Channel,
openChannel,
qos,
-- * Exchanges
ExchangeOpts(..),
newExchange,
declareExchange,
bindExchange,
bindExchange',
unbindExchange,
unbindExchange',
deleteExchange,
-- * Queues
QueueOpts(..),
newQueue,
declareQueue,
bindQueue,
bindQueue',
unbindQueue,
unbindQueue',
purgeQueue,
deleteQueue,
-- * Messaging
Message(..),
DeliveryMode(..),
newMsg,
Envelope(..),
ConsumerTag,
Ack(..),
consumeMsgs,
consumeMsgs',
cancelConsumer,
publishMsg,
getMsg,
rejectMsg,
recoverMsgs,
ackMsg,
ackEnv,
-- * Transactions
txSelect,
txCommit,
txRollback,
-- * Flow control
flow,
-- * Exceptions
AMQPException(..)
) where
import Control.Concurrent
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Maybe
import Data.Text (Text)
import Data.Typeable
import Network
import System.IO
import qualified Control.Exception as CE
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Map as M
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
-- | Settings used by 'declareExchange'. Start from 'newExchange' and
-- override individual fields with record-update syntax.
data ExchangeOpts = ExchangeOpts
    { exchangeName :: Text
      -- ^ Name of the exchange being declared.
    , exchangeType :: Text
      -- ^ Exchange type string, forwarded to the broker unchanged.
    , exchangePassive :: Bool
      -- ^ Forwarded as the passive flag of the declare method.
    , exchangeDurable :: Bool
      -- ^ Forwarded as the durable flag of the declare method.
    , exchangeAutoDelete :: Bool
      -- ^ Forwarded as the auto-delete flag of the declare method.
    , exchangeInternal :: Bool
      -- ^ Forwarded as the internal flag of the declare method.
    }
    deriving (Eq, Ord, Read, Show)
-- | Default 'ExchangeOpts': empty name and type, durable, every other flag
-- off. Callers are expected to fill in at least the name and type.
newExchange :: ExchangeOpts
newExchange = ExchangeOpts
    { exchangeName = ""
    , exchangeType = ""
    , exchangePassive = False
    , exchangeDurable = True
    , exchangeAutoDelete = False
    , exchangeInternal = False
    }
-- | Declare an exchange on the broker as described by the given options.
--
-- Blocks until the broker answers; any reply other than
-- @Exchange_declare_ok@ makes the pattern match in the @do@ block fail.
declareExchange :: Channel -> ExchangeOpts -> IO ()
declareExchange chan exchg = do
    SimpleMethod Exchange_declare_ok <- request chan (SimpleMethod declare)
    return ()
  where
    declare =
        Exchange_declare
            1
            (ShortString (exchangeName exchg))
            (ShortString (exchangeType exchg))
            (exchangePassive exchg)
            (exchangeDurable exchg)
            (exchangeAutoDelete exchg)
            (exchangeInternal exchg)
            False -- fixed; not configurable through ExchangeOpts
            (FieldTable M.empty) -- no extra arguments are sent
-- | Bind the destination exchange to the source exchange under the given
-- routing key, sending an empty argument table. See 'bindExchange''.
bindExchange :: Channel -> Text -> Text -> Text -> IO ()
bindExchange chan destinationName sourceName routingKey =
    bindExchange' chan destinationName sourceName routingKey noArgs
  where
    noArgs = FieldTable M.empty
-- | Like 'bindExchange', but with a caller-supplied argument table that is
-- forwarded to the broker as part of the bind method.
--
-- Blocks until the broker answers with @Exchange_bind_ok@.
bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindExchange' chan destinationName sourceName routingKey args = do
    SimpleMethod Exchange_bind_ok <- request chan (SimpleMethod bind)
    return ()
  where
    bind =
        Exchange_bind
            1
            (ShortString destinationName)
            (ShortString sourceName)
            (ShortString routingKey)
            False
            args
-- | Remove an exchange-to-exchange binding, sending an empty argument table.
-- See 'unbindExchange''.
unbindExchange :: Channel -> Text -> Text -> Text -> IO ()
unbindExchange chan destinationName sourceName routingKey =
    unbindExchange' chan destinationName sourceName routingKey noArgs
  where
    noArgs = FieldTable M.empty
-- | Like 'unbindExchange', but with a caller-supplied argument table.
--
-- Blocks until the broker answers with @Exchange_unbind_ok@.
unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindExchange' chan destinationName sourceName routingKey args = do
    SimpleMethod Exchange_unbind_ok <- request chan (SimpleMethod unbind)
    return ()
  where
    unbind =
        Exchange_unbind
            1
            (ShortString destinationName)
            (ShortString sourceName)
            (ShortString routingKey)
            False
            args
-- | Delete the named exchange and wait for the broker's
-- @Exchange_delete_ok@ confirmation.
deleteExchange :: Channel -> Text -> IO ()
deleteExchange chan exchange = do
    SimpleMethod Exchange_delete_ok <- request chan (SimpleMethod delete)
    return ()
  where
    -- Both trailing flags are fixed to False by this wrapper.
    delete = Exchange_delete 1 (ShortString exchange) False False
-- | Settings used by 'declareQueue'. Start from 'newQueue' and override
-- individual fields with record-update syntax.
data QueueOpts = QueueOpts
    { queueName :: Text
      -- ^ Name of the queue; 'declareQueue' returns the name the broker
      -- reports back.

      -- The remaining fields are optional.
    , queuePassive :: Bool
      -- ^ Forwarded as the passive flag of the declare method.
    , queueDurable :: Bool
      -- ^ Forwarded as the durable flag of the declare method.
    , queueExclusive :: Bool
      -- ^ Forwarded as the exclusive flag of the declare method.
    , queueAutoDelete :: Bool
      -- ^ Forwarded as the auto-delete flag of the declare method.
    , queueHeaders :: FieldTable
      -- ^ Argument table sent along with the declare method.
    }
    deriving (Eq, Ord, Read, Show)
-- | Default 'QueueOpts': empty name, durable, every other flag off and an
-- empty argument table.
newQueue :: QueueOpts
newQueue = QueueOpts
    { queueName = ""
    , queuePassive = False
    , queueDurable = True
    , queueExclusive = False
    , queueAutoDelete = False
    , queueHeaders = FieldTable M.empty
    }
-- | Declare a queue as described by the given options.
--
-- Returns the triple @(queueName, messageCount, consumerCount)@ taken from
-- the broker's @Queue_declare_ok@ reply.
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
declareQueue chan queue = do
    SimpleMethod (Queue_declare_ok (ShortString qName) messageCount consumerCount) <-
        request chan (SimpleMethod declare)
    return (qName, fromIntegral messageCount, fromIntegral consumerCount)
  where
    declare =
        Queue_declare
            1
            (ShortString (queueName queue))
            (queuePassive queue)
            (queueDurable queue)
            (queueExclusive queue)
            (queueAutoDelete queue)
            False -- fixed; not configurable through QueueOpts
            (queueHeaders queue)
-- | Bind a queue to an exchange under the given routing key, sending an
-- empty argument table. See 'bindQueue''.
bindQueue :: Channel -> Text -> Text -> Text -> IO ()
bindQueue chan queue exchange routingKey =
    bindQueue' chan queue exchange routingKey noArgs
  where
    noArgs = FieldTable M.empty
-- | Like 'bindQueue', but with a caller-supplied argument table.
--
-- Blocks until the broker answers with @Queue_bind_ok@.
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindQueue' chan queue exchange routingKey args = do
    SimpleMethod Queue_bind_ok <- request chan (SimpleMethod bind)
    return ()
  where
    bind =
        Queue_bind
            1
            (ShortString queue)
            (ShortString exchange)
            (ShortString routingKey)
            False
            args
-- | Remove a queue binding, sending an empty argument table.
-- See 'unbindQueue''.
unbindQueue :: Channel -> Text -> Text -> Text -> IO ()
unbindQueue chan queue exchange routingKey =
    unbindQueue' chan queue exchange routingKey noArgs
  where
    noArgs = FieldTable M.empty
-- | Like 'unbindQueue', but with a caller-supplied argument table.
--
-- Blocks until the broker answers with @Queue_unbind_ok@.
unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindQueue' chan queue exchange routingKey args = do
    SimpleMethod Queue_unbind_ok <- request chan (SimpleMethod unbind)
    return ()
  where
    unbind =
        Queue_unbind
            1
            (ShortString queue)
            (ShortString exchange)
            (ShortString routingKey)
            args
-- | Purge the named queue and return the message count reported in the
-- broker's @Queue_purge_ok@ reply.
purgeQueue :: Channel -> Text -> IO Word32
purgeQueue chan queue = do
    SimpleMethod (Queue_purge_ok msgCount) <- request chan (SimpleMethod purge)
    return msgCount
  where
    purge = Queue_purge 1 (ShortString queue) False
-- | Delete the named queue and return the message count reported in the
-- broker's @Queue_delete_ok@ reply.
deleteQueue :: Channel -> Text -> IO Word32
deleteQueue chan queue = do
    SimpleMethod (Queue_delete_ok msgCount) <- request chan (SimpleMethod delete)
    return msgCount
  where
    -- All three trailing flags are fixed to False by this wrapper.
    delete = Queue_delete 1 (ShortString queue) False False False
-- | Identifier of a consumer subscription; returned by 'consumeMsgs' and
-- accepted by 'cancelConsumer'.
type ConsumerTag = Text
-- | Whether received messages are acknowledged explicitly ('Ack') or not
-- ('NoAck').
data Ack
    = Ack
    | NoAck
    deriving (Eq, Ord, Read, Show)

-- | Translate an 'Ack' mode into the boolean flag passed to the consume and
-- get methods: 'True' for 'NoAck', 'False' for 'Ack'.
ackToBool :: Ack -> Bool
ackToBool mode = case mode of
    Ack -> False
    NoAck -> True
-- | Subscribe the callback to the named queue, sending an empty argument
-- table. Returns the tag identifying the new subscription.
-- See 'consumeMsgs''.
consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queue ack callback =
    consumeMsgs' chan queue ack callback noArgs
  where
    noArgs = FieldTable M.empty
-- | Like 'consumeMsgs', but with a caller-supplied argument table.
--
-- A fresh consumer tag is generated by incrementing the channel's counter.
-- The callback is registered under that tag /before/ the consume method is
-- written, and the method is sent with 'writeAssembly', so no reply from
-- the broker is awaited.
consumeMsgs' :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> FieldTable -> IO ConsumerTag
consumeMsgs' chan queue ack callback args = do
    tagNumber <- modifyMVar (lastConsumerTag chan) $ \previous ->
        let next = previous + 1
        in return (next, next)
    let newConsumerTag = T.pack (show tagNumber)
    modifyMVar_ (consumers chan) (return . M.insert newConsumerTag callback)
    writeAssembly chan (SimpleMethod (consume newConsumerTag))
    return newConsumerTag
  where
    consume tag =
        Basic_consume
            1
            (ShortString queue)
            (ShortString tag)
            False
            (ackToBool ack)
            False
            True
            args
-- | Cancel a subscription: waits for the broker's @Basic_cancel_ok@ and only
-- then removes the callback registered under the tag.
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    SimpleMethod (Basic_cancel_ok _) <- request chan (SimpleMethod cancel)
    modifyMVar_ (consumers chan) (return . M.delete consumerTag)
  where
    cancel = Basic_cancel (ShortString consumerTag) False
-- | Publish a message to the given exchange with the given routing key.
--
-- The message is written with 'writeAssembly'; no reply is awaited. Only
-- the properties that 'Message' exposes are filled in; every other basic
-- property is sent as 'Nothing'.
publishMsg :: Channel -> Text -> Text -> Message -> IO ()
publishMsg chan exchange routingKey msg =
    writeAssembly chan (ContentMethod method header (msgBody msg))
  where
    method =
        Basic_publish
            1
            (ShortString exchange)
            (ShortString routingKey)
            False
            False
    -- Positional: must stay in the same order as the pattern in
    -- 'msgFromContentHeaderProperties'.
    header =
        CHBasic
            (fmap ShortString (msgContentType msg))
            Nothing
            (msgHeaders msg)
            (fmap deliveryModeToInt (msgDeliveryMode msg))
            Nothing
            (fmap ShortString (msgCorrelationID msg))
            (fmap ShortString (msgReplyTo msg))
            Nothing
            (fmap ShortString (msgID msg))
            (msgTimestamp msg)
            Nothing
            Nothing
            Nothing
            Nothing
-- | Fetch a single message from the named queue.
--
-- Returns 'Just' the message and its envelope when the broker replies with
-- @Basic_get_ok@ plus content, and 'Nothing' for any other reply.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
getMsg chan ack queue = do
    reply <- request chan (SimpleMethod (Basic_get 1 (ShortString queue) (ackToBool ack)))
    -- Force the match here, as the original case expression did.
    return $! toDelivery reply
  where
    toDelivery (ContentMethod (Basic_get_ok deliveryTag redelivered (ShortString exchange) (ShortString routingKey) _) properties body) =
        Just
            ( msgFromContentHeaderProperties properties body
            , Envelope
                { envDeliveryTag = deliveryTag
                , envRedelivered = redelivered
                , envExchangeName = exchange
                , envRoutingKey = routingKey
                , envChannel = chan
                }
            )
    toDelivery _ = Nothing
-- | Send a @Basic_ack@ for the given delivery tag, passing the @multiple@
-- flag through unchanged. No reply is awaited.
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple =
    writeAssembly chan (SimpleMethod ack)
  where
    ack = Basic_ack deliveryTag multiple
-- | Acknowledge exactly the message described by the envelope, on the
-- channel stored in the envelope (the @multiple@ flag is 'False').
ackEnv :: Envelope -> IO ()
ackEnv env = ackMsg channel tag False
  where
    channel = envChannel env
    tag = envDeliveryTag env
-- | Send a @Basic_reject@ for the given delivery tag, passing the @requeue@
-- flag through unchanged. No reply is awaited.
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue =
    writeAssembly chan (SimpleMethod reject)
  where
    reject = Basic_reject deliveryTag requeue
-- | Send @Basic_recover@ with the given @requeue@ flag and wait for the
-- broker's @Basic_recover_ok@.
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue = do
    SimpleMethod Basic_recover_ok <- request chan (SimpleMethod recover)
    return ()
  where
    recover = Basic_recover requeue
-- | Send @Tx_select@ on the channel and wait for @Tx_select_ok@.
txSelect :: Channel -> IO ()
txSelect chan = do
    SimpleMethod Tx_select_ok <- request chan selectMethod
    return ()
  where
    selectMethod = SimpleMethod Tx_select
-- | Send @Tx_commit@ on the channel and wait for @Tx_commit_ok@.
txCommit :: Channel -> IO ()
txCommit chan = do
    SimpleMethod Tx_commit_ok <- request chan commitMethod
    return ()
  where
    commitMethod = SimpleMethod Tx_commit
-- | Send @Tx_rollback@ on the channel and wait for @Tx_rollback_ok@.
txRollback :: Channel -> IO ()
txRollback chan = do
    SimpleMethod Tx_rollback_ok <- request chan rollbackMethod
    return ()
  where
    rollbackMethod = SimpleMethod Tx_rollback
-- | Send @Channel_flow@ with the given @active@ flag and wait for the
-- broker's @Channel_flow_ok@; the flag echoed in the reply is ignored.
flow :: Channel -> Bool -> IO ()
flow chan active = do
    SimpleMethod (Channel_flow_ok _) <- request chan flowMethod
    return ()
  where
    flowMethod = SimpleMethod (Channel_flow active)
-- | Delivery metadata accompanying a received 'Message'. Built from the
-- fields of the @Basic_deliver@ / @Basic_get_ok@ method that carried it.
data Envelope = Envelope
    { envDeliveryTag :: LongLongInt
      -- ^ Tag to pass to 'ackMsg' / 'rejectMsg'.
    , envRedelivered :: Bool
      -- ^ Redelivered flag as reported by the broker.
    , envExchangeName :: Text
      -- ^ Exchange name reported with the delivery.
    , envRoutingKey :: Text
      -- ^ Routing key reported with the delivery.
    , envChannel :: Channel
      -- ^ Channel the message arrived on; used by 'ackEnv'.
    }
-- | Delivery mode of a message. Encoded on the wire as 2 ('Persistent') or
-- 1 ('NonPersistent'); see 'deliveryModeToInt'.
data DeliveryMode
    = Persistent
    | NonPersistent
    deriving (Eq, Ord, Read, Show)
-- | Wire encoding of a 'DeliveryMode': 1 for 'NonPersistent', 2 for
-- 'Persistent'.
deliveryModeToInt :: DeliveryMode -> Octet
deliveryModeToInt mode = case mode of
    NonPersistent -> 1
    Persistent -> 2
-- | Inverse of 'deliveryModeToInt'. Partial: any value other than 1 or 2
-- raises an 'error' naming the offending number.
intToDeliveryMode :: Octet -> DeliveryMode
intToDeliveryMode n = case n of
    1 -> NonPersistent
    2 -> Persistent
    _ -> error ("Unknown delivery mode int: " ++ show n)
-- | An AMQP message: the body plus the subset of basic properties this
-- module exposes. Start from 'newMsg' and override fields as needed.
data Message = Message
    { msgBody :: BL.ByteString
      -- ^ Payload bytes.
    , msgDeliveryMode :: Maybe DeliveryMode
      -- ^ Delivery mode property, if set.
    , msgTimestamp :: Maybe Timestamp
      -- ^ Timestamp property, if set.
    , msgID :: Maybe Text
      -- ^ Message-id property, if set.
    , msgContentType :: Maybe Text
      -- ^ Content-type property, if set.
    , msgReplyTo :: Maybe Text
      -- ^ Reply-to property, if set.
    , msgCorrelationID :: Maybe Text
      -- ^ Correlation-id property, if set.
    , msgHeaders :: Maybe FieldTable
      -- ^ Headers table, if set.
    }
    deriving (Eq, Ord, Read, Show)
-- | A 'Message' with an empty body and every property unset.
newMsg :: Message
newMsg = Message
    { msgBody = BL.empty
    , msgDeliveryMode = Nothing
    , msgTimestamp = Nothing
    , msgID = Nothing
    , msgContentType = Nothing
    , msgReplyTo = Nothing
    , msgCorrelationID = Nothing
    , msgHeaders = Nothing
    }
-- | A complete logical unit exchanged on a channel: either a method on its
-- own, or a method followed by a content header and the reassembled body.
data Assembly
    = SimpleMethod MethodPayload
      -- ^ Method without content.
    | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString
      -- ^ Method, its content-header properties, and the full body.
    deriving Show
-- | Read the next 'Assembly' from a channel's incoming frame queue.
--
-- The first frame must be a method frame; if 'hasContent' says it carries
-- content, the header and body frames are collected as well. Any other
-- leading frame is a protocol violation and raises an 'error'.
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly chan = do
    frame <- readChan chan
    case frame of
        MethodPayload p
            | hasContent frame -> do
                (props, msg) <- collectContent chan
                return (ContentMethod p props msg)
            | otherwise ->
                return (SimpleMethod p)
        x -> error $ "didn't expect frame: " ++ show x
-- | Read a content header frame followed by as many body frames as are
-- needed to cover the body size announced in the header.
--
-- Returns the header's properties together with the concatenated body.
-- A frame of an unexpected kind makes the corresponding pattern match fail.
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent chan = do
    ContentHeaderPayload _ _ bodySize props <- readChan chan
    chunks <- collect (fromIntegral bodySize)
    return (props, BL.concat chunks)
  where
    -- @remaining@ is the number of body bytes still expected; each body
    -- frame read reduces it by that frame's payload length.
    collect remaining
        | remaining <= 0 = return []
        | otherwise = do
            ContentBodyPayload payload <- readChan chan
            rest <- collect (remaining - BL.length payload)
            return (payload : rest)
-- | State shared by everything running over one broker connection.
data Connection = Connection
    { connHandle :: Handle
      -- ^ Handle frames are read from and written to.
    , connChannels :: MVar (IM.IntMap (Channel, ThreadId))
      -- ^ Open channels keyed by channel id, each paired with the thread
      -- running its receiver.
    , connMaxFrameSize :: Int
      -- ^ Frame size limit agreed on while opening the connection.
    , connClosed :: MVar (Maybe String)
      -- ^ 'Nothing' while open; 'Just' the reason once closed.
    , connClosedLock :: MVar ()
      -- ^ Empty while open; filled by the receiver thread's cleanup so
      -- 'closeConnection' can wait on it.
    , connWriteLock :: MVar ()
      -- ^ Held while writing frames to the handle.
    , connClosedHandlers :: MVar [IO ()]
      -- ^ Actions run by the receiver thread's cleanup.
    , lastChannelID :: MVar Int
      -- ^ Most recently allocated channel id.
    }
-- | Receive loop of a connection: reads frames from the handle forever and
-- dispatches each one.
--
-- Channel 0 carries connection-level methods: a close (or close-ok) records
-- the reason in 'connClosed' and terminates this thread by killing it;
-- anything else on channel 0 is only logged. Frames for other channels are
-- pushed onto the owning channel's 'inQueue'.
connectionReceiver :: Connection -> IO ()
connectionReceiver conn = forever $ do
    Frame chanID payload <- readFrame (connHandle conn)
    case (chanID, payload) of
        (0, MethodPayload Connection_close_ok) -> do
            markClosed "closed by user"
            terminate
        (0, MethodPayload (Connection_close _ (ShortString errorMsg) _ _)) -> do
            -- Confirm the broker-initiated close before shutting down.
            writeFrame (connHandle conn) $ Frame 0 $ MethodPayload Connection_close_ok
            markClosed (T.unpack errorMsg)
            terminate
        (0, _) ->
            putStrLn $ "Got unexpected msg on channel zero: " ++ show payload
        _ ->
            withMVar (connChannels conn) $ \cs ->
                case IM.lookup (fromIntegral chanID) cs of
                    Just c -> writeChan (inQueue (fst c)) payload
                    Nothing -> putStrLn $ "ERROR: channel not open " ++ show chanID
  where
    markClosed reason =
        modifyMVar_ (connClosed conn) $ \_ -> return (Just reason)
    terminate = myThreadId >>= killThread
-- | Open a connection to the broker on the given host, using port 5672.
-- The remaining arguments are virtual host, login name and password; see
-- 'openConnection''.
openConnection :: String -> Text -> Text -> Text -> IO Connection
openConnection host vhost loginName loginPassword =
    openConnection' host 5672 vhost loginName loginPassword
-- | Open a connection to the broker on the given host and port, then log in
-- to the given virtual host with the supplied credentials.
--
-- Performs the protocol handshake synchronously (start, tune, open) and
-- then forks the receiver thread. When that thread ends, for whatever
-- reason, its cleanup closes the handle, records a close reason, kills all
-- channel threads, clears the channel map, releases callers blocked in
-- 'closeConnection' and runs the registered closed-handlers.
--
-- An unexpected frame during the handshake makes the corresponding pattern
-- match fail.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
openConnection' host port vhost loginName loginPassword = withSocketsDo $ do
    handle <- connectTo host $ PortNumber port

    -- Protocol header: the literal "AMQP" followed by four bytes.
    BL.hPut handle $ BPut.runPut $ do
        BPut.putByteString $ BS.pack "AMQP"
        BPut.putWord8 1
        BPut.putWord8 1 --TCP/IP
        BPut.putWord8 0
        BPut.putWord8 9

    Frame 0 (MethodPayload (Connection_start _ _ _ _ _)) <- readFrame handle
    writeFrame handle start_ok

    Frame 0 (MethodPayload (Connection_tune _ frame_max _)) <- readFrame handle
    -- A frame_max of 0 means the server imposes no limit; treating it as a
    -- size of 0 would leave no room for body payloads, so fall back to our
    -- own ceiling in that case.
    let ownFrameLimit = 131072
        maxFrameSize
            | frame_max == 0 = ownFrameLimit
            | otherwise = min ownFrameLimit frame_max
    writeFrame handle (Frame 0 (MethodPayload
        (Connection_tune_ok 0 maxFrameSize 0)))

    writeFrame handle open
    Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrame handle

    cChannels <- newMVar IM.empty
    lastChanID <- newMVar 0
    cClosed <- newMVar Nothing
    writeLock <- newMVar ()
    ccl <- newEmptyMVar
    cClosedHandlers <- newMVar []
    let conn = Connection handle cChannels (fromIntegral maxFrameSize) cClosed ccl writeLock cClosedHandlers lastChanID

    void $ forkIO $ CE.finally (connectionReceiver conn) $ do
        -- Closing may fail if the handle is already gone; that is fine.
        CE.catch (hClose handle) (\(_ :: CE.SomeException) -> return ())
        -- Keep a more specific reason if the receiver already stored one.
        modifyMVar_ cClosed $ return . Just . fromMaybe "closed"
        -- Kill every channel thread and actually empty the channel map
        -- (the map has to be written back, hence modifyMVar_).
        modifyMVar_ cChannels $ \chans -> do
            mapM_ (killThread . snd) (IM.elems chans)
            return IM.empty
        -- Let pending 'closeConnection' calls return.
        void $ tryPutMVar ccl ()
        withMVar cClosedHandlers sequence
    return conn
  where
    start_ok = (Frame 0 (MethodPayload (Connection_start_ok (FieldTable M.empty)
        (ShortString "AMQPLAIN")
        -- The credentials are serialised as a field table with its first
        -- four bytes dropped.
        (LongString $ T.pack $ drop 4 $ BL.unpack $ runPut $ put $ FieldTable $ M.fromList [("LOGIN",FVString loginName), ("PASSWORD", FVString loginPassword)])
        (ShortString "en_US")) ))
    open = (Frame 0 (MethodPayload (Connection_open
        (ShortString vhost)
        (ShortString $ T.pack "")
        True)))
-- | Ask the broker to close the connection and block until the receiver
-- thread's cleanup has signalled that the connection is closed.
--
-- An 'CE.IOException' while sending the close request is ignored, so this
-- can be called on a connection whose handle is already unusable.
closeConnection :: Connection -> IO ()
closeConnection c = do
    CE.catch sendClose (\(_ :: CE.IOException) -> return ())
    readMVar (connClosedLock c)
  where
    sendClose =
        withMVar (connWriteLock c) $ \_ ->
            writeFrame (connHandle c) closeFrame
    closeFrame =
        Frame 0 $ MethodPayload $ Connection_close 0 (ShortString "") 0 0
-- | Register an action to run when the connection is closed.
--
-- If the connection is already closed and @ifClosed@ is 'True', the handler
-- runs immediately instead of being registered. In every other case it is
-- prepended to the connection's handler list.
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler conn ifClosed handler =
    withMVar (connClosed conn) $ \cc ->
        if isJust cc && ifClosed
            then handler
            else modifyMVar_ (connClosedHandlers conn) (return . (handler :))
-- | Read exactly one frame from the handle.
--
-- A frame consists of a 7-byte header, a payload whose length is given in
-- the header, and one trailing byte, i.e. @len + 8@ bytes in total.
--
-- Throws an 'IOError' if the handle reaches end of input before a complete
-- frame has been read (lazy 'BL.hGet' returns fewer bytes than requested in
-- that case). Raises an 'error' if the bytes cannot be parsed or the parser
-- does not consume the whole frame.
readFrame :: Handle -> IO Frame
readFrame handle = do
    header <- BL.hGet handle 7
    when (BL.length header < 7) $
        CE.throwIO $ userError "readFrame: end of input while reading frame header"
    let len = fromIntegral $ peekFrameSize header
    rest <- BL.hGet handle (len + 1)
    when (BL.length rest < fromIntegral (len + 1)) $
        CE.throwIO $ userError "readFrame: end of input while reading frame body"
    case runGetOrFail get (BL.append header rest) of
        Left (_, _, errMsg) -> error $ "readFrame fail: " ++ errMsg
        Right (_, consumedBytes, frame)
            | consumedBytes /= fromIntegral (len + 8) ->
                error $ "readFrame: parser should read " ++ show (len+8) ++ " bytes; but read " ++ show consumedBytes
            | otherwise -> return frame
-- | Serialise a frame with its 'Binary' instance and write it to the handle.
-- Callers are responsible for any locking around the handle.
writeFrame :: Handle -> Frame -> IO ()
writeFrame handle frame = BL.hPut handle (runPut (put frame))
-- | State of one channel multiplexed over a 'Connection'.
data Channel = Channel
    { connection :: Connection
      -- ^ Connection this channel belongs to.
    , inQueue :: Chan FramePayload
      -- ^ Incoming frames, fed by the connection's receiver thread.
    , outstandingResponses :: MVar (Seq.Seq (MVar Assembly))
      -- ^ Callers waiting for a reply, in the order their requests were
      -- sent; replies are handed out first-in, first-out.
    , channelID :: Word16
      -- ^ Channel number used in outgoing frames.
    , lastConsumerTag :: MVar Int
      -- ^ Counter used to generate consumer tags.
    , chanActive :: Lock
      -- ^ Waited on before content is written; toggled by incoming
      -- @Channel_flow@ methods.
    , chanClosed :: MVar (Maybe String)
      -- ^ 'Nothing' while open; 'Just' the reason once closed.
    , consumers :: MVar (M.Map Text ((Message, Envelope) -> IO ()))
      -- ^ Registered consumer callbacks keyed by consumer tag.
    }
-- | Build a 'Message' from basic content-header properties and a body.
--
-- Only the properties that 'Message' exposes are kept. Properties of any
-- other content class raise an 'error'.
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
msgFromContentHeaderProperties props body = case props of
    CHBasic content_type _ headers delivery_mode _ correlation_id reply_to _ message_id timestamp _ _ _ _ ->
        Message
            { msgBody = body
            , msgDeliveryMode = fmap intToDeliveryMode delivery_mode
            , msgTimestamp = timestamp
            , msgID = unwrap message_id
            , msgContentType = unwrap content_type
            , msgReplyTo = unwrap reply_to
            , msgCorrelationID = unwrap correlation_id
            , msgHeaders = headers
            }
    c -> error ("Unknown content header properties: " ++ show c)
  where
    -- Strip the ShortString wrapper from an optional property.
    unwrap field = case field of
        Just (ShortString s) -> Just s
        _ -> Nothing
-- | Receive loop of a channel: reads assemblies from the channel's
-- 'inQueue' forever and dispatches them.
--
-- Deliveries, returns, flow and close methods are handled as asynchronous
-- events. Everything else is treated as the reply to the oldest outstanding
-- request and handed to the caller waiting for it. A reply with no waiting
-- caller throws an 'IOError', which ends this thread.
channelReceiver :: Channel -> IO ()
channelReceiver chan = do
    assembly <- readAssembly (inQueue chan)
    if isResponse assembly
        then deliverResponse assembly
        else handleAsync assembly
    channelReceiver chan
  where
    -- Pop the oldest waiter and fill its MVar. The action is built inside
    -- modifyMVar but executed after the MVar has been released.
    deliverResponse reply = do
        action <- modifyMVar (outstandingResponses chan) $ \pending ->
            case Seq.viewl pending of
                waiter Seq.:< rest ->
                    return (rest, putMVar waiter reply)
                Seq.EmptyL ->
                    return (pending, CE.throwIO $ userError "got response, but have no corresponding request")
        action

    isResponse :: Assembly -> Bool
    isResponse assembly = case assembly of
        ContentMethod (Basic_deliver _ _ _ _ _) _ _ -> False
        ContentMethod (Basic_return _ _ _ _) _ _ -> False
        SimpleMethod (Channel_flow _) -> False
        SimpleMethod (Channel_close _ _ _ _) -> False
        _ -> True

    -- Delivery: run the callback registered for the consumer tag, if any.
    -- Exceptions thrown by the callback are logged and swallowed so they
    -- cannot kill the receiver thread. The consumers MVar is held while the
    -- callback runs.
    handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchange) (ShortString routingKey)) properties body) =
        withMVar (consumers chan) $ \registered ->
            case M.lookup consumerTag registered of
                Nothing -> return ()
                Just subscriber -> do
                    let msg = msgFromContentHeaderProperties properties body
                        env = Envelope
                            { envDeliveryTag = deliveryTag
                            , envRedelivered = redelivered
                            , envExchangeName = exchange
                            , envRoutingKey = routingKey
                            , envChannel = chan
                            }
                    CE.catch (subscriber (msg, env))
                        (\(e::CE.SomeException) -> putStrLn $ "AMQP callback threw exception: " ++ show e)
    -- Broker-initiated close: record the reason, then end this thread.
    handleAsync (SimpleMethod (Channel_close _ (ShortString errorMsg) _ _)) = do
        closeChannel' chan errorMsg
        myThreadId >>= killThread
    -- Flow control: open or close the lock that content writes wait on.
    handleAsync (SimpleMethod (Channel_flow active)) = do
        if active
            then openLock (chanActive chan)
            else closeLock (chanActive chan)
        return ()
    --Basic.return
    handleAsync (ContentMethod (Basic_return _ _ _ _) _ _) =
        putStrLn ("BASIC.RETURN not implemented" :: String)
    handleAsync m = error ("Unknown method: " ++ show m)
-- | Mark a channel as closed with the given reason and release everything
-- waiting on it.
--
-- The channel is removed from its connection's channel map. If the channel
-- was not already closed, its flow lock is killed, every caller waiting for
-- a reply is woken up with a value that raises @error "channel closed"@
-- when forced, and the reason is stored in 'chanClosed'. A channel that is
-- already closed keeps its original reason.
closeChannel' :: Channel -> Text -> IO ()
closeChannel' c reason = do
    modifyMVar_ (connChannels $ connection c) $
        return . IM.delete (fromIntegral $ channelID c)
    modifyMVar_ (chanClosed c) $ \x ->
        case x of
            Just _ -> return x
            Nothing -> do
                void $ killLock $ chanActive c
                killOutstandingResponses $ outstandingResponses c
                return $ Just $ T.unpack reason
  where
    killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
    killOutstandingResponses outResps =
        modifyMVar_ outResps $ \val -> do
            -- tryPutMVar: a waiter that already has a value is left alone.
            F.mapM_ (\x -> tryPutMVar x $ error "channel closed") val
            -- Leave a valid, empty queue behind rather than a bottom value
            -- that would blow up on the next access.
            return Seq.empty
-- | Open a new channel on the connection.
--
-- Allocates the next channel id, forks the channel's receiver thread (whose
-- cleanup marks the channel as closed), registers the channel with the
-- connection, and finally performs the @Channel_open@ handshake.
openChannel :: Connection -> IO Channel
openChannel c = do
    frames <- newChan
    pendingReplies <- newMVar Seq.empty
    tagCounter <- newMVar 0
    flowLock <- newLock
    closedReason <- newMVar Nothing
    callbacks <- newMVar M.empty
    newChannelID <- modifyMVar (lastChannelID c) $ \previous ->
        return (previous + 1, previous + 1)
    let newChannel = Channel
            { connection = c
            , inQueue = frames
            , outstandingResponses = pendingReplies
            , channelID = fromIntegral newChannelID
            , lastConsumerTag = tagCounter
            , chanActive = flowLock
            , chanClosed = closedReason
            , consumers = callbacks
            }
    thrID <- forkIO $
        channelReceiver newChannel `CE.finally` closeChannel' newChannel "closed"
    -- The channel must be in the map before the open request is written,
    -- because 'writeFrames' refuses to write for unregistered channels.
    modifyMVar_ (connChannels c) (return . IM.insert newChannelID (newChannel, thrID))
    SimpleMethod (Channel_open_ok _) <-
        request newChannel (SimpleMethod (Channel_open (ShortString "")))
    return newChannel
-- | Write a batch of frame payloads for the channel, as one uninterrupted
-- sequence on the connection's handle.
--
-- Throws @userError "channel not open"@ if the channel is not registered
-- with its connection, and @userError "connection not open"@ if writing
-- raises an 'CE.IOException'. The channel map is held for the duration of
-- the write.
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames chan payloads =
    withMVar (connChannels conn) $ \chans ->
        if IM.member chanKey chans
            then CE.catch sendAll connectionGone
            else CE.throwIO $ userError "channel not open"
  where
    conn = connection chan
    chanKey = fromIntegral (channelID chan)

    -- The write lock keeps frames of different channels from interleaving.
    sendAll =
        withMVar (connWriteLock conn) $ \_ ->
            mapM_ (writeFrame (connHandle conn) . Frame (channelID chan)) payloads

    connectionGone :: CE.IOException -> IO ()
    connectionGone _ = CE.throwIO $ userError "connection not open"
-- | Write an assembly to the channel without translating exceptions.
--
-- For a content-carrying method this first waits on the channel's flow
-- lock, then sends the method frame, the content header and the body split
-- into chunks that fit the connection's maximum frame size. A simple method
-- is sent as a single frame and does not wait on the flow lock.
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' chan (ContentMethod m properties msg) = do
    waitLock $ chanActive chan
    let !toWrite =
            [ MethodPayload m
            , ContentHeaderPayload
                (getClassIDOf properties) --classID
                0
                (fromIntegral $ BL.length msg) --bodySize
                properties
            ] ++ bodyFrames
    writeFrames chan toWrite
  where
    -- Every frame carries 8 bytes of framing (7-byte header plus one
    -- trailing byte), so that much is subtracted from the frame size limit.
    maxPayload = fromIntegral (connMaxFrameSize $ connection chan) - 8

    -- An empty body produces no body frames at all.
    bodyFrames
        | BL.length msg > 0 = map ContentBodyPayload (splitLen msg maxPayload)
        | otherwise = []

    splitLen str len | BL.length str > len = (BL.take len str):(splitLen (BL.drop len str) len)
    splitLen str _ = [str]
writeAssembly' chan (SimpleMethod m) = writeFrames chan [MethodPayload m]
-- | Write an assembly to the channel, translating failures.
--
-- Any 'AMQPException', 'CE.ErrorCall' or 'CE.IOException' raised while
-- writing is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException'.
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly chan m = writeAssembly' chan m `CE.catches` handlers
  where
    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler (\(_ :: AMQPException) -> rethrow)
        , CE.Handler (\(_ :: CE.ErrorCall) -> rethrow)
        , CE.Handler (\(_ :: CE.IOException) -> rethrow)
        ]
-- | Send a method and block until its reply arrives.
--
-- While holding 'chanClosed', a fresh reply slot is appended to the
-- channel's queue of outstanding responses and the method is written; the
-- call then waits for the channel's receiver thread to fill the slot. The
-- reply is forced before returning, so the error value stored by a channel
-- close is raised here. Any 'AMQPException', 'CE.ErrorCall' or
-- 'CE.IOException' is replaced by the exception chosen by
-- 'throwMostRelevantAMQPException'.
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    res <- newEmptyMVar
    let roundTrip = do
            withMVar (chanClosed chan) $ \cc ->
                case cc of
                    Nothing -> do
                        -- Enqueue the slot before writing so the reply
                        -- cannot arrive ahead of its waiter.
                        modifyMVar_ (outstandingResponses chan) $ \val ->
                            return $! val Seq.|> res
                        writeAssembly' chan m
                    Just _ -> CE.throwIO $ userError "closed"
            r <- takeMVar res
            return $! r
    roundTrip `CE.catches` handlers
  where
    rethrow = throwMostRelevantAMQPException chan
    handlers =
        [ CE.Handler (\(_ :: AMQPException) -> rethrow)
        , CE.Handler (\(_ :: CE.ErrorCall) -> rethrow)
        , CE.Handler (\(_ :: CE.IOException) -> rethrow)
        ]
-- | Throw the 'AMQPException' that best explains a failure on the channel.
--
-- A recorded connection close reason takes precedence; otherwise a recorded
-- channel close reason is used; if neither exists, a
-- 'ConnectionClosedException' with the text @"unknown reason"@ is thrown.
-- The channel state is only consulted when the connection has no reason.
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException chan = do
    connReason <- readMVar (connClosed (connection chan))
    case connReason of
        Just r -> CE.throwIO (ConnectionClosedException r)
        Nothing ->
            readMVar (chanClosed chan) >>=
                CE.throwIO . maybe (ConnectionClosedException "unknown reason") ChannelClosedException
-- | Send @Basic_qos@ with the given prefetch size and prefetch count and
-- wait for the broker's @Basic_qos_ok@. The method's trailing flag is fixed
-- to 'False'.
qos :: Channel -> Word32 -> Word16 -> IO ()
qos chan prefetchSize prefetchCount = do
    SimpleMethod Basic_qos_ok <- request chan (SimpleMethod limits)
    return ()
  where
    limits = Basic_qos prefetchSize prefetchCount False
-- | Exceptions thrown by operations on a closed channel or connection. The
-- 'String' is the recorded close reason.
data AMQPException
    = ChannelClosedException String
      -- ^ The channel is closed.
    | ConnectionClosedException String
      -- ^ The connection is closed.
    deriving (Eq, Ord, Show, Typeable)

instance CE.Exception AMQPException