-- generic amqp consumer
import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad
import qualified Data.Text as T
import Data.Time
import Data.Version (showVersion)
import Network.AMQP
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
import System.IO

-- | Program entry point.
--
-- Parses the command line, connects, sets the prefetch count, picks the
-- queue to read from (the one named in the options, or a freshly declared
-- exclusive one), subscribes 'myCallback' to it and then idles until an
-- exception reaches this thread. After that the connection is closed.
main :: IO ()
main = do
  hr "starting"
  mainThread <- myThreadId
  args <- parseargs 'k' =<< getArgs
  let addiArgs = reverse (additionalArgs args)
      ackEnabled = ack args
      ackMode
        | ackEnabled = Ack
        | otherwise = NoAck
      requeueInfo
        | ackEnabled = Just (show (requeuenack args))
        | otherwise = Nothing
  printparam' "client version" ("amqp-utils " ++ showVersion version)
  (conn, chan) <- connect args
  -- channel exceptions are re-thrown into this thread, which ends the
  -- idle loop further down
  addChannelExceptionHandler chan (X.throwTo mainThread)
  -- set prefetch
  printparam' "prefetch" (show (preFetch args))
  qos chan 0 (fromIntegral (preFetch args)) False
  -- attach to given queue? or build exclusive queue and bind it?
  queue <-
    case qName args of
      Just givenName -> return (T.pack givenName)
      Nothing ->
        tempQueue chan (tmpQName args) (bindings args) (currentExchange args)
  printparam' "queue name" (T.unpack queue)
  printparam "shown body chars" (show <$> anRiss args)
  printparam "temp dir" (tempDir args)
  printparam "callback" (fileProcess args)
  printparam "callback args" (listToMaybeUnwords addiArgs)
  -- subscribe to the queue
  ctag <- consumeMsgs chan queue ackMode (myCallback args addiArgs mainThread)
  printparam' "consumer tag" (T.unpack ctag)
  printparam' "send acks" (show ackEnabled)
  printparam "requeue if rejected" requeueInfo
  hr "entering main loop"
  -- sleep in 5 second steps; only an exception gets us out of here
  forever (threadDelay 5000000) `X.catch` reportException
  closeConnection conn
  hr "connection closed"
  where
    -- print whatever exception terminated the idle loop
    reportException :: X.SomeException -> IO ()
    reportException = printparam' "exception" . show

-- | exclusive temp queue
--
-- Declares an exclusive queue named @tmpqname@ and binds it to every
-- (exchange, binding key) pair in @bindlist@, printing each binding.
-- With an empty @bindlist@ the queue is bound to exchange @x@ with
-- binding key @#@ instead. Returns the queue name reported by
-- 'declareQueue'.
tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
tempQueue chan tmpqname bindlist x = do
  (q, _, _) <- declareQueue chan queueOpts
  forM_ effectiveBindings $ \(xchange, bkey) -> do
    bindQueue chan q (T.pack xchange) (T.pack bkey)
    printparam' "binding" (xchange ++ ":" ++ bkey)
  return q
  where
    queueOpts = newQueue {queueExclusive = True, queueName = T.pack tmpqname}
    -- fall back to a single catch-all binding when none were requested
    effectiveBindings
      | null bindlist = [(x, "#")]
      | otherwise = bindlist

-- | process received message
--
-- Prints the message to stderr via 'printmsg', then hands it to
-- 'optionalFileStuff'. An exception from 'printmsg' is forwarded to the
-- thread @tid@ and processing continues with an empty option list. An
-- exception from 'optionalFileStuff' is printed as @ERROR@ and followed
-- by @reje envi a@ (presumably rejecting the delivery -- confirm in the
-- helpers module).
myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO ()
myCallback a addi tid m@(_, envi) = do
  hr ("BEGIN " ++ numstring)
  now <- getZonedTime
  callbackoptions <- printmsg stderr m (anRiss a) now `X.catch` forwardToMain
  outcome <-
    X.try (optionalFileStuff m callbackoptions addi numstring a tid Nothing)
  case outcome of
    Left e -> do
      printparam' "ERROR" (show (e :: X.SomeException))
      reje envi a
    Right result -> return result
  hr ("END " ++ numstring)
  where
    -- delivery tag, used to label the BEGIN/END markers
    numstring = show (envDeliveryTag envi)
    -- pass a printing failure on to thread tid, carry on without options
    forwardToMain x = X.throwTo tid (x :: X.SomeException) >> return []