{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger
readWorker :: (XmppElement -> IO ())
-> (XmppFailure -> IO ())
-> TMVar Stream
-> IO a
readWorker :: forall a.
(XmppElement -> IO ())
-> (XmppFailure -> IO ()) -> TMVar Stream -> IO a
readWorker XmppElement -> IO ()
onElement XmppFailure -> IO ()
onCClosed TMVar Stream
stateRef = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadBaseControl IO m => m a -> m a
Ex.mask_ forall a b. (a -> b) -> a -> b
$ do
Maybe Stream
s' <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> [Handler m a] -> m a
Ex.catches ( do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
s :: Stream
s@(Stream TMVar StreamState
con) <- forall a. TMVar a -> STM a
readTMVar TMVar Stream
stateRef
ConnectionState
scs <- StreamState -> ConnectionState
streamConnectionState forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TMVar a -> STM a
readTMVar TMVar StreamState
con
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionState -> Bool
stateIsClosed ConnectionState
scs)
forall a. STM a
retry
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just Stream
s
)
[ forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler forall a b. (a -> b) -> a -> b
$ \(Interrupt TMVar ()
t) -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()
t]
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
]
case Maybe Stream
s' of
Maybe Stream
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Stream
s -> do
Maybe XmppElement
res <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> [Handler m a] -> m a
Ex.catches (do
IO ()
allowInterrupt
Either XmppFailure XmppElement
res <- Stream -> IO (Either XmppFailure XmppElement)
pullXmppElement Stream
s
case Either XmppFailure XmppElement
res of
Left XmppFailure
e -> do
[Char] -> [Char] -> IO ()
errorM [Char]
"Pontarius.Xmpp" forall a b. (a -> b) -> a -> b
$ [Char]
"Read error: "
forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show XmppFailure
e
()
_ <- Stream -> IO ()
closeStreams Stream
s
XmppFailure -> IO ()
onCClosed XmppFailure
e
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Right XmppElement
r -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just XmppElement
r
)
[ forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler forall a b. (a -> b) -> a -> b
$ \(Interrupt TMVar ()
t) -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()
t]
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
]
case Maybe XmppElement
res of
Maybe XmppElement
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just XmppElement
sta -> forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ XmppElement -> IO ()
onElement XmppElement
sta
where
allowInterrupt :: IO ()
allowInterrupt :: IO ()
allowInterrupt = forall a. IO a -> IO a
unsafeUnmask forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts [TMVar ()]
ts =
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> (e -> m a) -> m a
Ex.catch (forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [TMVar ()]
ts forall a. TMVar a -> STM a
takeTMVar)
(\(Interrupt TMVar ()
t) -> [TMVar ()] -> IO [()]
handleInterrupts (TMVar ()
tforall a. a -> [a] -> [a]
:[TMVar ()]
ts))
stateIsClosed :: ConnectionState -> Bool
stateIsClosed ConnectionState
Closed = Bool
True
stateIsClosed ConnectionState
Finished = Bool
True
stateIsClosed ConnectionState
_ = Bool
False
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
-> (XmppElement -> IO ())
-> TMVar EventHandlers
-> Stream
-> Maybe Int
-> IO (Either XmppFailure (IO (),
TMVar Stream,
ThreadId))
startThreadsWith :: TMVar (ByteString -> IO (Either XmppFailure ()))
-> (XmppElement -> IO ())
-> TMVar EventHandlers
-> Stream
-> Maybe Int
-> IO (Either XmppFailure (IO (), TMVar Stream, ThreadId))
startThreadsWith TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem XmppElement -> IO ()
stanzaHandler TMVar EventHandlers
eh Stream
con Maybe Int
keepAlive = do
TMVar Stream
conS <- forall a. a -> IO (TMVar a)
newTMVarIO Stream
con
ThreadId
cp <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a. Maybe Int -> TMVar (ByteString -> IO a) -> IO ()
connPersist Maybe Int
keepAlive TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem
let onConClosed :: XmppFailure -> IO ()
onConClosed XmppFailure
failure = do
IO ()
stopWrites
TMVar EventHandlers -> XmppFailure -> IO ()
noCon TMVar EventHandlers
eh XmppFailure
failure
ThreadId
rdw <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a.
(XmppElement -> IO ())
-> (XmppFailure -> IO ()) -> TMVar Stream -> IO a
readWorker XmppElement -> IO ()
stanzaHandler XmppFailure -> IO ()
onConClosed TMVar Stream
conS
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right ( forall {m :: * -> *} {t :: * -> *}.
(MonadIO m, Traversable t) =>
t ThreadId -> m ()
killConnection [ThreadId
rdw, ThreadId
cp]
, TMVar Stream
conS
, ThreadId
rdw
)
where
stopWrites :: IO ()
stopWrites = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ByteString -> IO (Either XmppFailure ())
_ <- forall a. TMVar a -> STM a
takeTMVar TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (ByteString -> IO (Either XmppFailure ()))
writeSem forall a b. (a -> b) -> a -> b
$ \ByteString
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left XmppFailure
XmppNoStream
killConnection :: t ThreadId -> m ()
killConnection t ThreadId
threads = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
[Char] -> [Char] -> IO ()
debugM [Char]
"Pontarius.Xmpp" [Char]
"killing connection"
IO ()
stopWrites
[Char] -> [Char] -> IO ()
debugM [Char]
"Pontarius.Xmpp" [Char]
"killing threads"
t ()
_ <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM t ThreadId
threads ThreadId -> IO ()
killThread
forall (m :: * -> *) a. Monad m => a -> m a
return ()
noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
noCon TMVar EventHandlers
h XmppFailure
e = do
EventHandlers
hands <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
readTMVar TMVar EventHandlers
h
ThreadId
_ <- IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ EventHandlers -> XmppFailure -> IO ()
connectionClosedHandler EventHandlers
hands XmppFailure
e
forall (m :: * -> *) a. Monad m => a -> m a
return ()
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist :: forall a. Maybe Int -> TMVar (ByteString -> IO a) -> IO ()
connPersist (Just Int
delay) TMVar (ByteString -> IO a)
sem = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
ByteString -> IO a
pushBS <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
takeTMVar TMVar (ByteString -> IO a)
sem
a
_ <- ByteString -> IO a
pushBS ByteString
" "
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM ()
putTMVar TMVar (ByteString -> IO a)
sem ByteString -> IO a
pushBS
Int -> IO ()
threadDelay (Int
delayforall a. Num a => a -> a -> a
*Int
1000000)
connPersist Maybe Int
Nothing TMVar (ByteString -> IO a)
_ = forall (m :: * -> *) a. Monad m => a -> m a
return ()