{-# LANGUAGE OverloadedStrings #-}

-- generic AMQP rpc server
import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad
import Data.Map (singleton)
import Data.Maybe
import qualified Data.Text as T
import Data.Time
import Data.Version (showVersion)
import Network.AMQP
import Network.AMQP.Types
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
import System.IO

-- | Program entry point.
--
-- Parses the command line, insists on a worker (the @-X@ option), connects,
-- consumes from either the queue named on the command line or a freshly
-- declared exclusive queue, and then idles until an exception reaches the
-- main thread. That exception is reported and the connection is closed.
main :: IO ()
main = do
  hr "starting"
  tid <- myThreadId
  args <- getArgs >>= parseargs 'r'
  -- The worker is mandatory. Check for it explicitly so that only a missing
  -- option (and not some unrelated failure while printing) is reported as
  -- "-X option required".
  case fileProcess args of
    Just worker -> printparam' "worker" worker
    Nothing -> X.throwIO $ X.ErrorCall "-X option required"
  let addiArgs = reverse $ additionalArgs args
  printparam' "client version" $ "amqp-utils " ++ showVersion version
  (conn, chan) <- connect args
  -- Exceptions raised on the channel are re-thrown in the main thread, where
  -- the handler around the idle loop below picks them up.
  addChannelExceptionHandler chan (X.throwTo tid)
  queue <-
    case qName args of
      Just name -> return (T.pack name)
      Nothing -> do
        -- No queue given: declare an exclusive one named after tmpQName.
        (name, _, _) <-
          declareQueue
            chan
            newQueue {queueExclusive = True, queueName = T.pack $ tmpQName args}
        return name
  printparam' "queue name" $ T.unpack queue
  ctag <-
    consumeMsgs
      chan
      queue
      (if ack args
         then Ack
         else NoAck)
      (rpcServerCallback tid args addiArgs chan)
  printparam' "consumer tag" $ T.unpack ctag
  printparam' "send acks" $ show (ack args)
  printparam "requeue if rejected" $
    if ack args
      then Just (show (requeuenack args))
      else Nothing
  hr "entering main loop"
  -- The main thread only sleeps (in 5 second slices); all work happens in the
  -- consumer callback. Any exception delivered here ends the loop.
  X.catch
    (forever $ threadDelay 5000000)
    (\exception ->
       printparam' "exception" $ show (exception :: X.SomeException))
  closeConnection conn
  hr "connection closed"

-- | Consumer callback, invoked once per delivered message.
--
-- Prints the message via 'printmsg' (a failure there is forwarded to the
-- thread @tid@), then passes the message to 'optionalFileStuff' together with
-- a @reply@ action. Any exception escaping 'optionalFileStuff' is reported
-- under the label @ERROR@ instead of being propagated.
--
-- The @reply@ action publishes @contents@ to the exchange the request arrived
-- on, using the request's reply-to as routing key. It copies correlation id,
-- timestamp and expiration from the request and stores @show e@ in the
-- @exitcode@ header. If the request carries no reply-to property, @reply@
-- throws a descriptive exception before anything is published.
rpcServerCallback ::
     ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
rpcServerCallback tid a addi c m@(msg, env) = do
  let numstring = show $ envDeliveryTag env
  hr $ "BEGIN " ++ numstring
  now <- getZonedTime
  callbackoptions <-
    X.catch
      (printmsg stderr m (anRiss a) now)
      (\x -> X.throwTo tid (x :: X.SomeException) >> return [])
  either (\e -> printparam' "ERROR" (show (e :: X.SomeException))) return =<<
    X.try
      (optionalFileStuff m callbackoptions addi numstring a tid (Just reply))
  hr $ "END " ++ numstring
  where
    reply e contents =
      case msgReplyTo msg of
        Nothing ->
          X.throwIO $
          userError "message has no reply-to property, cannot send RPC reply"
        Just replyTo ->
          void $
          publishMsg
            c
            (envExchangeName env)
            replyTo
            newMsg
              { msgBody = contents
              , msgCorrelationID = msgCorrelationID msg
              , msgTimestamp = msgTimestamp msg
              , msgExpiration = msgExpiration msg
              , msgHeaders =
                  Just $
                  FieldTable $
                  singleton "exitcode" $ FVString $ T.pack $ show e
              }