{-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} -- generic AMQP publisher import Control.Concurrent import qualified Control.Exception as X import qualified Data.ByteString.Lazy.Char8 as BL #if linux_HOST_OS #if MIN_VERSION_hinotify(0,3,10) import qualified Data.ByteString.Char8 as BS #endif import Data.List (isSuffixOf) #endif import Data.Maybe import qualified Data.Text as T import Data.Time import Data.Time.Clock.POSIX import Data.Version (showVersion) import Data.Word (Word64) import Magic import Network.AMQP import Network.AMQP.Types import Network.AMQP.Utils.Connection import Network.AMQP.Utils.Helpers import Network.AMQP.Utils.Options import Paths_amqp_utils (version) import System.Directory import System.Environment import System.Exit #if linux_HOST_OS import System.INotify #endif import qualified System.Posix.Files as F main :: IO () main = do hr "starting" tid <- myThreadId args <- getArgs >>= parseargs 'a' printparam "client version" ["amqp-utils", showVersion version] printparam "routing key" $ rKey args printparam "exchange" $ currentExchange args isDir <- if inputFile args == "-" then return False else F.getFileStatus (inputFile args) >>= return . F.isDirectory if isDir then printparam "hotfolder" (inputFile args) >> printparam "initial scan" (initialScan args) else printparam "input file" [ inputFile args , if (lineMode args) then "(line-by-line)" else "" ] printparam "remove sent file" (removeSentFile args && isDir) (conn, chan) <- connect args addChannelExceptionHandler chan (X.throwTo tid) printparam "confirm mode" $ confirm args if (confirm args) then do confirmSelect chan False addConfirmationListener chan confirmCallback else return () let publishOneMsg = publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir} X.catch (if isDir then do #if linux_HOST_OS setCurrentDirectory (inputFile args) if (initialScan args) then getDirectoryContents "." >>= mapM_ (\fn -> handleFile publishOneMsg (suffix args) fn) else return () inotify <- initINotify wd <- addWatch inotify [CloseWrite, MoveIn] "." (handleEvent publishOneMsg (suffix args)) hr $ "BEGIN watching " ++ (inputFile args) sleepingBeauty >>= printparam "exception" removeWatch wd hr $ "END watching " ++ (inputFile args) #else X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux") #endif else do hr $ "BEGIN sending" messageFile <- if inputFile args == "-" then BL.getContents else BL.readFile (inputFile args) if (lineMode args) then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) else publishOneMsg (Just (inputFile args)) messageFile hr "END sending") exceptionHandler -- all done. wait and close. if (confirm args) then waitForConfirms chan >>= printparam "confirmed" else return () X.catch (closeConnection conn) exceptionHandler -- | A handler for clean exit exceptionHandler :: AMQPException -> IO () exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1) -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () confirmCallback (deliveryTag, isAll, ackType) = printparam "confirmed" [ show deliveryTag , if isAll then "all" else "this" , show ackType ] #if linux_HOST_OS -- | Hotfolder event handler handleEvent :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> Event -> IO () -- just handle closewrite and movedin events #if MIN_VERSION_hinotify(0,3,10) handleEvent func suffixes (Closed False (Just fileName) True) = handleFile func suffixes (BS.unpack fileName) handleEvent func suffixes (MovedIn False fileName _) = handleFile func suffixes (BS.unpack fileName) #else handleEvent func suffixes (Closed False (Just fileName) True) = handleFile func suffixes fileName handleEvent func suffixes (MovedIn False fileName _) = handleFile func suffixes fileName #endif handleEvent _ _ _ = return () -- | Hotfolder file handler handleFile :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () handleFile _ _ ('.':_) = return () -- ignore hidden files handleFile func suffixes@(_:_) fileName = if any (flip isSuffixOf fileName) suffixes then handleFile func [] fileName else return () handleFile func [] fileName = X.catch (BL.readFile fileName >>= func (Just fileName)) (\e -> printparam "exception in handleFile" (e :: X.IOException)) #endif -- | Publish one message with our settings publishOneMsg' :: Channel -> Args -> Maybe FilePath -> BL.ByteString -> IO () publishOneMsg' chan a fn content = do printparam "sending" fn (mtype, mencoding) <- if (magic a) && isJust fn then do m <- magicOpen [MagicMimeType] magicLoadDefault m t <- magicFile m (fromJust fn) magicSetFlags m [MagicMimeEncoding] e <- magicFile m (fromJust fn) return (Just (T.pack t), Just (T.pack e)) else return ((contenttype a), (contentencoding a)) now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds publishMsg chan (T.pack $ currentExchange a) (T.pack $ rKey a) newMsg { msgBody = content , msgDeliveryMode = persistent a , msgTimestamp = Just now , msgID = msgid a , msgType = msgtype a , msgUserID = userid a , msgApplicationID = appid a , msgClusterID = clusterid a , msgContentType = mtype , msgContentEncoding = mencoding , msgReplyTo = replyto a , msgPriority = prio a , msgCorrelationID = corrid a , msgExpiration = msgexp a , msgHeaders = substheader (fnheader a) fn $ msgheader a } >>= printparam "sent" removeSentFileIfRequested (removeSentFile a) fn where substheader :: [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable substheader (s:r) (Just fname) old = substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) substheader _ _ old = old removeSentFileIfRequested False _ = return () removeSentFileIfRequested True Nothing = return () removeSentFileIfRequested True (Just fname) = printparam "removing" fname >> removeFile fname