-- | Shared helpers for the AMQP command line utilities: logging of
-- parameters and messages on stderr, timestamp formatting, saving message
-- bodies to temporary files and running an optional callback program.
module Network.AMQP.Utils.Helpers where

import Control.Concurrent
import Control.Exception (finally)
import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.List
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as T
import Data.Time
import Data.Time.Clock.POSIX
import Network.AMQP
import Network.AMQP.Types
import Network.AMQP.Utils.Options
import System.Exit
import System.IO
import System.Process

-- | log cmdline options
--
-- An empty list yields 'Nothing', anything else the words joined by spaces.
listToMaybeUnwords :: [String] -> Maybe String
listToMaybeUnwords [] = Nothing
listToMaybeUnwords x = Just $ unwords x

-- | Strings or ByteStrings with label, or nothing at all
--
-- Writes @" --- label: "@ to stderr, runs the given action and flushes
-- stderr. Does nothing when no action is supplied.
printwithlabel :: String -> Maybe (IO ()) -> IO ()
printwithlabel _ Nothing = return ()
printwithlabel labl (Just i) = do
  mapM_ (hPutStr stderr) [" --- ", labl, ": "]
  i
  hFlush stderr

-- | optional parameters
--
-- Prints the labelled value on stderr; 'Nothing' prints nothing.
printparam :: String -> Maybe String -> IO ()
printparam labl ms = printwithlabel labl $ fmap (hPutStrLn stderr) ms

-- | required parameters
printparam' :: String -> String -> IO ()
printparam' d s = printparam d (Just s)

-- | head chars of body
--
-- The label and the surrounding newlines go to stderr, the body itself is
-- written to the given handle.
printbody :: Handle -> (String, Maybe BL.ByteString) -> IO ()
printbody h (labl, ms) = printwithlabel labl $ fmap writeBody ms
  where
    writeBody s = hPutStrLn stderr "" >> BL.hPutStr h s >> hPutStrLn stderr ""

-- | log marker
--
-- Prints a 72 character wide rule of dashes on stderr with the given text
-- embedded after the first 25 dashes.
hr :: String -> IO ()
hr x = hPutStrLn stderr hr' >> hFlush stderr
  where
    hr' = take 72 $ take 25 hr'' ++ " " ++ x ++ " " ++ hr''
    hr'' = repeat '-'

-- | Apply a formatter to every entry of a header table and concatenate
-- the results.
formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
formatheaders f (FieldTable ll) = concatMap f $ M.toList ll

-- | log formatting
fieldshow :: (T.Text, FieldValue) -> String
fieldshow (k, v) = "\n        " ++ T.unpack k ++ ": " ++ valueshow v

-- | callback cmdline formatting
fieldshow' :: (T.Text, FieldValue) -> [String]
fieldshow' (k, v) = ["-h", T.unpack k ++ "=" ++ valueshow v]

-- | showing a FieldValue
--
-- String values are shown without quotes, everything else via 'show'.
valueshow :: FieldValue -> String
valueshow (FVString value) = T.unpack value
valueshow (FVInt32 value) = show value
valueshow value = show value

-- | skip showing body head if binary type
--
-- Content types starting with @application@ or @image@ count as binary,
-- except @application/xml@ and @application/json@.
isimage :: Maybe String -> Bool
isimage Nothing = False
isimage (Just ctype)
  | "application/xml" `isPrefixOf` ctype = False
  | "application/json" `isPrefixOf` ctype = False
  | otherwise = any (`isPrefixOf` ctype) ["application", "image"]

-- | show the first bytes of message body
--
-- Without a limit the body is returned unchanged.
anriss' :: Maybe Int -> BL.ByteString -> BL.ByteString
anriss' = maybe id (BL.take . fromIntegral)

-- | callback cmdline with optional parameters
printopt :: (String, Maybe String) -> [String]
printopt (_, Nothing) = []
printopt (opt, Just s) = [opt, s]

-- | prints header and head on stderr and returns cmdline options to callback
--
-- The message properties are logged via 'printparam'; the (possibly
-- truncated) body is written to the given handle unless the content type
-- is considered binary by 'isimage'.
printmsg :: Handle -> (Message, Envelope) -> Maybe Int -> ZonedTime -> IO [String]
printmsg h (msg, envi) anR now = do
  mapM_
    (uncurry printparam)
    [ ("routing key", rkey)
    , ("message-id", messageid)
    , ("headers", headers)
    , ("content-type", ctype)
    , ("content-encoding", cenc)
    , ("redelivered", redeliv)
    , ("timestamp", timestamp'')
    , ("time now", now')
    , ("size", size)
    , ("priority", pri)
    , ("type", mtype)
    , ("user id", muserid)
    , ("application id", mappid)
    , ("cluster id", mclusterid)
    , ("reply to", mreplyto)
    , ("correlation id", mcorrid)
    , ("expiration", mexp)
    , ("delivery mode", mdelivmode)
    ]
  printbody h (label, anriss)
  return $
    concatMap
      printopt
      [ ("-r", rkey)
      , ("-m", ctype)
      , ("-e", cenc)
      , ("-i", messageid)
      , ("-t", timestamp)
      , ("-p", pri)
      ] ++
    fromMaybe [] headers'
  where
    headers = fmap (formatheaders fieldshow) $ msgHeaders msg
    headers' = fmap (formatheaders fieldshow') $ msgHeaders msg
    body = msgBody msg
    anriss =
      if isimage ctype
        then Nothing
        else Just (anriss' anR body)
    anriss'' = maybe "" (\a -> "first " ++ show a ++ " bytes of ") anR
    label = anriss'' ++ "body"
    ctype = fmap T.unpack $ msgContentType msg
    cenc = fmap T.unpack $ msgContentEncoding msg
    rkey = Just . T.unpack $ envRoutingKey envi
    messageid = fmap T.unpack $ msgID msg
    pri = fmap show $ msgPriority msg
    mtype = fmap show $ msgType msg
    muserid = fmap show $ msgUserID msg
    mappid = fmap show $ msgApplicationID msg
    mclusterid = fmap show $ msgClusterID msg
    mreplyto = fmap show $ msgReplyTo msg
    mcorrid = fmap show $ msgCorrelationID msg
    mexp = fmap show $ msgExpiration msg
    mdelivmode = fmap show $ msgDeliveryMode msg
    size = Just . show $ BL.length body
    redeliv =
      if envRedelivered envi
        then Just "YES"
        else Nothing
    tz = zonedTimeZone now
    nowutc = zonedTimeToUTCFLoor now
    msgtime = msgTimestamp msg
    msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
    timestamp = fmap show msgtime
    timediff = fmap (difftime nowutc) msgtimeutc
    -- the current time is omitted when it equals the message timestamp
    now' =
      case timediff of
        Just "now" -> Nothing
        _ -> showtime tz $ Just nowutc
    timestamp' = showtime tz msgtimeutc
    timestamp'' =
      liftM3
        (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
        timestamp
        timestamp'
        timediff

-- | timestamp conversion
--
-- Converts to UTC and rounds down to whole seconds.
zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
zonedTimeToUTCFLoor x =
  posixSecondsToUTCTime $
  realToFrac
    ((floor . utcTimeToPOSIXSeconds . zonedTimeToUTC) x :: Timestamp)

-- | show the timestamp
showtime :: TimeZone -> Maybe UTCTime -> Maybe String
showtime tz = fmap (show . utcToZonedTime tz)

-- | show difference between two timestamps
--
-- The first argument is the current time, the second the message time.
-- The difference is shown as computed, i.e. including its sign.
difftime :: UTCTime -> UTCTime -> String
difftime now msg
  | now == msg = "now"
  | now > msg = diff ++ " ago"
  | otherwise = diff ++ " in the future"
  where
    diff = show (diffUTCTime now msg)

-- | if the message is to be saved
-- and maybe processed further
--
-- Saves the body via 'saveFile'. If both a callback program and a saved
-- file are available, the callback is run in a new thread and exceptions
-- from that thread are rethrown in the thread given by @tid@; otherwise
-- the message is acknowledged directly.
optionalFileStuff ::
     (Message, Envelope)
  -> [String]
  -> [String]
  -> String
  -> Args
  -> ThreadId
  -> Maybe (ExitCode -> BL.ByteString -> IO ())
  -> IO ()
optionalFileStuff (msg, envi) callbackoptions addi numstring a tid action = do
  path <- saveFile (tempDir a) numstring (msgBody msg)
  printparam "saved to" path
  let callbackcmdline =
        liftM2
          (constructCallbackCmdLine callbackoptions addi numstring)
          (fileProcess a)
          path
  printparam "calling" $ fmap unwords callbackcmdline
  maybe (acke envi a) runCallback callbackcmdline
  where
    runCallback c =
      void $
      forkFinally
        (doProc a numstring envi c action)
        (either (throwTo tid) return)

-- | save message into temp file
--
-- Returns the path of the created file, or 'Nothing' when no directory is
-- given. The handle is closed even if writing the body fails.
saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
saveFile Nothing _ _ = return Nothing
saveFile (Just tempD) numstring body = do
  (p, h) <-
    openBinaryTempFileWithDefaultPermissions
      tempD
      ("konsum-" ++ numstring ++ "-.tmp")
  BL.hPut h body `finally` hClose h
  return $ Just p

-- | construct cmdline for callback script
constructCallbackCmdLine ::
     [String] -> [String] -> String -> String -> String -> [String]
constructCallbackCmdLine opts addi num exe path =
  exe : "-f" : path : "-n" : num : opts ++ addi

-- | call callback script
--
-- When an action is given, the callback's stdout is captured and passed to
-- the action together with the exit code, and the message is acknowledged.
-- Without an action the message is acknowledged on success and rejected on
-- failure. An empty command line does nothing.
doProc ::
     Args
  -> String
  -> Envelope
  -> [String]
  -> Maybe (ExitCode -> BL.ByteString -> IO ())
  -> IO ()
doProc a numstring envi (exe:args) action = do
  (_, h, _, processhandle) <-
    createProcess (proc exe args) {std_out = out, std_err = Inherit}
  sout <- mapM BL.hGetContents h
  -- force the complete (lazily read) output before waiting for the process
  exitcode <- maybe 0 BL.length sout `seq` waitForProcess processhandle
  printparam' (numstring ++ " call returned") $ show exitcode
  case (action, sout) of
    (Just act, Just output) -> act exitcode output >> acke envi a
    _ ->
      case exitcode of
        ExitSuccess -> acke envi a
        ExitFailure _ -> reje envi a
  where
    -- stdout is only captured when there is an action to consume it
    out =
      if isJust action
        then CreatePipe
        else Inherit
doProc _ _ _ _ _ = return ()

-- | ack
--
-- Acknowledges the message only if acknowledging is enabled in the options.
acke :: Envelope -> Args -> IO ()
acke envi a = when (ack a) $ ackEnv envi

-- | reject
--
-- Rejects the message only if acknowledging is enabled in the options.
reje :: Envelope -> Args -> IO ()
reje envi a = when (ack a) $ rejectEnv envi (requeuenack a)