module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Error
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger
readWorker :: (XmppElement -> IO ())
-> (XmppFailure -> IO ())
-> TMVar Stream
-> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
s' <- Ex.catches ( do
atomically $ do
s@(Stream con) <- readTMVar stateRef
scs <- streamConnectionState <$> readTMVar con
when (stateIsClosed scs)
retry
return $ Just s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
]
case s' of
Nothing -> return ()
Just s -> do
res <- Ex.catches (do
allowInterrupt
res <- pullXmppElement s
case res of
Left e -> do
errorM "Pontarius.Xmpp" $ "Read error: "
++ show e
_ <- closeStreams s
onCClosed e
return Nothing
Right r -> return $ Just r
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
]
case res of
Nothing -> return ()
Just sta -> void $ onElement sta
where
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
stateIsClosed Closed = True
stateIsClosed Finished = True
stateIsClosed _ = False
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
-> (XmppElement -> IO ())
-> TMVar EventHandlers
-> Stream
-> Maybe Int
-> IO (Either XmppFailure (IO (),
TMVar Stream,
ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
conS <- newTMVarIO con
cp <- forkIO $ connPersist keepAlive writeSem
let onConClosed failure = do
stopWrites
noCon eh failure
rdw <- forkIO $ readWorker stanzaHandler onConClosed conS
return $ Right ( killConnection [rdw, cp]
, conS
, rdw
)
where
stopWrites = atomically $ do
_ <- takeTMVar writeSem
putTMVar writeSem $ \_ -> return $ Left XmppNoStream
killConnection threads = liftIO $ do
debugM "Pontarius.Xmpp" "killing connection"
stopWrites
debugM "Pontarius.Xmpp" "killing threads"
_ <- forM threads killThread
return ()
noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
noCon h e = do
hands <- atomically $ readTMVar h
_ <- forkIO $ connectionClosedHandler hands e
return ()
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist (Just delay) sem = forever $ do
pushBS <- atomically $ takeTMVar sem
_ <- pushBS " "
atomically $ putTMVar sem pushBS
threadDelay (delay*1000000)
connPersist Nothing _ = return ()