module Network.AMQP.Conduit (
amqpReceiveSource
, amqpSendSink
, AmqpConf (..)
, AmqpConn (..)
, ExchangeKey
, Exchange
, QueueName
, AmqpURI
, withChannel
, createConnectionChannel
, destoryConnection
, createQueue
, createExchange
, bindQueueExchange
, createConsumer
, deleteConsumer
, pauseConsumer
, resumeConsumer
) where
import Control.Exception (throwIO)
import Control.Exception.Lifted (bracket)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Conduit (Sink, Source, await, yield)
import Data.Text (Text)
import Network.AMQP (AMQPException (ConnectionClosedException),
Ack, Channel, Connection,
ConsumerTag, Envelope,
ExchangeOpts, Message, QueueOpts,
addConnectionClosedHandler,
bindQueue, cancelConsumer,
closeConnection, consumeMsgs,
declareExchange, declareQueue,
exchangeName, flow, fromURI,
openChannel, openConnection'',
publishMsg, queueName)
data AmqpConn = AmqpConn
{ amqpConn :: Connection
, amqpChan :: (Channel, Maybe ConsumerTag)
}
data AmqpConf = AmqpConf
{
amqpUri :: AmqpURI
, amqpQueue :: QueueOpts
, amqpExchange :: ExchangeOpts
, amqpExchanKey :: ExchangeKey
}
type ExchangeKey = Text
type Exchange = Text
type QueueName = Text
type AmqpURI = String
withChannel:: (MonadIO m, MonadBaseControl IO m)
=> AmqpConf
-> (AmqpConn -> m a)
-> m a
withChannel conf f = do
bracket
(do
liftIO $ connect (amqpUri conf))
(\conn -> do
liftIO $ disconnect conn)
(\conn -> do
f conn)
createConnectionChannel :: AmqpConf
-> IO AmqpConn
createConnectionChannel conf = connect $ amqpUri conf
destoryConnection :: AmqpConn
-> IO ()
destoryConnection conn = do
addConnectionClosedHandler (amqpConn conn) False (return ())
closeConnection (amqpConn conn)
createConsumer :: AmqpConn
-> QueueName
-> Ack
-> ((Message, Envelope) -> IO ())
-> IO AmqpConn
createConsumer conn queue ack f = do
tag' <- getTag chan
return $ conn {amqpChan =(chan, Just tag')}
where
getTag chan' = consumeMsgs chan' queue ack f
chan = fst $ amqpChan conn
deleteConsumer :: AmqpConn -> IO AmqpConn
deleteConsumer conn =
case tag of
Nothing -> return conn
Just s -> do
putStrLn "cancel cunsumer channel."
cancelConsumer chan s
return $ conn {amqpChan = (chan, Nothing)}
where
(chan, tag) = amqpChan conn
pauseConsumer :: AmqpConn
-> IO AmqpConn
pauseConsumer chan = flowConsumer chan False
resumeConsumer :: AmqpConn
-> IO AmqpConn
resumeConsumer chan = flowConsumer chan True
flowConsumer :: AmqpConn
-> Bool
-> IO AmqpConn
flowConsumer conn flag =
case tag of
Nothing -> return conn
Just _ -> do
flow chan flag
return conn
where
(chan, tag) = amqpChan conn
createQueue :: AmqpConf -> AmqpConn -> IO (QueueName, Int, Int)
createQueue conf conn = declareQueue (fst $ amqpChan conn) (amqpQueue conf)
createExchange :: AmqpConf -> AmqpConn -> IO ()
createExchange conf conn = declareExchange (fst $ amqpChan conn) (amqpExchange conf)
bindQueueExchange :: AmqpConf -> AmqpConn -> IO ()
bindQueueExchange conf conn =
bindQueue (fst $ amqpChan conn) (queueName (amqpQueue conf)) (exchangeName (amqpExchange conf)) (amqpExchanKey conf)
connect :: AmqpURI
-> IO AmqpConn
connect uri = do
conn <- openConnection'' $ fromURI uri
chan <- openChannel conn
addConnectionClosedHandler conn True (throwIO (ConnectionClosedException "Connection Closed."))
return $ AmqpConn conn (chan, Nothing)
disconnect :: AmqpConn
-> IO ()
disconnect conn = do
closeConnection (amqpConn conn)
sendMsg :: AmqpConn
-> Exchange
-> ExchangeKey
-> Message
-> IO ()
sendMsg conn exchange key msg = do
publishMsg chan exchange key msg
where
chan = fst (amqpChan conn)
amqpReceiveSource :: MonadIO m
=> (Message, Envelope)
-> Source m (Message, Envelope)
amqpReceiveSource (msg, env) = loop
where
loop = do
yield (msg, env)
loop
amqpSendSink :: MonadIO m
=> AmqpConn
-> Exchange
-> ExchangeKey
-> Sink Message m ()
amqpSendSink conn exchange key = loop
where
loop = await >>= maybe (return ()) (\v -> (liftIO $ sendMsg conn exchange key v) >> loop)