{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Worker.Connection
    ( connect
    , connect'
    , disconnect
    , withChannel
    , Connection (..)
    , WorkerOpts (..)
    , ExchangeName
    , AMQP.ConnectionOpts (..)
    , AMQP.defaultConnectionOpts
    , AMQP.fromURI
    ) where

import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, readMVar, takeMVar)
import Control.Monad.Catch (catch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Function ((&))
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Data.Text (Text)
import Network.AMQP (AMQPException (..), Channel)
import qualified Network.AMQP as AMQP

type ExchangeName = Text

data Connection = Connection
    { Connection -> MVar Connection
amqpConn :: MVar AMQP.Connection
    , Connection -> Pool Channel
pool :: Pool Channel
    , Connection -> ExchangeName
exchange :: ExchangeName
    }

data WorkerOpts = WorkerOpts
    { WorkerOpts -> ExchangeName
exchange :: ExchangeName
    -- ^ Everything goes on one exchange
    , WorkerOpts -> Double
openTime :: Double
    -- ^ Number of seconds connections in the pool remain open for re-use
    , WorkerOpts -> Int
maxChannels :: Int
    -- ^ Number of concurrent connectinos available in the pool
    , WorkerOpts -> Maybe Int
numStripes :: Maybe Int
    }
    deriving (Int -> WorkerOpts -> ShowS
[WorkerOpts] -> ShowS
WorkerOpts -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerOpts] -> ShowS
$cshowList :: [WorkerOpts] -> ShowS
show :: WorkerOpts -> String
$cshow :: WorkerOpts -> String
showsPrec :: Int -> WorkerOpts -> ShowS
$cshowsPrec :: Int -> WorkerOpts -> ShowS
Show, WorkerOpts -> WorkerOpts -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkerOpts -> WorkerOpts -> Bool
$c/= :: WorkerOpts -> WorkerOpts -> Bool
== :: WorkerOpts -> WorkerOpts -> Bool
$c== :: WorkerOpts -> WorkerOpts -> Bool
Eq)

-- | Connect to the AMQP server using simple defaults
--
-- > conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
connect :: MonadIO m => AMQP.ConnectionOpts -> m Connection
connect :: forall (m :: * -> *). MonadIO m => ConnectionOpts -> m Connection
connect ConnectionOpts
opts =
    forall (m :: * -> *).
MonadIO m =>
ConnectionOpts -> WorkerOpts -> m Connection
connect' ConnectionOpts
opts forall a b. (a -> b) -> a -> b
$
        WorkerOpts
            { $sel:exchange:WorkerOpts :: ExchangeName
exchange = ExchangeName
"amq.topic"
            , $sel:openTime:WorkerOpts :: Double
openTime = Double
10
            , $sel:maxChannels:WorkerOpts :: Int
maxChannels = Int
8
            , $sel:numStripes:WorkerOpts :: Maybe Int
numStripes = forall a. a -> Maybe a
Just Int
1
            }

connect' :: MonadIO m => AMQP.ConnectionOpts -> WorkerOpts -> m Connection
connect' :: forall (m :: * -> *).
MonadIO m =>
ConnectionOpts -> WorkerOpts -> m Connection
connect' ConnectionOpts
copt WorkerOpts
wopt = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
    -- create a single connection in an mvar
    MVar Connection
cvar <- forall a. IO (MVar a)
newEmptyMVar
    MVar Connection -> IO ()
openConnection MVar Connection
cvar

    -- open a shared pool for channels
    Pool Channel
chans <- forall a. PoolConfig a -> IO (Pool a)
Pool.newPool (MVar Connection -> PoolConfig Channel
config MVar Connection
cvar)

    forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ MVar Connection -> Pool Channel -> ExchangeName -> Connection
Connection MVar Connection
cvar Pool Channel
chans WorkerOpts
wopt.exchange
  where
    config :: MVar Connection -> PoolConfig Channel
config MVar Connection
cvar =
        forall a. IO a -> (a -> IO ()) -> Double -> Int -> PoolConfig a
Pool.defaultPoolConfig (MVar Connection -> IO Channel
create MVar Connection
cvar) Channel -> IO ()
destroy WorkerOpts
wopt.openTime WorkerOpts
wopt.maxChannels
            forall a b. a -> (a -> b) -> b
& forall a. Maybe Int -> PoolConfig a -> PoolConfig a
Pool.setNumStripes WorkerOpts
wopt.numStripes

    openConnection :: MVar Connection -> IO ()
openConnection MVar Connection
cvar = do
        -- open a connection and store in the mvar
        Connection
conn <- ConnectionOpts -> IO Connection
AMQP.openConnection'' ConnectionOpts
copt
        forall a. MVar a -> a -> IO ()
putMVar MVar Connection
cvar Connection
conn

    reopenConnection :: MVar Connection -> IO ()
reopenConnection MVar Connection
cvar = do
        -- clear the mvar and reopen
        Connection
_ <- forall a. MVar a -> IO a
takeMVar MVar Connection
cvar
        MVar Connection -> IO ()
openConnection MVar Connection
cvar

    create :: MVar Connection -> IO Channel
create MVar Connection
cvar = do
        Connection
conn <- forall a. MVar a -> IO a
readMVar MVar Connection
cvar
        forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
catch (Connection -> IO Channel
AMQP.openChannel Connection
conn) (MVar Connection -> AMQPException -> IO Channel
createEx MVar Connection
cvar)

    -- Reopen closed connections
    createEx :: MVar Connection -> AMQPException -> IO Channel
createEx MVar Connection
cvar (ConnectionClosedException CloseType
_ String
_) = do
        MVar Connection -> IO ()
reopenConnection MVar Connection
cvar
        MVar Connection -> IO Channel
create MVar Connection
cvar
    createEx MVar Connection
_ AMQPException
ex = forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AMQPException
ex

    destroy :: Channel -> IO ()
destroy Channel
chan = do
        Channel -> IO ()
AMQP.closeChannel Channel
chan

disconnect :: MonadIO m => Connection -> m ()
disconnect :: forall (m :: * -> *). MonadIO m => Connection -> m ()
disconnect Connection
c = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
    Connection
conn <- forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ Connection -> MVar Connection
amqpConn Connection
c
    forall a. Pool a -> IO ()
Pool.destroyAllResources forall a b. (a -> b) -> a -> b
$ Connection -> Pool Channel
pool Connection
c
    Connection -> IO ()
AMQP.closeConnection Connection
conn

-- | Perform an action with a channel resource, and give it back at the end
withChannel :: Connection -> (Channel -> IO b) -> IO b
withChannel :: forall b. Connection -> (Channel -> IO b) -> IO b
withChannel (Connection MVar Connection
_ Pool Channel
p ExchangeName
_) Channel -> IO b
action = do
    forall a r. Pool a -> (a -> IO r) -> IO r
Pool.withResource Pool Channel
p Channel -> IO b
action