-----------------------------------------------------------------------------
-- |
-- Module      :  Network.XMPP.Concurrent
-- Copyright   :  (c) pierre, 2007
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- 
-- Maintainer  :  k.pierre.k@gmail.com
-- Stability   :  experimental
-- Portability :  portable
--
-- Concurrent actions over single IO channel
--
-----------------------------------------------------------------------------
module Network.XMPP.Concurrent
  ( Thread
  , XmppThreadT
  , runThreaded
  , readChanS
  , writeChanS
  , withNewThread
  , loop
  , waitFor
  ) where

import Network.XMPP.Stream
import Network.XMPP.Stanza
import Network.XMPP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad.Trans (liftIO)
import Control.Monad
import Control.Monad.State 
import Control.Monad.Reader 
import Data.Maybe
import Network.XMPP.XEP.Version 
import Network.XMPP.IM.Presence    
import Network.XMPP.Utils

import System.IO
    
data Thread = Thread { inCh :: TChan Stanza
                     , outCh :: TChan Stanza
                     }

type XmppThreadT a = ReaderT Thread IO a
       
-- Two streams: input and output. Threads read from input stream and write to output stream.
-- | Runs thread in XmppState monad
runThreaded :: XmppThreadT () -> XmppStateT ()
runThreaded a = do
  in' <- liftIO $ atomically $ newTChan
  out' <- liftIO $ atomically $ newTChan
  liftIO $ forkIO $ runReaderT a (Thread in' out')
  s <- get
  liftIO $ forkIO $ loopWrite s out'
  liftIO $ forkIO $ connPersist (handle s)
  loopRead in' 
    where 
      loopRead in' = loop $ 
          do              
            st <- parseM
            liftIO $ atomically $ writeTChan in' st
      loopWrite s out' =
          do
            withNewStream $ do
              put s
              loop $ do
                st <- liftIO $ atomically $ readTChan out'
                outStanza st
            return ()                   
      loop = sequence_ . repeat
       
readChanS :: XmppThreadT Stanza
readChanS = do
  c <- asks inCh 
  st <- liftIO $ atomically $ readTChan c
  return st  

writeChanS :: Stanza -> XmppThreadT ()
writeChanS a = do
  c <- asks outCh
  st <- liftIO $ atomically $ writeTChan c a 
  return ()

-- | Runs specified action in parallel
withNewThread :: XmppThreadT () -> XmppThreadT ThreadId
withNewThread a = do
  in' <- asks inCh
  out' <- asks outCh
  newin <- liftIO $ atomically $ dupTChan in'
  liftIO $ forkIO $ runReaderT a (Thread newin out')

-- | Turns action into infinite loop
loop :: XmppThreadT () -> XmppThreadT ()
loop a = do
  a
  loop a

waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza
waitFor f = do
  s <- readChanS
  if (f s) then 
    return s
    else do
      waitFor f

connPersist :: Handle -> IO ()
connPersist h = do
  hPutStr h " "
  putStrLn "<space added>"
  threadDelay 30000000
  connPersist h