module Network.Xmpp.Concurrent.Threads where
import Network.Xmpp.Types
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe
import Data.XML.Types
import Network.Xmpp.Monad
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Concurrent.Types
import Text.XML.Stream.Elements
import GHC.IO (unsafeUnmask)
readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers
-> TVar EventHandlers
-> TMVar XmppConnection
-> IO ()
readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.mask_ . forever $ do
res <- liftIO $ Ex.catches ( do
s <- liftIO . atomically $ do
sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed)
retry
return sr
allowInterrupt
Just . fst <$> runStateT pullStanza s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: StreamError) -> do
hands <- atomically $ readTVar handlers
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
]
liftIO . atomically $ do
case res of
Nothing -> return ()
Just sta -> do
writeTChan stanzaC sta
void $ readTChan stanzaC
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC
return ()
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
where
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return ()
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return ()
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next
threadDelay 250000
startThreads :: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence)
, TChan Stanza
, TVar IQHandlers
, TChan Stanza
, IO ()
, TMVar (BS.ByteString -> IO Bool)
, TMVar XmppConnection
, ThreadId
, TVar EventHandlers
)
startThreads = do
writeLock <- newTMVarIO (\_ -> return False)
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
handlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
return ( messageC
, presenceC
, stanzaC
, handlers
, outC
, killConnection writeLock [lw, rd, cp]
, writeLock
, conS
, rd
, eh)
where
killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock
_ <- forM threads killThread
return ()
zeroEventHandlers :: EventHandlers
zeroEventHandlers = EventHandlers
{ connectionClosedHandler = \_ -> return ()
}
newSession :: IO Session
newSession = do
(mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
return $ Session
mC
pC
sC
workermCh
workerpCh
outC
hand
writeR
rdr
getId
conS
eh
stopThreads'
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do
pushBS <- atomically $ takeTMVar lock
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000