module Network.XMPP.Concurrent
( Thread
, XmppThreadT
, runThreaded
, readChanS
, writeChanS
, withNewThread
, loop
, waitFor
) where
import Network.XMPP.Stream
import Network.XMPP.Stanza
import Network.XMPP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad.Trans (liftIO)
import Control.Monad
import Control.Monad.State
import Control.Monad.Reader
import Data.Maybe
import Network.XMPP.XEP.Version
import Network.XMPP.IM.Presence
import Network.XMPP.Utils
import System.IO
data Thread = Thread { inCh :: TChan Stanza
, outCh :: TChan Stanza
}
type XmppThreadT a = ReaderT Thread IO a
runThreaded :: XmppThreadT () -> XmppStateT ()
runThreaded a = do
in' <- liftIO $ atomically $ newTChan
out' <- liftIO $ atomically $ newTChan
liftIO $ forkIO $ runReaderT a (Thread in' out')
s <- get
liftIO $ forkIO $ loopWrite s out'
liftIO $ forkIO $ connPersist (handle s)
loopRead in'
where
loopRead in' = loop $
do
st <- parseM
liftIO $ atomically $ writeTChan in' st
loopWrite s out' =
do
withNewStream $ do
put s
loop $ do
st <- liftIO $ atomically $ readTChan out'
outStanza st
return ()
loop = sequence_ . repeat
readChanS :: XmppThreadT Stanza
readChanS = do
c <- asks inCh
st <- liftIO $ atomically $ readTChan c
return st
writeChanS :: Stanza -> XmppThreadT ()
writeChanS a = do
c <- asks outCh
st <- liftIO $ atomically $ writeTChan c a
return ()
withNewThread :: XmppThreadT () -> XmppThreadT ThreadId
withNewThread a = do
in' <- asks inCh
out' <- asks outCh
newin <- liftIO $ atomically $ dupTChan in'
liftIO $ forkIO $ runReaderT a (Thread newin out')
loop :: XmppThreadT () -> XmppThreadT ()
loop a = do
a
loop a
waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza
waitFor f = do
s <- readChanS
if (f s) then
return s
else do
waitFor f
connPersist :: Handle -> IO ()
connPersist h = do
hPutStr h " "
putStrLn "<space added>"
threadDelay 30000000
connPersist h