{-# LANGUAGE CPP #-} -- generic AMQP publisher import Control.Concurrent import qualified Control.Exception as X import Control.Monad (forever) import qualified Data.ByteString.Lazy.Char8 as BL #if MIN_VERSION_hinotify(0,3,10) import qualified Data.ByteString.Char8 as BS #endif import Data.List (isSuffixOf) import Data.Maybe import qualified Data.Text as T import Data.Time import Data.Time.Clock.POSIX import Data.Version (showVersion) import Data.Word (Word64) import Magic import Network.AMQP import Network.AMQP.Types import Network.AMQP.Utils.Connection import Network.AMQP.Utils.Helpers import Network.AMQP.Utils.Options import Paths_amqp_utils (version) import System.Environment import System.Exit import System.INotify import qualified System.Posix.Files as F main :: IO () main = do hr "starting" tid <- myThreadId args <- getArgs >>= parseargs 'a' printparam "client version" ["amqp-utils", showVersion version] printparam "routing key" $ rKey args printparam "exchange" $ currentExchange args isDir <- if inputFile args == "-" then return False else F.getFileStatus (inputFile args) >>= return . F.isDirectory if isDir then printparam "hotfolder" $ inputFile args else printparam "input file" [ inputFile args , if (lineMode args) then "(line-by-line)" else "" ] (conn, chan) <- connect args addChannelExceptionHandler chan (X.throwTo tid) printparam "confirm mode" $ confirm args if (confirm args) then do confirmSelect chan False addConfirmationListener chan confirmCallback else return () let publishOneMsg = publishOneMsg' chan args X.catch (if isDir then do inotify <- initINotify wd <- addWatch inotify [CloseWrite, MoveIn] #if MIN_VERSION_hinotify(0,3,10) (BS.pack (inputFile args)) #else (inputFile args) #endif (handleEvent publishOneMsg (suffix args) (inputFile args)) hr $ "BEGIN watching " ++ (inputFile args) _ <- forever $ threadDelay 1000000 removeWatch wd hr $ "END watching " ++ (inputFile args) else do hr $ "BEGIN sending" messageFile <- if inputFile args == "-" then BL.getContents else BL.readFile (inputFile args) if (lineMode args) then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) else publishOneMsg (Just (inputFile args)) messageFile hr "END sending") exceptionHandler -- all done. wait and close. if (confirm args) then waitForConfirms chan >>= printparam "confirmed" else return () X.catch (closeConnection conn) exceptionHandler -- | A handler for clean exit exceptionHandler :: AMQPException -> IO () exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1) -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () confirmCallback (deliveryTag, isAll, ackType) = printparam "confirmed" [ show deliveryTag , if isAll then "all" else "this" , show ackType ] -- | Hotfolder event handler handleEvent :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> String -> Event -> IO () -- just handle closewrite and movedin events #if MIN_VERSION_hinotify(0,3,10) handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x)) handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x)) #else handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x) handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x) #endif handleEvent _ _ _ _ = return () -- | Hotfolder file handler handleFile :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () handleFile _ _ ('.':_) = return () -- ignore hidden files handleFile f s@(_:_) x = if any (flip isSuffixOf x) s then handleFile f [] x else return () handleFile f [] x = X.catch (BL.readFile x >>= f (Just x)) (\e -> printparam "exception in handleFile" (e :: X.SomeException)) -- | Publish one message with our settings publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO () publishOneMsg' c a fn f = do printparam "sending" fn (mtype, mencoding) <- if (magic a) && isJust fn then do m <- magicOpen [MagicMimeType] magicLoadDefault m t <- magicFile m (fromJust fn) magicSetFlags m [MagicMimeEncoding] e <- magicFile m (fromJust fn) return (Just (T.pack t), Just (T.pack e)) else return ((contenttype a), (contentencoding a)) now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds publishMsg c (T.pack $ currentExchange a) (T.pack $ rKey a) newMsg { msgBody = f , msgDeliveryMode = persistent a , msgTimestamp = Just now , msgID = msgid a , msgType = msgtype a , msgUserID = userid a , msgApplicationID = appid a , msgClusterID = clusterid a , msgContentType = mtype , msgContentEncoding = mencoding , msgReplyTo = replyto a , msgPriority = prio a , msgCorrelationID = corrid a , msgExpiration = msgexp a , msgHeaders = substheader (fnheader a) fn $ msgheader a } >>= printparam "sent" where substheader :: [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable substheader (s:r) (Just fname) old = substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) substheader _ _ old = old