{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Xmpp.Concurrent.Threads where

import           Control.Applicative((<$>))
import           Control.Concurrent
import           Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import           Control.Monad
import           Control.Monad.Except
import           Control.Monad.IO.Class
import qualified Data.ByteString as BS
import           GHC.IO (unsafeUnmask)
import           Network.Xmpp.Concurrent.Types
import           Network.Xmpp.Stream
import           Network.Xmpp.Types
import           System.Log.Logger

-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker :: forall a.
(XmppElement -> IO ())
-> (XmppFailure -> IO ()) -> TMVar Stream -> IO a
readWorker XmppElement -> IO ()
onElement XmppFailure -> IO ()
onCClosed TMVar Stream
stateRef = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadBaseControl IO m => m a -> m a
Ex.mask_ forall a b. (a -> b) -> a -> b
$ do
    Maybe Stream
s' <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> [Handler m a] -> m a
Ex.catches ( do
                        forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
                            s :: Stream
s@(Stream TMVar StreamState
con) <- forall a. TMVar a -> STM a
readTMVar TMVar Stream
stateRef
                            ConnectionState
scs <- StreamState -> ConnectionState
streamConnectionState forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TMVar a -> STM a
readTMVar TMVar StreamState
con
                            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionState -> Bool
stateIsClosed ConnectionState
scs)
                                 forall a. STM a
retry
                            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just Stream
s
                   )
               [ forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler forall a b. (a -> b) -> a -> b
$ \(Interrupt TMVar ()
t) -> do
                     forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()
t]
                     forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing

               ]
    case Maybe Stream
s' of -- Maybe Stream
        Maybe Stream
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just Stream
s -> do -- Stream
            Maybe XmppElement
res <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> [Handler m a] -> m a
Ex.catches (do
                   -- we don't know whether pull will
                   -- necessarily be interruptible
                             IO ()
allowInterrupt
                             Either XmppFailure XmppElement
res <- Stream -> IO (Either XmppFailure XmppElement)
pullXmppElement Stream
s
                             case Either XmppFailure XmppElement
res of
                                 Left XmppFailure
e -> do
                                     [Char] -> [Char] -> IO ()
errorM [Char]
"Pontarius.Xmpp" forall a b. (a -> b) -> a -> b
$ [Char]
"Read error: "
                                         forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show XmppFailure
e
                                     ()
_ <- Stream -> IO ()
closeStreams Stream
s
                                     XmppFailure -> IO ()
onCClosed XmppFailure
e
                                     forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
                                 Right XmppElement
r -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just XmppElement
r
                              )
                       [ forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler forall a b. (a -> b) -> a -> b
$ \(Interrupt TMVar ()
t) -> do
                              forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()
t]
                              forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
                       ]
            case Maybe XmppElement
res of
                Maybe XmppElement
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return () -- Caught an exception, nothing to
                                     -- do. TODO: Can this happen?
                Just XmppElement
sta -> forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ XmppElement -> IO ()
onElement XmppElement
sta
  where
    -- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt :: IO ()
allowInterrupt = forall a. IO a -> IO a
unsafeUnmask forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
    -- While waiting for the first semaphore(s) to flip we might receive another
    -- interrupt. When that happens we add it's semaphore to the list and retry
    -- waiting.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()]
ts =
        forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> (e -> m a) -> m a
Ex.catch (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [TMVar ()]
ts forall a. TMVar a -> STM a
takeTMVar)
            (\(Interrupt TMVar ()
t) -> [TMVar ()] -> IO [()]
handleInterrupts (TMVar ()
tforall a. a -> [a] -> [a]
:[TMVar ()]
ts))
    stateIsClosed :: ConnectionState -> Bool
stateIsClosed ConnectionState
Closed   = Bool
True
    stateIsClosed ConnectionState
Finished = Bool
True
    stateIsClosed ConnectionState
_        = Bool
False

-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith :: TMVar (ByteString -> IO (Either XmppFailure ()))
-> (XmppElement -> IO ())
-> TMVar EventHandlers
-> Stream
-> Maybe Int
-> IO (Either XmppFailure (IO (), TMVar Stream, ThreadId))
startThreadsWith TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem XmppElement -> IO ()
stanzaHandler TMVar EventHandlers
eh Stream
con Maybe Int
keepAlive = do
    -- read' <- withStream' (gets $ streamSend . streamHandle) con
    -- writeSem <- newTMVarIO read'
    TMVar Stream
conS <- forall a. a -> IO (TMVar a)
newTMVarIO Stream
con
    ThreadId
cp <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a. Maybe Int -> TMVar (ByteString -> IO a) -> IO ()
connPersist Maybe Int
keepAlive TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem
    let onConClosed :: XmppFailure -> IO ()
onConClosed XmppFailure
failure = do
            IO ()
stopWrites
            TMVar EventHandlers -> XmppFailure -> IO ()
noCon TMVar EventHandlers
eh XmppFailure
failure
    ThreadId
rdw <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a.
(XmppElement -> IO ())
-> (XmppFailure -> IO ()) -> TMVar Stream -> IO a
readWorker XmppElement -> IO ()
stanzaHandler XmppFailure -> IO ()
onConClosed TMVar Stream
conS
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right ( forall {m :: * -> *} {t :: * -> *}.
(MonadIO m, Traversable t) =>
t ThreadId -> m ()
killConnection [ThreadId
rdw, ThreadId
cp]
                   , TMVar Stream
conS
                   , ThreadId
rdw
                   )
  where
    stopWrites :: IO ()
stopWrites = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        ByteString -> IO (Either XmppFailure ())
_ <- forall a. TMVar a -> STM a
takeTMVar TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem
        forall a. TMVar a -> a -> STM ()
putTMVar TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem forall a b. (a -> b) -> a -> b
$ \ByteString
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left XmppFailure
XmppNoStream
    killConnection :: t ThreadId -> m ()
killConnection t ThreadId
threads = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
        [Char] -> [Char] -> IO ()
debugM [Char]
"Pontarius.Xmpp" [Char]
"killing connection"
        IO ()
stopWrites
        [Char] -> [Char] -> IO ()
debugM [Char]
"Pontarius.Xmpp" [Char]
"killing threads"
        t ()
_ <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM t ThreadId
threads ThreadId -> IO ()
killThread
        forall (m :: * -> *) a. Monad m => a -> m a
return ()
    -- Call the connection closed handlers.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
noCon TMVar EventHandlers
h XmppFailure
e = do
        EventHandlers
hands <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
readTMVar TMVar EventHandlers
h
        ThreadId
_ <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ EventHandlers -> XmppFailure -> IO ()
connectionClosedHandler EventHandlers
hands XmppFailure
e
        forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every <delay> seconds to keep the connection alive.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist :: forall a. Maybe Int -> TMVar (ByteString -> IO a) -> IO ()
connPersist (Just Int
delay) TMVar (ByteString -> IO a)
sem = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
    ByteString -> IO a
pushBS <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
takeTMVar TMVar (ByteString -> IO a)
sem
    a
_ <- ByteString -> IO a
pushBS ByteString
" "
    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM ()
putTMVar TMVar (ByteString -> IO a)
sem ByteString -> IO a
pushBS
    Int -> IO ()
threadDelay (Int
delayforall a. Num a => a -> a -> a
*Int
1000000)
connPersist Maybe Int
Nothing TMVar (ByteString -> IO a)
_ = forall (m :: * -> *) a. Monad m => a -> m a
return ()