{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Xmpp.Concurrent.Threads where

import           Control.Applicative((<$>))
import           Control.Concurrent
import           Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import           Control.Monad
import           Control.Monad.Except
import           Control.Monad.IO.Class
import qualified Data.ByteString as BS
import           GHC.IO (unsafeUnmask)
import           Network.Xmpp.Concurrent.Types
import           Network.Xmpp.Stream
import           Network.Xmpp.Types
import           System.Log.Logger

-- | Worker that reads elements from the stream and passes each one to
-- @onElement@. Runs forever; every iteration is executed with asynchronous
-- exceptions masked ('Ex.mask_').
--
-- Each iteration:
--
--   1. Waits (via STM 'retry') until the stream stored in @stateRef@ is
--      neither @Closed@ nor @Finished@.
--
--   2. Pulls one element with 'pullXmppElement'. On a read error the error is
--      logged, the stream is closed with 'closeStreams', @onCClosed@ is called
--      with the failure, and nothing is dispatched.
--
--   3. Hands a successfully read element to @onElement@.
--
-- An @Interrupt@ exception caught in step 1 or 2 carries a @TMVar ()@; the
-- worker then blocks until that variable (and those of any further interrupts
-- that arrive meanwhile) can be taken, and starts the next iteration.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
    -- Wait for a usable stream. 'atomically' with 'retry' is interruptible,
    -- so an Interrupt can be delivered here even though we are masked.
    s' <- Ex.catches
              (atomically $ do
                  s@(Stream con) <- readTMVar stateRef
                  scs <- streamConnectionState <$> readTMVar con
                  when (stateIsClosed scs) retry
                  return $ Just s
              )
              [ Ex.Handler $ \(Interrupt t) -> do
                    void $ handleInterrupts [t]
                    return Nothing
              ]
    case s' of -- Maybe Stream
        Nothing -> return ()
        Just s -> do -- Stream
            res <- Ex.catches
                       (do
                           -- We don't know whether pull will necessarily be
                           -- interruptible, so open a window for pending
                           -- asynchronous exceptions first.
                           allowInterrupt
                           pulled <- pullXmppElement s
                           case pulled of
                               Left e -> do
                                   errorM "Pontarius.Xmpp" $ "Read error: "
                                       ++ show e
                                   _ <- closeStreams s
                                   onCClosed e
                                   return Nothing
                               Right r -> return $ Just r
                       )
                       [ Ex.Handler $ \(Interrupt t) -> do
                             void $ handleInterrupts [t]
                             return Nothing
                       ]
            case res of
                -- Either the read failed (already reported above) or we were
                -- interrupted; nothing to dispatch in both cases.
                Nothing -> return ()
                Just sta -> void $ onElement sta
  where
    -- Defining a Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()
    -- While waiting for the first semaphore(s) to flip we might receive
    -- another interrupt. When that happens we add its semaphore to the list
    -- and retry waiting.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts ts =
        Ex.catch (atomically $ forM ts takeTMVar)
            (\(Interrupt t) -> handleInterrupts (t:ts))
    -- Connection states in which reading must not be attempted.
    stateIsClosed Closed   = True
    stateIsClosed Finished = True
    stateIsClosed _        = False

-- | Starts the worker threads for a stream: a keep-alive thread
-- ('connPersist') and a reader thread ('readWorker').
--
-- The given stream is stored in a fresh 'TMVar'. The result is always
-- 'Right' and contains, in order:
--
--   * an action that disables writing (the function in @writeSem@ is replaced
--     by one that returns @Left XmppNoStream@) and then kills both threads,
--
--   * the 'TMVar' holding the stream,
--
--   * the 'ThreadId' of the reader thread.
--
-- When the reader reports a read failure, writing is disabled in the same way
-- and the @connectionClosedHandler@ from @eh@ is run in a new thread.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (), TMVar Stream, ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    -- read' <- withStream' (gets $ streamSend . streamHandle) con
    -- writeSem <- newTMVarIO read'
    conS <- newTMVarIO con
    cp <- forkIO $ connPersist keepAlive writeSem
    let onConClosed failure = do
            stopWrites
            noCon eh failure
    rdw <- forkIO $ readWorker stanzaHandler onConClosed conS
    return $ Right ( killConnection [rdw, cp]
                   , conS
                   , rdw
                   )
  where
    -- Swap the write function for one that always fails with XmppNoStream.
    -- Blocks until the write lock is available.
    stopWrites = atomically $ do
        _ <- takeTMVar writeSem
        putTMVar writeSem $ \_ -> return $ Left XmppNoStream
    killConnection threads = liftIO $ do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        forM_ threads killThread
    -- Call the connection closed handlers.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon h e = do
        hands <- atomically $ readTMVar h
        void . forkIO $ connectionClosedHandler hands e

-- | Sends a blank space every @delay@ seconds to keep the connection alive.
--
-- Each round acquires the write lock (takes the write function out of the
-- 'TMVar'), pushes a single space, and releases the lock again. The lock is
-- released even if the push throws or the thread is killed while holding it;
-- otherwise the 'TMVar' would stay empty and every other writer would block
-- forever.
--
-- With 'Nothing' as delay no keep-alive is sent and the action returns
-- immediately.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist (Just delay) sem = forever $ do
    _ <- Ex.bracket (atomically $ takeTMVar sem)
                    (atomically . putTMVar sem)
                    (\pushBS -> pushBS " ")
    -- threadDelay takes microseconds.
    threadDelay (delay * 1000000)
connPersist Nothing _ = return ()