{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.AMQP.Worker.Connection
( Connection (..)
, connect
, disconnect
, withChannel
) where
import Control.Concurrent.MVar
  ( MVar
  , modifyMVar_
  , newEmptyMVar
  , putMVar
  , readMVar
  , takeMVar
  )
import Control.Monad.Catch (catch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Data.Text (Text)
import Network.AMQP (AMQPException (..), Channel)
import qualified Network.AMQP as AMQP
-- | Textual name of an AMQP exchange (a plain alias for 'Text').
type ExchangeName = Text
-- | A broker connection bundled with a pool of channels opened on it.
data Connection = Connection
  { amqpConn :: MVar AMQP.Connection
    -- ^ The underlying AMQP connection. It lives in an 'MVar' so that it
    -- can be replaced when the broker reports the connection as closed.
  , pool :: Pool Channel
    -- ^ Pool of channels; 'withChannel' borrows from it.
  , exchange :: ExchangeName
    -- ^ Name of the exchange associated with this connection.
  }
-- | Open a connection to the broker described by the given options and
-- build a pool of channels on top of it.
--
-- The resulting 'Connection' always uses the @amq.topic@ exchange.
--
-- Channels are created lazily by the pool. If opening a channel fails with
-- 'ConnectionClosedException', the underlying connection is reopened with
-- the same options and the channel is retried; any other 'AMQPException'
-- is rethrown to the caller.
connect :: MonadIO m => AMQP.ConnectionOpts -> m Connection
connect opts = liftIO $ do
  cvar <- newEmptyMVar
  openConnection cvar
  chans <-
    Pool.newPool $
      Pool.defaultPoolConfig (create cvar) destroy openTime numChans
  pure $ Connection cvar chans exchangeName
  where
    exchangeName :: ExchangeName
    exchangeName = "amq.topic"

    -- Third argument to 'Pool.defaultPoolConfig': seconds an unused channel
    -- is kept before the pool frees it.
    openTime :: Double
    openTime = 10

    -- Fourth argument to 'Pool.defaultPoolConfig': maximum number of
    -- channels the pool keeps open.
    numChans :: Int
    numChans = 4

    -- Initial open: fills the (empty) MVar with a fresh connection.
    openConnection :: MVar AMQP.Connection -> IO ()
    openConnection cvar = AMQP.openConnection'' opts >>= putMVar cvar

    -- Replace the stored connection with a freshly opened one.
    --
    -- 'modifyMVar_' is used instead of a bare take/put pair so that, if
    -- opening the new connection throws, the previous value is put back.
    -- Leaving the MVar empty would make every later 'readMVar' (in 'create'
    -- and in 'disconnect') block forever.
    reopenConnection :: MVar AMQP.Connection -> IO ()
    reopenConnection cvar =
      modifyMVar_ cvar $ \_stale -> AMQP.openConnection'' opts

    -- Pool "create" action: open a channel on the current connection,
    -- recovering via 'createEx' when that fails with an 'AMQPException'.
    create :: MVar AMQP.Connection -> IO Channel
    create cvar = do
      conn <- readMVar cvar
      AMQP.openChannel conn `catch` createEx cvar

    -- Reconnect and retry when the connection was closed; rethrow anything
    -- else unchanged.
    createEx :: MVar AMQP.Connection -> AMQPException -> IO Channel
    createEx cvar (ConnectionClosedException _ _) =
      reopenConnection cvar >> create cvar
    createEx _ ex = throwM ex

    -- Pool "free" action.
    destroy :: Channel -> IO ()
    destroy = AMQP.closeChannel
-- | Tear down a 'Connection': destroy every pooled channel, then close the
-- underlying AMQP connection.
--
-- The connection is read from the 'MVar' before the pool is emptied, so the
-- connection that gets closed is the one that was current when this
-- function was called.
disconnect :: MonadIO m => Connection -> m ()
disconnect c = liftIO $ do
  conn <- readMVar (amqpConn c)
  Pool.destroyAllResources (pool c)
  AMQP.closeConnection conn
-- | Run an action with a channel borrowed from the connection's pool.
--
-- This is a thin wrapper over 'Pool.withResource'; acquiring and returning
-- the channel is handled entirely by the pool.
withChannel :: Connection -> (Channel -> IO b) -> IO b
withChannel conn action = Pool.withResource (pool conn) action