-- | Threads.hs
-- A module which runs worker threads that download and parse pages and
-- then send notifications about new posts via XMPP.

module Threads (
    runLoop,
    q
) where

import Config
import DB
import DBState
import HTTP
import Parsers

import Happstack.State
import Control.Concurrent hiding (Chan)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import qualified Control.Exception as E
import System.IO.Unsafe
import Network
import Network.XMPP
import Data.List
import Text.Regex.Posix

-- | Allocate one slot ('MVar') per configured worker thread (@cthreads cf@)
-- and enter the polling loop. The loop never returns.
runLoop :: TCPConnection -> Config -> Chans -> IO ()
runLoop c cf chans = do
    mvars <- mapM (const newEmptyMVar) [1..cthreads cf]
    runLoop' c mvars chans

-- | Global semaphore with a single unit, used by 'sendToXMPP' so that only
-- one thread at a time talks over the XMPP connection.
--
-- The NOINLINE pragma is required: without it GHC is free to inline the
-- 'unsafePerformIO' call at every use site, which would create several
-- independent semaphores and silently break the mutual exclusion.
{-# NOINLINE q #-}
q :: QSem
q = unsafePerformIO (newQSem 1)

-- | One polling cycle, repeated forever: read the list of URLs from the DB,
-- start a worker for every URL (bounded by the number of slots in the second
-- argument), sleep for ten seconds and start over.
--
-- A full 'MVar' in the slot list means "a worker is currently using this
-- slot"; an empty one means the slot is free.
runLoop' :: TCPConnection -> [MVar Bool] -> Chans -> IO ()
runLoop' c mvars chans = do
    putStrLn "---"
    -- get pages for downloading and parsing
    db <- query GetDB
    let urls = getList db
    -- run threads
    mapM_ runThread urls
    putStrLn "---"
    -- big timeout before running new cycle
    threadDelay 10000000
    runLoop' c mvars chans
  where
    -- Wait until a slot is free, claim it and fork a worker for the URL.
    -- Retries every 100 ms while all slots are busy.
    runThread url = do
        threadDelay 100000 -- small timeout for scatter threads spawn
        mslot <- claimSlot mvars
        case mslot of
            -- The slot is released with 'finally' so that an exception in
            -- the worker (network, parser, DB, XMPP) cannot leak it; a
            -- leaked slot would permanently reduce the worker pool and
            -- eventually make this function spin forever.
            Just slot -> forkIO $ doThreadUpdates url `E.finally` takeMVar slot
            Nothing   -> runThread url

    -- Atomically claim the first free slot, if any. 'tryPutMVar' both tests
    -- and fills the slot in one step, so there is no window between the
    -- emptiness check and the put.
    claimSlot [] = return Nothing
    claimSlot (slot:rest) = do
        claimed <- tryPutMVar slot True
        if claimed then return (Just slot) else claimSlot rest

    -- Download one page, parse it and notify subscribers about new posts.
    doThreadUpdates url@(URL u) =
        -- try to find parser for url in DB
        case find (\(r, _) -> u =~ r) chans of
            Nothing -> putStrLn $ "ERROR: can't find chan for "++u
            Just (_, chan) -> do
                -- download page
                putStrLn $ "INFO: fetching "++u
                resp <- downloadPage chan url
                -- matching response
                case resp of
                    Left SomeError ->
                        putStrLn $ "WARNING: some error while downloading "++u
                    Left PageDeleted -> do
                        update $ DelThreadAll' url
                        putStrLn $ "WARNING: 404 at "++u++", deleting from DB"
                    Right body -> do
                        putStrLn $ "INFO: got "++u++", parsing"
                        db <- query GetDB
                        let (lastPost, jids) = getThread url db
                        result <- parsePage chan lastPost body
                        case result of
                            -- only act when the parser reports a post newer
                            -- than the one recorded in the DB
                            Just (lastPost', msg) | lastPost' > lastPost -> do
                                update $ UpdateThread' url lastPost'
                                putStrLn $ "INFO: "++u++" has new posts"
                                sendToXMPP c url jids msg
                            _ -> putStrLn $ "INFO: "++u++" hasn't new posts"

-- | Send info about an update in a thread via XMPP to every given JID.
--
-- The whole send is guarded by the global semaphore 'q'. 'bracket_' makes
-- sure the semaphore is signalled again even if the DB query or the XMPP
-- session throws; otherwise every later call would block forever.
sendToXMPP :: TCPConnection -> URL -> [JID] -> String -> IO ()
sendToXMPP c url@(URL u) jids msg =
    E.bracket_ (waitQSem q) (signalQSem q) $ do
        -- send msgs
        db <- query GetDB
        runXMPP c $ mapM_ (processJID db) jids
  where
    -- send message with description and wait a little
    processJID db jid@(JID j) = do
        let descr = getDescription url jid db
        sendMessage j ("*"++descr++"* \n"++u++"\n"++msg)
        liftIO $ threadDelay 200000