-----------------------------------------------------------------------------
-- |
-- Module      :  Network.XMPP.Concurrent
-- Copyright   :  (c) pierre, 2007
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  k.pierre.k@gmail.com
-- Stability   :  experimental
-- Portability :  portable
--
-- Concurrent actions over single IO channel
--
-----------------------------------------------------------------------------
module Network.XMPP.Concurrent
  ( Thread
  , XmppThreadT
  , runThreaded
  , readChanS
  , writeChanS
  , withNewThread
  , loop
  , waitFor
  ) where

import Network.XMPP.Stream
import Network.XMPP.Stanza
import Network.XMPP.Types

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad.Trans (liftIO)
import Control.Monad
import Control.Monad.State
import Control.Monad.Reader
import Data.Maybe

import Network.XMPP.XEP.Version
import Network.XMPP.IM.Presence
import Network.XMPP.Utils

import System.IO

-- | The pair of channels a worker thread talks through.
data Thread = Thread
  { inCh  :: TChan Stanza  -- ^ Channel 'readChanS' reads stanzas from.
  , outCh :: TChan Stanza  -- ^ Channel 'writeChanS' writes stanzas to.
  }

-- | A worker action: 'IO' with read access to its 'Thread' channels.
type XmppThreadT a = ReaderT Thread IO a

-- | Runs thread in XmppState monad.
--
-- Two streams are used: input and output. Threads read from the input
-- stream and write to the output stream.
--
-- Creates both channels and forks three threads:
--
-- 1. the supplied action, running over the fresh 'Thread';
-- 2. a writer that, inside 'withNewStream' seeded with the current state,
--    takes stanzas from the output channel and passes them to 'outStanza';
-- 3. 'connPersist' on the handle of the current state.
--
-- The calling thread then keeps parsing stanzas with 'parseM' and writing
-- them to the input channel; that loop has no exit condition.
runThreaded :: XmppThreadT () -> XmppStateT ()
runThreaded a = do
  in'  <- liftIO $ atomically newTChan
  out' <- liftIO $ atomically newTChan
  -- The ThreadIds are deliberately discarded: nothing here manages the
  -- forked threads afterwards.
  _ <- liftIO $ forkIO $ runReaderT a (Thread in' out')
  s <- get
  _ <- liftIO $ forkIO $ loopWrite s out'
  _ <- liftIO $ forkIO $ connPersist (handle s)
  loopRead in'
  where
    -- Reader side: every parsed stanza goes to the input channel.
    loopRead inChan = forever $ do
      st <- parseM
      liftIO $ atomically $ writeTChan inChan st

    -- Writer side: runs in its own stream context initialised with the
    -- state captured above, and sends out whatever appears on the channel.
    loopWrite st0 outChan = do
      _ <- withNewStream $ do
        put st0
        forever $ do
          st <- liftIO $ atomically $ readTChan outChan
          outStanza st
      return ()

-- | Takes the next stanza from this thread's input channel, blocking until
-- one is available.
readChanS :: XmppThreadT Stanza
readChanS = asks inCh >>= liftIO . atomically . readTChan

-- | Puts a stanza on the output channel.
writeChanS :: Stanza -> XmppThreadT ()
writeChanS a = do
  c <- asks outCh
  liftIO $ atomically $ writeTChan c a

-- | Runs specified action in parallel.
--
-- The new thread gets a duplicate ('dupTChan') of the caller's input
-- channel and shares the caller's output channel. Returns the 'ThreadId'
-- of the forked thread.
withNewThread :: XmppThreadT () -> XmppThreadT ThreadId
withNewThread a = do
  in'   <- asks inCh
  out'  <- asks outCh
  newin <- liftIO $ atomically $ dupTChan in'
  liftIO $ forkIO $ runReaderT a (Thread newin out')

-- | Turns action into infinite loop.
loop :: XmppThreadT () -> XmppThreadT ()
loop = forever

-- | Reads stanzas until one satisfies the predicate and returns it.
-- Stanzas that do not match are consumed from this thread's input channel
-- and dropped.
waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza
waitFor f = do
  s <- readChanS
  if f s then return s else waitFor f

-- | Keep-alive loop: every 30 seconds writes a single space to the handle.
-- Never returns.
connPersist :: Handle -> IO ()
connPersist h = forever $ do
  hPutStr h " "
  -- Push the byte out now; without this a buffered handle may hold on to
  -- it. This is a no-op when the handle is unbuffered.
  hFlush h
  -- NOTE(review): prints an empty line to stdout on every cycle; looks
  -- like leftover debug output -- confirm before removing.
  putStrLn ""
  threadDelay 30000000  -- microseconds, i.e. 30 s