module Network.Xmpp.Stream where
import Control.Applicative ((<$>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import qualified Control.Exception as Ex
import qualified Control.Exception.Lifted as ExL
import Control.Monad
import Control.Monad.Error
import Control.Monad.State.Strict
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC8
import Data.Char (isSpace)
import Data.Conduit
import qualified Data.Conduit.Internal as DCI
import qualified Data.Conduit.List as CL
import Data.IP
import Data.List
import Data.Maybe
import Data.Ord
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Void (Void)
import Data.XML.Pickle
import Data.XML.Types
import qualified GHC.IO.Exception as GIE
import Network
import Network.DNS hiding (encode, lookup)
import Network.Xmpp.Marshal
import Network.Xmpp.Types
import System.IO
import System.Log.Logger
import System.Random (randomRIO)
import Text.XML.Stream.Parse as XP
import Network.Xmpp.Utilities
-- | Parse a value through its 'Read' instance. Succeeds only when 'reads'
-- produces exactly one parse that leaves no unconsumed input.
readMaybe_ :: (Read a) => String -> Maybe a
readMaybe_ string
    | [(value, "")] <- reads string = Just value
    | otherwise                     = Nothing
-- | Collapse an optional list: 'Nothing' becomes the empty list, and
-- @Just l@ becomes @l@.
mbl :: Maybe [a] -> [a]
mbl = maybe [] id
-- | Inverse of 'mbl' for non-empty lists: the empty list becomes
-- 'Nothing', anything else is wrapped in 'Just'.
lmb :: [t] -> Maybe [t]
lmb xs
    | null xs   = Nothing
    | otherwise = Just xs
-- | Unpickle an element inside a 'StreamSink'. An unpickle error is logged
-- as a warning and turned into 'XmppOtherFailure'.
streamUnpickleElem :: PU [Node] a
                   -> Element
                   -> StreamSink a
streamUnpickleElem p x = either failed return (unpickleElem p x)
  where
    -- Log the pretty-printed unpickle error, then abort the sink.
    failed err = do
        liftIO $ warningM "Pontarius.Xmpp" $
            "streamUnpickleElem: Unpickle error: " ++ ppUnpickleError err
        throwError $ XmppOtherFailure
-- | A conduit that consumes XML 'Event's, emits nothing downstream ('Void'),
-- runs in 'IO' with 'XmppFailure' errors, and returns a value of type @a@.
type StreamSink a = ConduitM Event Void (ErrorT XmppFailure IO) a
-- | Discard incoming events until the next event is an 'EventBeginElement'
-- (which is left in the stream) or the input is exhausted.
throwOutJunk :: Monad m => ConduitM Event a m ()
throwOutJunk = CL.peek >>= skip
  where
    skip Nothing                        = return ()
    skip (Just (EventBeginElement _ _)) = return ()
    -- Anything else is junk: drop it and look at the next event.
    skip _                              = CL.drop 1 >> throwOutJunk
-- | Skip leading junk and read a single opening tag, returning it as an
-- 'Element' with no children. If no opening tag follows, a warning is
-- logged and the sink fails with 'XmppOtherFailure'.
openElementFromEvents :: StreamSink Element
openElementFromEvents = throwOutJunk >> await >>= open
  where
    open (Just (EventBeginElement name attrs)) = return $ Element name attrs []
    open _ = do
        liftIO $ warningM "Pontarius.Xmpp" "openElementFromEvents: Stream ended."
        throwError XmppOtherFailure
-- | Opens the XMPP stream on the current connection.
--
-- Sends the XML declaration and the opening stream element, then reads the
-- peer's stream header and stream features. On success the features,
-- language tag, stream id and @from@ address are stored in the
-- 'StreamState'. If the header is unacceptable, a stream error is sent, the
-- stream is closed and 'XmppOtherFailure' is returned. Fails with
-- 'XmppOtherFailure' as well when the state carries no 'streamAddress'.
startStream :: StateT StreamState IO (Either XmppFailure ())
startStream = runErrorT $ do
    lift $ lift $ debugM "Pontarius.Xmpp" "Starting stream..."
    st <- lift $ get
    -- The JID sent in the header's second position (bound as @from@ when
    -- the response is destructured below) and expected back as the
    -- response's @to@. NOTE(review): the 'Bool' paired with the configured
    -- JID presumably permits sending it over a 'Plain' connection - confirm.
    let expectedTo = case ( streamConnectionState st
                          , toJid $ streamConfiguration st) of
            (Plain    , (Just (j, True)))  -> Just j
            (Plain    , _                ) -> Nothing
            (Secured  , (Just (j, _   )))  -> Just j
            (Secured  , Nothing          ) -> Nothing
            (Closed   , _                ) -> Nothing
            (Finished , _                ) -> Nothing
    -- Bind the address once so later checks need no partial 'fromJust'.
    address <- case streamAddress st of
        Nothing -> do
            lift $ lift $ errorM "Pontarius.Xmpp" "Server sent no hostname."
            throwError XmppOtherFailure
        Just a -> return a
    let serverJid = Jid Nothing (Nonempty address) Nothing
    ErrorT $ pushXmlDecl
    ErrorT . pushOpenElement . streamNSHack $
        pickleElem xpStream ( "1.0"
                            , expectedTo
                            , Just serverJid
                            , Nothing
                            , preferredLang $ streamConfiguration st
                            )
    response <- ErrorT $ runEventsSink $ streamS expectedTo
    case response of
        -- The stream element was unpickled successfully.
        Right (ver, from, to, sid, lt, features)
            | (Text.unpack ver) /= "1.0" ->
                closeStreamWithError StreamUnsupportedVersion Nothing
                    "Unknown version"
            | isJust from && (from /= Just serverJid) ->
                closeStreamWithError StreamInvalidFrom Nothing
                    "Stream from is invalid"
            | to /= expectedTo ->
                closeStreamWithError StreamUndefinedCondition (Just $ Element "invalid-to" [] [])
                    "Stream to invalid"
            | otherwise -> do
                unless (isJust lt) $ liftIO $ warningM "Pontarius.Xmpp"
                    "Stream has no language tag"
                modify (\s -> s{ streamFeatures = features
                               , streamLang     = lt
                               , streamId       = sid
                               , streamFrom     = from
                               } )
                return ()
        -- Unpickling failed; inspect the raw element to pick a stream error.
        Left (Element name attrs _children)
            | (nameLocalName name /= "stream") ->
                closeStreamWithError StreamInvalidXml Nothing
                    "Root element is not stream"
            | (nameNamespace name /= Just "http://etherx.jabber.org/streams") ->
                closeStreamWithError StreamInvalidNamespace Nothing
                    "Wrong root element name space"
            | maybe False (/= "stream") (namePrefix name) ->
                closeStreamWithError StreamBadNamespacePrefix Nothing
                    "Root name prefix set and not stream"
            | otherwise -> ErrorT $ checkchildren (flattenAttrs attrs)
  where
    -- Append the default @jabber:client@ namespace declaration to the
    -- opening stream element.
    streamNSHack e = e{elementAttributes = elementAttributes e
                                           ++ [( "xmlns"
                                               , [ContentText "jabber:client"])]}
    -- Send a stream error, close the stream, log the message and fail with
    -- 'XmppOtherFailure'. The results of the push and the close are ignored.
    closeStreamWithError :: StreamErrorCondition -> Maybe Element -> String
                         -> ErrorT XmppFailure (StateT StreamState IO) ()
    closeStreamWithError sec el msg = do
        void . lift . pushElement . pickleElem xpStreamError
            $ StreamErrorInfo sec Nothing el
        void . lift $ closeStreams'
        liftIO $ errorM "Pontarius.Xmpp" $ "closeStreamWithError: " ++ msg
        throwError XmppOtherFailure
    -- Decide which stream error matches a header whose element name was
    -- acceptable but which still failed to unpickle.
    checkchildren children =
        let to'  = lookup "to"      children
            ver' = lookup "version" children
            xl   = lookup xmlLang   children
          in case () of () | Just Nothing == fmap jidFromText to' ->
                               runErrorT $ closeStreamWithError
                                   StreamBadNamespacePrefix Nothing
                                   "stream to not a valid JID"
                           | Nothing == ver' ->
                               runErrorT $ closeStreamWithError
                                   StreamUnsupportedVersion Nothing
                                   "stream no version"
                           | Just (Nothing :: Maybe LangTag) == (safeRead <$> xl) ->
                               runErrorT $ closeStreamWithError
                                   StreamInvalidXml Nothing
                                   "stream no language tag"
                           | otherwise ->
                               runErrorT $ closeStreamWithError
                                   StreamBadFormat Nothing
                                   ""
    -- Read a value from 'Text', taking the first parse and ignoring any
    -- remaining input.
    safeRead x = case reads $ Text.unpack x of
        [] -> Nothing
        ((y,_):_) -> Just y
-- | Flatten every attribute's content list into one 'Text' value.
-- 'ContentText' pieces are concatenated; any other content contributes
-- the empty string.
flattenAttrs :: [(Name, [Content])] -> [(Name, Text.Text)]
flattenAttrs attrs =
    [ (name, Text.concat [contentText c | c <- cont])
    | (name, cont) <- attrs
    ]
  where
    contentText (ContentText t) = t
    contentText _               = ""
-- | Restart the stream: build a fresh buffered event source on top of the
-- current stream handle, store it in the state, then run 'startStream'.
restartStream :: StateT StreamState IO (Either XmppFailure ())
restartStream = do
    liftIO $ debugM "Pontarius.Xmpp" "Restarting stream..."
    handle' <- gets streamHandle
    freshSource <- liftIO $
        bufferSrc (sourceStreamHandle handle' $= XP.parseBytes def)
    modify $ \s -> s { streamEventSource = freshSource }
    startStream
-- | Build a byte source from a 'StreamHandle'.
--
-- Repeatedly requests up to 4096 bytes via 'streamReceive' and yields each
-- non-empty chunk. An empty chunk ends the source; a receive error is
-- rethrown with 'throwError'.
sourceStreamHandle :: (MonadIO m, MonadError XmppFailure m)
                   => StreamHandle -> ConduitM i ByteString m ()
sourceStreamHandle s = loopRead $ streamReceive s
  where
    loopRead rd = do
        bs <- liftIO (rd 4096) >>= either throwError return
        unless (BS.null bs) $ do
            -- A chunk boundary may fall inside a multi-byte UTF-8 sequence,
            -- so the raw bytes are rendered with 'BSC8.unpack' rather than
            -- 'Text.decodeUtf8', which throws on invalid or truncated input.
            liftIO $ debugM "Pontarius.Xmpp" $ "in: " ++ BSC8.unpack bs
            yield bs
            loopRead rd
-- | Wrap a source so that the returned conduit can be run more than once,
-- each run continuing where the previous one stopped.
--
-- The remaining source is kept in a 'TMVar' as a resumable source. Every
-- step takes it out, pulls a single element with @$$++ await@ and puts the
-- rest back. If pulling fails (error result or exception), the stored
-- source is replaced by one built from 'zeroSource'.
bufferSrc :: Source (ErrorT XmppFailure IO) o
          -> IO (ConduitM i o (ErrorT XmppFailure IO) ())
bufferSrc src = do
    ref <- newTMVarIO $ DCI.ResumableSource src (return ())
    let go = do
            dt <- liftIO $ Ex.bracketOnError
                      (atomically $ takeTMVar ref)
                      -- On an exception, leave a source that only fails
                      -- rather than an empty TMVar.
                      (\_ -> atomically . putTMVar ref $ zeroResumableSource)
                      (\s -> do
                            res <- runErrorT (s $$++ await)
                            case res of
                                Left e -> do
                                    atomically $ putTMVar ref zeroResumableSource
                                    return $ Left e
                                Right (s',b) -> do
                                    -- Store the remainder for the next step.
                                    atomically $ putTMVar ref s'
                                    return $ Right b
                      )
            case dt of
                Left e -> throwError e
                -- Underlying source is exhausted.
                Right Nothing -> return ()
                Right (Just d) -> yield d >> go
    return go
  where
    zeroResumableSource = DCI.ResumableSource zeroSource (return ())
-- | Read the peer's stream header followed by its stream features.
--
-- Returns 'Left' with the raw opening element when the header cannot be
-- unpickled; otherwise 'Right' with the header fields and the features.
-- The argument is currently unused.
streamS :: Maybe Jid -> StreamSink (Either Element ( Text
                                                  , Maybe Jid
                                                  , Maybe Jid
                                                  , Maybe Text
                                                  , Maybe LangTag
                                                  , StreamFeatures ))
streamS _expectedTo = xmppStreamHeader >>= either (return . Left) withFeatures
  where
    -- Header was fine: read the features and append them to the result.
    withFeatures (version, from, to, sid, lTag) = do
        features <- xmppStreamFeatures
        return $ Right (version, from, to, sid, lTag, features)

    xmppStreamHeader :: StreamSink (Either Element (Text, Maybe Jid, Maybe Jid, Maybe Text.Text, Maybe LangTag))
    xmppStreamHeader = do
        throwOutJunk
        el <- openElementFromEvents
        return $ either (const $ Left el) Right (unpickleElem xpStream el)

    xmppStreamFeatures :: StreamSink StreamFeatures
    xmppStreamFeatures =
        (elements =$ await) >>= maybe ended (streamUnpickleElem xpStreamFeatures)
      where
        ended = do
            lift $ lift $ errorM "Pontarius.Xmpp" "streamS: Stream ended."
            throwError XmppOtherFailure
-- | Create a stream for the given realm and configuration and start it.
-- Returns the stream on success, or the failure from creating or starting it.
openStream :: HostName -> StreamConfiguration -> IO (Either XmppFailure (Stream))
openStream realm config = runErrorT $ do
    lift $ debugM "Pontarius.Xmpp" "Opening stream..."
    stream' <- createStream realm config
    started <- lift $ withStream startStream stream'
    case started of
        Left failure -> throwError failure
        Right ()     -> return stream'
-- | Run 'closeStreams'' against the state held by the given 'Stream'.
closeStreams :: Stream -> IO ()
closeStreams stream = withStream closeStreams' stream
-- | Send the closing stream tag, schedule the handle to be closed three
-- seconds later on a separate thread, and replace the state with
-- 'xmppNoStream' marked as 'Finished'. The send result and any exception
-- from closing are ignored.
closeStreams' :: StateT StreamState IO ()
closeStreams' = do
    lift $ debugM "Pontarius.Xmpp" "Closing stream"
    send <- gets (streamSend . streamHandle)
    cc <- gets (streamClose . streamHandle)
    lift $ debugM "Pontarius.Xmpp" "Sending closing tag"
    _ <- liftIO $ send "</stream:stream>"
    lift $ debugM "Pontarius.Xmpp" "Waiting for stream to close"
    _ <- liftIO . forkIO $ do
        threadDelay 3000000
        _ <- Ex.try cc :: IO (Either Ex.SomeException ())
        return ()
    put xmppNoStream{ streamConnectionState = Finished }
-- | Log outgoing data at debug level, decoded as UTF-8 and prefixed
-- with @Out: @.
debugOut :: MonadIO m => ByteString -> m ()
debugOut outData = liftIO . debugM "Pontarius.Xmpp" $ "Out: " ++ rendered
  where
    rendered = Text.unpack (Text.decodeUtf8 outData)
-- | Run an 'IO' action, converting an 'IOError' into
-- @Left (XmppIOException e)@ after logging it as a warning.
wrapIOException :: MonadIO m =>
                   IO a -> m (Either XmppFailure a)
wrapIOException action =
    liftIO (tryIOError action) >>= either wrapped (return . Right)
  where
    wrapped e = do
        liftIO $ warningM "Pontarius.Xmpp" $ "wrapIOException: Exception wrapped: " ++ (show e)
        return $ Left $ XmppIOException e
-- | Render an element and send it over the stream handle, logging the
-- outgoing bytes. Before rendering, elements in the @jabber:client@
-- namespace have namespace and prefix removed from their name, recursively
-- through their child elements.
pushElement :: Element -> StateT StreamState IO (Either XmppFailure ())
pushElement x = do
    handle' <- gets streamHandle
    let rendered = renderElement (stripClientNs x)
    debugOut rendered
    lift $ streamSend handle' rendered
  where
    stripClientNs :: Element -> Element
    stripClientNs e
        | nameNamespace n == Just "jabber:client" =
            e { elementName  = Name (nameLocalName n) Nothing Nothing
              , elementNodes = map stripNode (elementNodes e)
              }
        | otherwise = e
      where
        n = elementName e

    -- Only element nodes are rewritten; other nodes pass through unchanged.
    stripNode :: Node -> Node
    stripNode (NodeElement el) = NodeElement (stripClientNs el)
    stripNode nd               = nd
-- | Pickle a stanza and send it with 'pushElement', run through
-- 'withStream''.
pushStanza :: Stanza -> Stream -> IO (Either XmppFailure ())
pushStanza s stream = withStream' (pushElement (pickleElem xpStanza s)) stream
-- | Send the XML declaration over the stream handle.
pushXmlDecl :: StateT StreamState IO (Either XmppFailure ())
pushXmlDecl =
    gets (streamSend . streamHandle) >>= \send ->
        lift $ send "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"
-- | Render only the opening tag of an element and send it over the stream
-- handle, logging the outgoing bytes.
pushOpenElement :: Element -> StateT StreamState IO (Either XmppFailure ())
pushOpenElement e = do
    handle' <- gets streamHandle
    let opening = renderOpenElement e
    debugOut opening
    lift $ streamSend handle' opening
-- | Connect the stream's current event source to the given sink and run
-- it, returning either the failure or the sink's result.
runEventsSink :: Sink Event (ErrorT XmppFailure IO) b
              -> StateT StreamState IO (Either XmppFailure b)
runEventsSink snk =
    gets streamEventSource >>= \src -> lift (runErrorT (src $$ snk))
-- | Read one complete XML element from the stream. A failure while reading
-- is logged and returned; an ended stream is logged and reported as
-- 'XmppOtherFailure'.
pullElement :: StateT StreamState IO (Either XmppFailure Element)
pullElement = runEventsSink (elements =$ await) >>= classify
  where
    classify (Left l) = do
        liftIO . errorM "Pontarius.Xmpp" $
            "Error while retrieving XML element: " ++ show l
        return $ Left l
    classify (Right Nothing) = do
        liftIO $ errorM "Pontarius.Xmpp" "pullElement: Stream ended."
        return . Left $ XmppOtherFailure
    classify (Right (Just r)) = return $ Right r
-- | Pull one element and unpickle it with the given pickler. A failure
-- from 'pullElement' is passed through; an unpickle failure is logged and
-- reported as 'XmppOtherFailure'.
pullUnpickle :: PU [Node] a -> StateT StreamState IO (Either XmppFailure a)
pullUnpickle p = pullElement >>= either (return . Left) decode
  where
    decode elem' = case unpickleElem p elem' of
        Left e -> do
            lift $ errorM "Pontarius.Xmpp" $ "pullUnpickle: Unpickle failed: " ++ (ppUnpickleError e)
            return . Left $ XmppOtherFailure
        Right r -> return $ Right r
-- | Pull one stanza from the stream, run through 'withStream''. A stream
-- error received in place of a stanza is returned as 'StreamErrorFailure'.
pullStanza :: Stream -> IO (Either XmppFailure Stanza)
pullStanza = withStream' $ pullUnpickle xpStreamStanza >>= return . classify
  where
    classify (Left e)          = Left e
    classify (Right (Left e))  = Left $ StreamErrorFailure e
    classify (Right (Right r)) = Right r
-- | Run a sending action. IO errors of type 'GIE.ResourceVanished' or
-- 'GIE.IllegalOperation' are returned as @Left (XmppIOException e)@; any
-- other IO error is rethrown.
catchPush :: IO () -> IO (Either XmppFailure ())
catchPush p = ExL.catch (p >> return (Right ())) handler
  where
    handler e
        | connectionLost (GIE.ioe_type e) = return . Left $ XmppIOException e
        | otherwise                       = ExL.throwIO e

    connectionLost GIE.ResourceVanished = True
    connectionLost GIE.IllegalOperation = True
    connectionLost _                    = False
-- | A handle for a stream that does not exist: sending and receiving both
-- return 'XmppNoStream' (receiving also logs an error), while flushing and
-- closing do nothing.
zeroHandle :: StreamHandle
zeroHandle = StreamHandle
    { streamSend    = const noStream
    , streamReceive = const $ do
          errorM "Pontarius.Xmpp"
                 "xmppNoStream: Stream is closed."
          noStream
    , streamFlush   = return ()
    , streamClose   = return ()
    }
  where
    noStream :: IO (Either XmppFailure a)
    noStream = return (Left XmppNoStream)
-- | The state of a stream that is not connected: 'Closed', backed by
-- 'zeroHandle' and 'zeroSource', with empty features, no addressing
-- information and the default configuration.
xmppNoStream :: StreamState
xmppNoStream = StreamState
    { streamConnectionState = Closed
    , streamConfiguration   = def
    , streamHandle          = zeroHandle
    , streamEventSource     = zeroSource
    , streamFeatures        = StreamFeatures Nothing [] Nothing []
    , streamAddress         = Nothing
    , streamJid             = Nothing
    , streamFrom            = Nothing
    , streamId              = Nothing
    , streamLang            = Nothing
    }
-- | A source that yields nothing: it logs a debug message and fails with
-- 'XmppNoStream'.
zeroSource :: Source (ErrorT XmppFailure IO) a
zeroSource =
    liftIO (debugM "Pontarius.Xmpp" "zeroSource") >> throwError XmppNoStream
-- | Wrap a 'Handle' as a 'StreamHandle'. Sending and receiving go through
-- 'wrapIOException'; flushing and closing act on the handle directly.
handleToStreamHandle :: Handle -> StreamHandle
handleToStreamHandle h = StreamHandle
    { streamSend    = sendBytes
    , streamReceive = receiveBytes
    , streamFlush   = hFlush h
    , streamClose   = hClose h
    }
  where
    sendBytes d    = wrapIOException $ BS.hPut h d
    receiveBytes n = wrapIOException $ BS.hGetSome h n
-- | Connect according to the configuration and build a 'Stream' in the
-- 'Plain' state whose event source parses the bytes read from the handle.
-- Fails with 'TcpConnectionFailure' when no handle could be acquired.
createStream :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Stream)
createStream realm config = connect realm config >>= maybe noHandle withHandle
  where
    noHandle = do
        lift $ debugM "Pontarius.Xmpp" "Did not acquire handle."
        throwError TcpConnectionFailure

    withHandle hand = lift $ do
        debugM "Pontarius.Xmpp" "Acquired handle."
        debugM "Pontarius.Xmpp" "Setting NoBuffering mode on handle."
        eSource <- bufferSrc $
                       (sourceStreamHandle hand $= logConduit)
                       $= XP.parseBytes def
        mkStream StreamState
            { streamConnectionState = Plain
            , streamHandle = hand
            , streamEventSource = eSource
            , streamFeatures = StreamFeatures Nothing [] Nothing []
            , streamAddress = Just $ Text.pack realm
            , streamFrom = Nothing
            , streamId = Nothing
            , streamLang = Nothing
            , streamJid = Nothing
            , streamConfiguration = config
            }

    -- Pass every chunk through unchanged while logging it at debug level.
    logConduit :: MonadIO m => Conduit ByteString m ByteString
    logConduit = CL.mapM $ \d -> do
        liftIO . debugM "Pontarius.Xmpp" $ "In: " ++ (BSC8.unpack d) ++
            "."
        return d
-- | Obtain a 'StreamHandle' as selected by the configuration's connection
-- details: a fixed host and port, an SRV lookup on a given host, an SRV
-- lookup on the realm, or a user-supplied connection action. Handles
-- opened here are switched to 'NoBuffering'. Returns 'Nothing' when no
-- connection could be made.
connect :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO
           (Maybe StreamHandle)
connect realm config =
    case connectionDetails config of
        UseHost host port -> lift $ do
            debugM "Pontarius.Xmpp" "Connecting to configured address."
            connectTcp [(host, port)] >>= wrapHandle
        UseSrv host -> viaSrv host
        UseRealm -> viaSrv realm
        UseConnection mkC -> Just <$> mkC
  where
    viaSrv name = connectSrv (resolvConf config) name >>= liftIO . wrapHandle

    -- Disable buffering on an acquired handle and wrap it.
    wrapHandle :: Maybe Handle -> IO (Maybe StreamHandle)
    wrapHandle Nothing = return Nothing
    wrapHandle (Just h) = do
        hSetBuffering h NoBuffering
        return . Just $ handleToStreamHandle h
-- | Validate the host name, perform an SRV lookup and connect to the
-- resulting targets. Without SRV records, the host itself is resolved and
-- tried on port 5222. Fails with 'XmppIllegalTcpDetails' when the host
-- name does not validate.
connectSrv :: ResolvConf -> String -> ErrorT XmppFailure IO (Maybe Handle)
connectSrv config host = maybe invalid resolve (checkHostName (Text.pack host))
  where
    invalid = do
        lift $ errorM "Pontarius.Xmpp"
            "The hostname could not be validated."
        throwError XmppIllegalTcpDetails

    resolve host' = do
        resolvSeed <- lift $ makeResolvSeed config
        lift $ debugM "Pontarius.Xmpp" "Performing SRV lookup..."
        srvRecords <- srvLookup host' resolvSeed
        lift $ case srvRecords of
            Nothing -> do
                debugM "Pontarius.Xmpp"
                    "No SRV records, using fallback process."
                resolvAndConnectTcp resolvSeed (BSC8.pack $ host)
                                    5222
            Just srvRecords' -> do
                debugM "Pontarius.Xmpp"
                    "SRV records found, performing A/AAAA lookups."
                resolvSrvsAndConnectTcp resolvSeed srvRecords'
-- | Render a 'PortID' as a string (used when logging connection attempts).
showPort :: PortID -> String
#if MIN_VERSION_network(2, 4, 1)
-- With network >= 2.4.1, 'show' is applied to 'PortID' directly.
showPort = show
#else
-- Older network versions: render each constructor by hand.
showPort (PortNumber x) = "PortNumber " ++ show x
showPort (Service x) = "Service " ++ show x
#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__)
-- This case is compiled only when neither MinGW macro is defined.
showPort (UnixSocket x) = "UnixSocket " ++ show x
#endif
#endif
-- | Try the given addresses in order and return a handle for the first
-- one that accepts a connection. An 'Ex.IOException' while connecting
-- moves on to the next address; 'Nothing' means every address failed.
connectTcp :: [(HostName, PortID)] -> IO (Maybe Handle)
connectTcp [] = return Nothing
connectTcp ((address, port):remainder) = do
    result <- Ex.try attempt :: IO (Either Ex.IOException Handle)
    either (const retry) connected result
  where
    attempt = do
        debugM "Pontarius.Xmpp" $ "Connecting to " ++ address ++ " on port " ++
            (showPort port) ++ "."
        connectTo address port

    connected handle = do
        debugM "Pontarius.Xmpp" "Successfully connected to HostName."
        return $ Just handle

    retry = do
        debugM "Pontarius.Xmpp" "Connection to HostName could not be established."
        connectTcp remainder
-- Normalise the result of a DNS lookup to 'Maybe', so that callers do not
-- depend on which version of the dns package is in use.
#if MIN_VERSION_dns(1, 0, 0)
-- dns >= 1.0.0: lookups are handled as 'Either'; the error is discarded.
fixDnsResult :: Either e a -> Maybe a
fixDnsResult = either (const Nothing) Just
#else
-- Older dns: lookups are already handled as 'Maybe'.
fixDnsResult :: Maybe a -> Maybe a
fixDnsResult = id
#endif
-- | Resolve a domain and connect to it on the given port. AAAA records are
-- tried first; A records are looked up only if no IPv6 connection was
-- made. A lookup that raises an 'Ex.IOException' counts as "no addresses".
resolvAndConnectTcp :: ResolvSeed -> Domain -> Int -> IO (Maybe Handle)
resolvAndConnectTcp resolvSeed domain port = do
    aaaaResults <- (Ex.try $ rethrowErrorCall $ withResolver resolvSeed $
                        \resolver -> fmap fixDnsResult $ lookupAAAA resolver domain) :: IO (Either Ex.IOException (Maybe [IPv6]))
    viaIPv6 <- tryAddresses aaaaResults
    case viaIPv6 of
        Just handle' -> return $ Just handle'
        Nothing -> do
            aResults <- (Ex.try $ rethrowErrorCall $ withResolver resolvSeed $
                             \resolver -> fmap fixDnsResult $ lookupA resolver domain) :: IO (Either Ex.IOException (Maybe [IPv4]))
            tryAddresses aResults
  where
    -- Connect to the first reachable address of a lookup result, if any.
    tryAddresses :: Show ip
                 => Either Ex.IOException (Maybe [ip]) -> IO (Maybe Handle)
    tryAddresses (Right (Just ips)) =
        connectTcp [ (show ip, PortNumber $ fromIntegral port) | ip <- ips ]
    tryAddresses _ = return Nothing
-- | Walk the (domain, port) targets in order and return a handle for the
-- first one that 'resolvAndConnectTcp' can connect to.
resolvSrvsAndConnectTcp :: ResolvSeed -> [(Domain, Int)] -> IO (Maybe Handle)
resolvSrvsAndConnectTcp _ [] = return Nothing
resolvSrvsAndConnectTcp resolvSeed ((domain, port):remaining) =
    resolvAndConnectTcp resolvSeed domain port
        >>= maybe (resolvSrvsAndConnectTcp resolvSeed remaining) (return . Just)
-- | Run an action, converting an 'Ex.ErrorCall' raised by it into an
-- 'IOError' built with 'userError', so that callers catching IO
-- exceptions also see it.
rethrowErrorCall :: IO a -> IO a
rethrowErrorCall action = Ex.try action >>= either rethrow return
  where
    rethrow (Ex.ErrorCall e) = Ex.ioError $ userError
                                   $ "rethrowErrorCall: " ++ e
-- | Look up the @_xmpp-client._tcp@ SRV records of a realm.
--
-- Returns 'Nothing' when the lookup gives no result, @Just []@ when the
-- only record has the target @.@, and otherwise the (domain, port) pairs
-- sorted by priority and randomly ordered by weight within each priority.
-- An IO exception during the lookup is returned as 'XmppIOException'.
srvLookup :: Text -> ResolvSeed -> ErrorT XmppFailure IO (Maybe [(Domain, Int)])
srvLookup realm resolvSeed = ErrorT $ do
    result <- Ex.try $ rethrowErrorCall $ withResolver resolvSeed
              $ \resolver -> do
        srvResult <- lookupSRV resolver $ BSC8.pack $ "_xmpp-client._tcp." ++ (Text.unpack realm) ++ "."
        case fixDnsResult srvResult of
            -- A single record whose target is "." yields an empty list.
            Just [(_, _, _, ".")] -> do
                debugM "Pontarius.Xmpp" $ "\".\" SRV result returned."
                return $ Just []
            Just srvResult' -> do
                debugM "Pontarius.Xmpp" $ "SRV result: " ++ (show srvResult')
                orderedSrvResult <- orderSrvResult srvResult'
                return $ Just $ Prelude.map (\(_, _, port, domain) -> (domain, port)) orderedSrvResult
            Nothing -> do
                debugM "Pontarius.Xmpp" "No SRV result returned."
                return Nothing
    case result of
        Right result' -> return $ Right result'
        Left e -> return $ Left $ XmppIOException e
  where
    -- Sort by priority, group equal priorities, move zero-weight records to
    -- the front of each group, then order every group with 'orderSublist'.
    -- Tuples are (priority, weight, port, domain).
    orderSrvResult :: [(Int, Int, Int, Domain)] -> IO [(Int, Int, Int, Domain)]
    orderSrvResult srvResult = do
        let srvResult' = sortBy (comparing (\(priority, _, _, _) -> priority)) srvResult
        let srvResult'' = Data.List.groupBy (\(priority, _, _, _) (priority', _, _, _) -> priority == priority') srvResult' :: [[(Int, Int, Int, Domain)]]
        let srvResult''' = Data.List.map (\sublist -> let (a, b) = partition (\(_, weight, _, _) -> weight == 0) sublist in Data.List.concat [a, b]) srvResult''
        srvResult'''' <- mapM orderSublist srvResult'''
        return $ Data.List.concat srvResult''''
      where
        -- Repeatedly pick one record at random, with probability following
        -- the weights, until the group is empty.
        orderSublist :: [(Int, Int, Int, Domain)] -> IO [(Int, Int, Int, Domain)]
        orderSublist [] = return []
        orderSublist sublist = do
            -- Annotate each record with the running sum of weights.
            let (total, sublist') = Data.List.mapAccumL (\total' (priority, weight, port, domain) -> (total' + weight, (priority, weight, port, domain, total' + weight))) 0 sublist
            randomNumber <- randomRIO (0, total)
            -- Select the first record whose running sum reaches the random
            -- number. The last running sum equals the total, so 'break'
            -- always finds one.
            let (beginning, ((priority, weight, port, domain, _):end)) = Data.List.break (\(_, _, _, _, running) -> randomNumber <= running) sublist'
            let sublist'' = Data.List.map (\(priority', weight', port', domain', _) -> (priority', weight', port', domain')) (Data.List.concat [beginning, end])
            rest <- orderSublist sublist''
            return $ ((priority, weight, port, domain):rest)
-- | Close the connection immediately, without sending a closing tag, and
-- replace the state with 'xmppNoStream' marked as 'Finished'. Returns the
-- outcome of closing the handle.
killStream :: Stream -> IO (Either XmppFailure ())
killStream = withStream $ do
    closeAction <- gets (streamClose . streamHandle)
    outcome <- wrapIOException closeAction
    put xmppNoStream{ streamConnectionState = Finished }
    return outcome
-- | Send an IQ request and wait for the next stanza as its reply.
--
-- Returns @Right (Left e)@ for an IQ error and @Right (Right r)@ for an IQ
-- result. A result whose ID differs from the request ID, or any other kind
-- of stanza, is logged and reported as @Left XmppOtherFailure@. Send and
-- receive failures are returned unchanged.
pushIQ :: Text
       -> Maybe Jid
       -> IQRequestType
       -> Maybe LangTag
       -> Element
       -> Stream
       -> IO (Either XmppFailure (Either IQError IQResult))
pushIQ iqID to tp lang body stream = runErrorT $ do
    ErrorT $ pushStanza
        (IQRequestS $ IQRequest iqID Nothing to lang tp body) stream
    res <- lift $ pullStanza stream
    case res of
        Left e -> throwError e
        Right (IQErrorS e) -> return $ Left e
        Right (IQResultS r) -> do
            -- Report a mismatch through the Either result, like every other
            -- failure of this function, instead of raising an IO exception.
            unless (iqID == iqResultID r) $ do
                liftIO $ errorM "Pontarius.Xmpp" $ "pushIQ: ID mismatch (" ++ (show iqID) ++ " /= " ++ (show $ iqResultID r) ++ ")."
                throwError XmppOtherFailure
            return $ Right r
        _ -> do
            liftIO $ errorM "Pontarius.Xmpp" $ "pushIQ: Unexpected stanza type."
            throwError XmppOtherFailure
-- | Pass every value through unchanged while logging it at debug level.
-- This conduit never returns: after upstream is exhausted it keeps
-- awaiting.
debugConduit :: (Show o, MonadIO m) => ConduitM o o m b
debugConduit = loop
  where
    loop = await >>= maybe loop forward
    forward s = do
        liftIO $ debugM "Pontarius.Xmpp" $ "debugConduit: In: " ++ (show s)
        yield s
        loop
-- | Turn a stream of XML 'Event's into a stream of complete top-level
-- 'Element's.
--
-- Whitespace-only text between elements is skipped. A closing tag at top
-- level fails with 'StreamEndFailure'; any other unexpected event, or an
-- element whose closing tag does not match, fails with 'XmppInvalidXml'.
elements :: MonadError XmppFailure m => Conduit Event m Element
elements = do
        x <- await
        case x of
            Just (EventBeginElement n as) -> do
                                                 goE n as >>= yield
                                                 elements
            -- A closing tag with no open element at this level.
            Just (EventEndElement _) -> throwError StreamEndFailure
            Just (EventContent (ContentText ct)) | Text.all isSpace ct ->
                elements
            Nothing -> return ()
            _ -> throwError $ XmppInvalidXml $ "not an element: " ++ show x
  where
    -- Run @f@ repeatedly, collecting 'Right' results, until it returns a
    -- 'Left'; returns that 'Left' value together with the collected list.
    many' f =
        go id
      where
        go front = do
            x <- f
            case x of
                Left l -> return $ (l, front [])
                Right r -> go (front . (:) r)
    -- Read the children of an element whose opening tag was already
    -- consumed; the event that stops the children must be its closing tag.
    goE n as = do
        (y, ns) <- many' goN
        if y == Just (EventEndElement n)
            then return $ Element n (map (id >< compressContents) as)
                                    (compressNodes ns)
            else throwError . XmppInvalidXml $ "Missing close tag: " ++ show n
    -- Read one child node; 'Left' carries the event (or end of input) that
    -- is not a child node.
    goN = do
        x <- await
        case x of
            Just (EventBeginElement n as) -> (Right . NodeElement) <$> goE n as
            Just (EventInstruction i) -> return $ Right $ NodeInstruction i
            Just (EventContent c) -> return $ Right $ NodeContent c
            Just (EventComment t) -> return $ Right $ NodeComment t
            Just (EventCDATA t) -> return $ Right $ NodeContent $ ContentText t
            _ -> return $ Left x

    -- Merge adjacent text nodes into a single text node.
    compressNodes :: [Node] -> [Node]
    compressNodes [] = []
    compressNodes [x] = [x]
    compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) =
        compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z
    compressNodes (x:xs) = x : compressNodes xs

    -- Concatenate all content pieces into one 'ContentText'; the text of a
    -- 'ContentEntity' is used as it is.
    compressContents :: [Content] -> [Content]
    compressContents cs = [ContentText $ Text.concat (map unwrap cs)]
        where unwrap (ContentText t) = t
              unwrap (ContentEntity t) = t

    -- Apply one function to each component of a pair.
    (><) f g (x, y) = (f x, g y)
-- | Run a stateful action with exclusive access to the stream: the state
-- is taken out of the 'TMVar', the action is run, and the resulting state
-- is put back. If an exception occurs, the state that was taken is put
-- back instead.
withStream :: StateT StreamState IO a -> Stream -> IO a
withStream action (Stream stream) = Ex.bracketOnError acquire restore run
  where
    acquire = atomically $ takeTMVar stream
    restore = atomically . putTMVar stream
    run s = do
        (r, s') <- runStateT action s
        atomically $ putTMVar stream s'
        return r
-- | Run a stateful action on a snapshot of the stream state without
-- taking the 'TMVar'. The state produced by the action is discarded.
withStream' :: StateT StreamState IO a -> Stream -> IO a
withStream' action (Stream stream) =
    atomically (readTMVar stream) >>= \snapshot -> do
        (result, _discarded) <- runStateT action snapshot
        return result
-- | Wrap a 'StreamState' in a new 'TMVar' to form a 'Stream'.
mkStream :: StreamState -> IO Stream
mkStream con = Stream <$> newTMVarIO con
-- | Run an action, returning an 'IOError' raised by it as 'Left' and its
-- result as 'Right'. Other exceptions are not caught.
tryIOError :: IO a -> IO (Either IOError a)
tryIOError = Ex.try