{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent.Monad where

import           Control.Applicative ((<$>))
import           Control.Concurrent
import           Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import           Control.Monad.State
import           Network.Xmpp.Concurrent.Types
import           Network.Xmpp.Stream
import           Network.Xmpp.Types

-- TODO: Wait for presence error?

-- | Run an XmppConMonad action in isolation. Reader and writer workers will be
-- temporarily stopped and resumed with the new session details once the action
-- returns. The action will run in the calling thread. Any uncaught exceptions
-- will be interpreted as connection failure.
-- withConnection :: XmppConMonad a -> Context -> IO (Either StreamError a)
withConnection :: (Stream -> IO (b, Stream))
               -> Session
               -> IO (Either XmppFailure b)
withConnection :: (Stream -> IO (b, Stream)) -> Session -> IO (Either XmppFailure b)
withConnection Stream -> IO (b, Stream)
a Session
session =  do
    TMVar ()
wait <- IO (TMVar ())
forall a. IO (TMVar a)
newEmptyTMVarIO
    IO (Either XmppFailure b) -> IO (Either XmppFailure b)
forall (m :: * -> *) a. MonadBaseControl IO m => m a -> m a
Ex.mask_ (IO (Either XmppFailure b) -> IO (Either XmppFailure b))
-> IO (Either XmppFailure b) -> IO (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ do
        -- Suspends the reader until the lock (wait) is released (set to `()').
        ThreadId -> Interrupt -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (Session -> ThreadId
readerThread Session
session) (Interrupt -> IO ()) -> Interrupt -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> Interrupt
Interrupt TMVar ()
wait
        -- We acquire the write and stateRef locks, to make sure that this is
        -- the only thread that can write to the stream and to perform a
        -- withConnection calculation. Afterwards, we release the lock and
        -- fetch an updated state.
        Stream
s <- IO Stream -> (SomeException -> IO Stream) -> IO Stream
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> (e -> m a) -> m a
Ex.catch
            (STM Stream -> IO Stream
forall a. STM a -> IO a
atomically (STM Stream -> IO Stream) -> STM Stream -> IO Stream
forall a b. (a -> b) -> a -> b
$ do
                 ByteString -> IO (Either XmppFailure ())
_ <- TMVar (ByteString -> IO (Either XmppFailure ()))
-> STM (ByteString -> IO (Either XmppFailure ()))
forall a. TMVar a -> STM a
takeTMVar (Session -> TMVar (ByteString -> IO (Either XmppFailure ()))
writeSemaphore Session
session)
                 Stream
s <- TMVar Stream -> STM Stream
forall a. TMVar a -> STM a
takeTMVar (Session -> TMVar Stream
streamRef Session
session)
                 TMVar () -> () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar ()
wait ()
                 Stream -> STM Stream
forall (m :: * -> *) a. Monad m => a -> m a
return Stream
s
            )
            -- If we catch an exception, we have failed to take the MVars above.
            (\SomeException
e -> STM () -> IO ()
forall a. STM a -> IO a
atomically (TMVar () -> () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar ()
wait ()) IO () -> IO Stream -> IO Stream
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
                 SomeException -> IO Stream
forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
Ex.throwIO (SomeException
e :: Ex.SomeException)
            )
        -- Run the XmppMonad action, save the (possibly updated) states, release
        -- the locks, and return the result.
        IO (Either XmppFailure b)
-> [Handler IO (Either XmppFailure b)] -> IO (Either XmppFailure b)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> [Handler m a] -> m a
Ex.catches
            (do
                 (b
res, Stream
s') <- Stream -> IO (b, Stream)
a Stream
s
                 ByteString -> IO (Either XmppFailure ())
wl <- StateT StreamState IO (ByteString -> IO (Either XmppFailure ()))
-> Stream -> IO (ByteString -> IO (Either XmppFailure ()))
forall a. StateT StreamState IO a -> Stream -> IO a
withStream' ((StreamState -> ByteString -> IO (Either XmppFailure ()))
-> StateT StreamState IO (ByteString -> IO (Either XmppFailure ()))
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets ((StreamState -> ByteString -> IO (Either XmppFailure ()))
 -> StateT
      StreamState IO (ByteString -> IO (Either XmppFailure ())))
-> (StreamState -> ByteString -> IO (Either XmppFailure ()))
-> StateT StreamState IO (ByteString -> IO (Either XmppFailure ()))
forall a b. (a -> b) -> a -> b
$ StreamHandle -> ByteString -> IO (Either XmppFailure ())
streamSend (StreamHandle -> ByteString -> IO (Either XmppFailure ()))
-> (StreamState -> StreamHandle)
-> StreamState
-> ByteString
-> IO (Either XmppFailure ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamState -> StreamHandle
streamHandle) Stream
s'
                 STM (Either XmppFailure b) -> IO (Either XmppFailure b)
forall a. STM a -> IO a
atomically (STM (Either XmppFailure b) -> IO (Either XmppFailure b))
-> STM (Either XmppFailure b) -> IO (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ do
                     TMVar (ByteString -> IO (Either XmppFailure ()))
-> (ByteString -> IO (Either XmppFailure ())) -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (Session -> TMVar (ByteString -> IO (Either XmppFailure ()))
writeSemaphore Session
session) ByteString -> IO (Either XmppFailure ())
wl
                     TMVar Stream -> Stream -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar (Session -> TMVar Stream
streamRef Session
session) Stream
s'
                     Either XmppFailure b -> STM (Either XmppFailure b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either XmppFailure b -> STM (Either XmppFailure b))
-> Either XmppFailure b -> STM (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ b -> Either XmppFailure b
forall a b. b -> Either a b
Right b
res
            ) -- TODO: DO we have to replace the MVars in case of ane exception?
            -- We treat all Exceptions as fatal. If we catch a StreamError, we
            -- return it. Otherwise, we throw an exception.
            [ (XmppFailure -> IO (Either XmppFailure b))
-> Handler IO (Either XmppFailure b)
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler ((XmppFailure -> IO (Either XmppFailure b))
 -> Handler IO (Either XmppFailure b))
-> (XmppFailure -> IO (Either XmppFailure b))
-> Handler IO (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ \XmppFailure
e -> Either XmppFailure b -> IO (Either XmppFailure b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either XmppFailure b -> IO (Either XmppFailure b))
-> Either XmppFailure b -> IO (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ XmppFailure -> Either XmppFailure b
forall a b. a -> Either a b
Left (XmppFailure
e :: XmppFailure)
            , (SomeException -> IO (Either XmppFailure b))
-> Handler IO (Either XmppFailure b)
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Ex.Handler ((SomeException -> IO (Either XmppFailure b))
 -> Handler IO (Either XmppFailure b))
-> (SomeException -> IO (Either XmppFailure b))
-> Handler IO (Either XmppFailure b)
forall a b. (a -> b) -> a -> b
$ \SomeException
e -> Stream -> IO (Either XmppFailure ())
killStream Stream
s
                  IO (Either XmppFailure ())
-> IO (Either XmppFailure b) -> IO (Either XmppFailure b)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> IO (Either XmppFailure b)
forall (m :: * -> *) e a. (MonadBase IO m, Exception e) => e -> m a
Ex.throwIO (SomeException
e :: Ex.SomeException)
            ]

-- | Executes a function to update the event handlers.
modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers EventHandlers -> EventHandlers
f Session
session = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar EventHandlers -> (EventHandlers -> EventHandlers) -> STM ()
forall a. TMVar a -> (a -> a) -> STM ()
modifyTMVar_ (Session -> TMVar EventHandlers
eventHandlers Session
session) EventHandlers -> EventHandlers
f
  where
    -- Borrowing modifyTVar from
    -- http://hackage.haskell.org/packages/archive/stm/2.4/doc/html/src/Control-Concurrent-STM-TVar.html
    -- as it's not available in GHC 7.0.
    modifyTMVar_ :: TMVar a -> (a -> a) -> STM ()
    modifyTMVar_ :: TMVar a -> (a -> a) -> STM ()
modifyTMVar_ TMVar a
var a -> a
g = do
      a
x <- TMVar a -> STM a
forall a. TMVar a -> STM a
takeTMVar TMVar a
var
      TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
var (a -> a
g a
x)

-- | Changes the handler to be executed when the server connection is closed. To
-- avoid race conditions the initial value should be set in the configuration
-- when creating the session
setConnectionClosedHandler :: (XmppFailure -> Session -> IO ()) -> Session -> IO ()
setConnectionClosedHandler :: (XmppFailure -> Session -> IO ()) -> Session -> IO ()
setConnectionClosedHandler XmppFailure -> Session -> IO ()
eh Session
session = do
    (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers (\EventHandlers
s -> EventHandlers
s{connectionClosedHandler :: XmppFailure -> IO ()
connectionClosedHandler =
                                 \XmppFailure
e -> XmppFailure -> Session -> IO ()
eh XmppFailure
e Session
session}) Session
session

runConnectionClosedHandler :: Session -> XmppFailure -> IO ()
runConnectionClosedHandler :: Session -> XmppFailure -> IO ()
runConnectionClosedHandler Session
session XmppFailure
e = do
    XmppFailure -> IO ()
h <- EventHandlers -> XmppFailure -> IO ()
connectionClosedHandler (EventHandlers -> XmppFailure -> IO ())
-> IO EventHandlers -> IO (XmppFailure -> IO ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM EventHandlers -> IO EventHandlers
forall a. STM a -> IO a
atomically (TMVar EventHandlers -> STM EventHandlers
forall a. TMVar a -> STM a
readTMVar
                                                  (TMVar EventHandlers -> STM EventHandlers)
-> TMVar EventHandlers -> STM EventHandlers
forall a b. (a -> b) -> a -> b
$ Session -> TMVar EventHandlers
eventHandlers Session
session)
    XmppFailure -> IO ()
h XmppFailure
e

-- | Run an event handler.
runHandler :: (EventHandlers -> IO a) -> Session -> IO a
runHandler :: (EventHandlers -> IO a) -> Session -> IO a
runHandler EventHandlers -> IO a
h Session
session = EventHandlers -> IO a
h (EventHandlers -> IO a) -> IO EventHandlers -> IO a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM EventHandlers -> IO EventHandlers
forall a. STM a -> IO a
atomically (TMVar EventHandlers -> STM EventHandlers
forall a. TMVar a -> STM a
readTMVar (TMVar EventHandlers -> STM EventHandlers)
-> TMVar EventHandlers -> STM EventHandlers
forall a b. (a -> b) -> a -> b
$ Session -> TMVar EventHandlers
eventHandlers Session
session)


-- | End the current XMPP session. Kills the associated threads and closes the
-- connection.
--
-- Note that XMPP clients (that have signalled availability) should send
-- \"Unavailable\" presence prior to disconnecting.
--
-- The connectionClosedHandler will not be called (to avoid possibly
-- reestablishing the connection).
endSession :: Session -> IO ()
endSession :: Session -> IO ()
endSession Session
session =  do -- TODO: This has to be idempotent (is it?)
    Session -> IO ()
stopThreads Session
session
    Either XmppFailure ()
_ <- ((Stream -> IO ((), Stream))
 -> Session -> IO (Either XmppFailure ()))
-> Session
-> (Stream -> IO ((), Stream))
-> IO (Either XmppFailure ())
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Stream -> IO ((), Stream))
-> Session -> IO (Either XmppFailure ())
forall b.
(Stream -> IO (b, Stream)) -> Session -> IO (Either XmppFailure b)
withConnection Session
session ((Stream -> IO ((), Stream)) -> IO (Either XmppFailure ()))
-> (Stream -> IO ((), Stream)) -> IO (Either XmppFailure ())
forall a b. (a -> b) -> a -> b
$ \Stream
stream -> do
        ()
_ <- Stream -> IO ()
closeStreams Stream
stream
        ((), Stream) -> IO ((), Stream)
forall (m :: * -> *) a. Monad m => a -> m a
return ((), Stream
stream)
    () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()


-- | Close the connection to the server. Closes the stream (by enforcing a
-- write lock and sending a \</stream:stream\> element), waits (blocks) for
-- three seconds, and then closes the connection.
closeConnection :: Session -> IO ()
closeConnection :: Session -> IO ()
closeConnection Session
session = do
    Either XmppFailure ()
_ <-((Stream -> IO ((), Stream))
 -> Session -> IO (Either XmppFailure ()))
-> Session
-> (Stream -> IO ((), Stream))
-> IO (Either XmppFailure ())
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Stream -> IO ((), Stream))
-> Session -> IO (Either XmppFailure ())
forall b.
(Stream -> IO (b, Stream)) -> Session -> IO (Either XmppFailure b)
withConnection Session
session ((Stream -> IO ((), Stream)) -> IO (Either XmppFailure ()))
-> (Stream -> IO ((), Stream)) -> IO (Either XmppFailure ())
forall a b. (a -> b) -> a -> b
$ \Stream
stream -> do
        ()
_ <- Stream -> IO ()
closeStreams Stream
stream
        ((), Stream) -> IO ((), Stream)
forall (m :: * -> *) a. Monad m => a -> m a
return ((), Stream
stream)
    Session -> XmppFailure -> IO ()
runConnectionClosedHandler Session
session XmppFailure
StreamEndFailure