-- | Defines the 'WebSockets' monad, the functions that run it on top of an
-- iteratee, and the primitives for receiving and sending messages, obtaining
-- a 'Sink', querying the environment and throwing/catching errors.
module Network.WebSockets.Monad
( WebSocketsOptions (..)
, defaultWebSocketsOptions
, WebSockets (..)
, runWebSockets
, runWebSocketsWith
, runWebSocketsHandshake
, runWebSocketsWithHandshake
, runWebSocketsWith'
, receive
, sendBuilder
, send
, Sink
, sendSink
, getSink
, getOptions
, getProtocol
, getVersion
, throwWsError
, catchWsError
, spawnPingThread
) where
import Control.Applicative (Applicative, (<$>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar)
import Control.Exception (Exception (..), SomeException, throw, throwIO)
import Control.Monad (forever)
import Control.Monad.Reader (ReaderT, ask, runReaderT)
import Control.Monad.Trans (MonadIO, lift, liftIO)
import Data.Foldable (forM_)
import Blaze.ByteString.Builder (Builder)
import Data.ByteString (ByteString)
import Data.Enumerator (Enumerator, Iteratee, ($$), (>>==), (=$))
import qualified Blaze.ByteString.Builder as BB
import qualified Data.Attoparsec as A
import qualified Data.ByteString.Lazy as BL
import qualified Data.Attoparsec.Enumerator as AE
import qualified Data.Enumerator as E
import qualified Data.Enumerator.List as EL
import Network.WebSockets.Handshake
import Network.WebSockets.Handshake.Http
import Network.WebSockets.Protocol
import Network.WebSockets.Types
-- | Options that can be supplied when running a 'WebSockets' computation.
data WebSocketsOptions = WebSocketsOptions
    { -- | An 'IO' action carried in the options.
      -- NOTE(review): presumably run whenever a pong is received; the call
      -- site is not in this module, so confirm against the protocol code.
      onPong :: IO ()
    }
-- | The default options: 'onPong' is an action that does nothing.
defaultWebSocketsOptions :: WebSocketsOptions
defaultWebSocketsOptions = WebSocketsOptions { onPong = ignorePong }
  where
    ignorePong = return ()
-- | Read-only environment available inside the 'WebSockets' monad.
--
-- The field order matters: the environment is built positionally in
-- 'runWebSocketsWith''.
data WebSocketsEnv p = WebSocketsEnv
    { envOptions     :: WebSocketsOptions
      -- ^ Options the computation was started with (see 'getOptions').
    , envSendBuilder :: Builder -> IO ()
      -- ^ Action used by 'sendBuilder' to write a raw 'Builder'.
    , envSink        :: Sink p
      -- ^ Sink returned by 'getSink' and used by 'send'.
    , envProtocol    :: p
      -- ^ Protocol value returned by 'getProtocol'.
    }
-- | A handle for sending messages with 'sendSink'. It wraps an 'MVar' that
-- holds the iteratee consuming outgoing messages.
newtype Sink p = Sink
    { unSink :: MVar (Iteratee (Message p) IO ())
    }
-- | The monad in which WebSockets applications run: a reader over
-- 'WebSocketsEnv' layered on an iteratee that consumes decoded messages.
newtype WebSockets p a = WebSockets
    { unWebSockets :: ReaderT (WebSocketsEnv p) (Iteratee (Message p) IO) a
    } deriving (Functor, Applicative, Monad, MonadIO)
-- | Like 'runWebSocketsWithHandshake', but with 'defaultWebSocketsOptions'.
--
-- The 'Bool' is the @isSecure@ flag forwarded to the request decoder.
runWebSocketsHandshake :: Protocol p
                       => Bool
                       -> (Request -> WebSockets p a)
                       -> Iteratee ByteString IO ()
                       -> Iteratee ByteString IO a
runWebSocketsHandshake isSecure goWs outIter =
    runWebSocketsWithHandshake defaultWebSocketsOptions isSecure goWs outIter
-- | Read and decode the HTTP part of the request from the input stream, then
-- hand it to 'runWebSocketsWith' together with the given options.
--
-- The 'Bool' is passed to 'decodeRequest' as its @isSecure@ argument.
runWebSocketsWithHandshake :: Protocol p
                           => WebSocketsOptions
                           -> Bool
                           -> (Request -> WebSockets p a)
                           -> Iteratee ByteString IO ()
                           -> Iteratee ByteString IO a
runWebSocketsWithHandshake opts isSecure goWs outIter =
    receiveIteratee (decodeRequest isSecure) >>= \httpReq ->
        runWebSocketsWith opts httpReq goWs outIter
-- | Like 'runWebSocketsWith', but with 'defaultWebSocketsOptions'.
runWebSockets :: Protocol p
              => RequestHttpPart
              -> (Request -> WebSockets p a)
              -> Iteratee ByteString IO ()
              -> Iteratee ByteString IO a
runWebSockets httpReq goWs outIter =
    runWebSocketsWith defaultWebSocketsOptions httpReq goWs outIter
-- | Perform the handshake for an already-decoded HTTP request part and run
-- the application with the resulting request and protocol.
--
-- Any error raised by the handshake or by the application is rethrown. If
-- that error can be converted (via 'fromException') to the error type
-- accepted by 'responseError', an encoded error response is first written to
-- the output iteratee.
runWebSocketsWith :: forall p a. Protocol p
                  => WebSocketsOptions
                  -> RequestHttpPart
                  -> (Request -> WebSockets p a)
                  -> Iteratee ByteString IO ()
                  -> Iteratee ByteString IO a
runWebSocketsWith opts httpReq goWs outIter =
    negotiate `E.catchError` reportAndRethrow
  where
    -- Handshake, then run the application under the negotiated protocol.
    negotiate = handshake httpReq >>= \(rq, proto) ->
        runWebSocketsWith' opts proto (goWs rq) outIter

    -- Send an error response when the exception has the expected type, then
    -- propagate the original exception unchanged.
    reportAndRethrow e = do
        case fromException e of
            -- @undefined :: p@ only selects the protocol type for
            -- 'responseError'.
            Just he -> liftIO . makeBuilderSender outIter . encodeResponse $
                responseError (undefined :: p) he
            Nothing -> return ()
        E.throwError e
-- | Run a 'WebSockets' computation for an already-chosen protocol, skipping
-- the handshake.
--
-- Incoming bytes are decoded into messages with 'decodeMessages'. Outgoing
-- messages go through a 'Sink' whose iteratee encodes them and feeds the
-- resulting bytes to the output iteratee.
runWebSocketsWith' :: Protocol p
                   => WebSocketsOptions
                   -> p
                   -> WebSockets p a
                   -> Iteratee ByteString IO ()
                   -> Iteratee ByteString IO a
runWebSocketsWith' opts proto ws outIter = do
    sinkVar <- liftIO $ newMVar outgoing
    let env = WebSocketsEnv
            { envOptions     = opts
            , envSendBuilder = makeBuilderSender outIter
            , envSink        = Sink sinkVar
            , envProtocol    = proto
            }
    decodeMessages proto =$ runReaderT (unWebSockets ws) env
  where
    -- Message -> Builder -> ByteString pipeline ending in the output iteratee.
    outgoing = encodeMessages proto =$ builderToByteString =$ outIter
-- | Build a sender that renders a 'Builder' to strict 'ByteString' chunks and
-- runs the given output iteratee on them.
--
-- If running the iteratee yields an error, that error is raised as an
-- exception. It is raised with 'throwIO' lifted through 'liftIO', so it is
-- sequenced as a real effect in any 'MonadIO'. A pure 'throw' would only
-- surface if the monad's bind happened to force the value.
makeBuilderSender :: MonadIO m => Iteratee ByteString m b -> Builder -> m ()
makeBuilderSender outIter x = do
    result <- E.run $ singleton x $$ builderToByteString $$ outIter
    case result of
        Left err -> liftIO $ throwIO err
        Right _  -> return ()
-- | Fork a thread that repeatedly waits the given number of seconds and then
-- sends a ping with payload @"Hi"@ through the connection's 'Sink'.
--
-- The delay comes before each ping, including the first one. The thread id is
-- discarded, so the caller cannot stop the thread.
spawnPingThread :: BinaryProtocol p => Int -> WebSockets p ()
spawnPingThread i = do
    sink <- getSink
    liftIO $ do
        _ <- forkIO . forever $
            threadDelay microseconds >> sendSink sink (ping ("Hi" :: ByteString))
        return ()
  where
    -- 'threadDelay' takes microseconds; @i@ is in seconds.
    microseconds = i * 1000 * 1000
-- | Run an attoparsec parser as an iteratee over the incoming bytes.
--
-- Fails with 'ConnectionClosed' if the stream is already at EOF. Parser
-- errors are re-wrapped by 'wrappingParseError'.
receiveIteratee :: A.Parser a -> Iteratee ByteString IO a
receiveIteratee parser = E.isEOF >>= \atEnd ->
    if atEnd
        then E.throwError ConnectionClosed
        else wrappingParseError (AE.iterParser parser)
-- | Catch errors thrown by the iteratee and rethrow them. An error that
-- 'fromException' converts to the argument type of 'ParseError' is wrapped
-- in 'ParseError'; any other error is rethrown unchanged.
wrappingParseError :: (Monad m) => Iteratee a m b -> Iteratee a m b
wrappingParseError iter = iter `E.catchError` rethrow
  where
    rethrow e = E.throwError $ case fromException e of
        Just parseErr -> toException (ParseError parseErr)
        Nothing       -> e
-- | Take the next message from the input stream, or fail with
-- 'ConnectionClosed' when the stream has ended.
receive :: Protocol p => WebSockets p (Message p)
receive = liftIteratee $ EL.head >>= maybe closed return
  where
    closed = E.throwError ConnectionClosed
-- | Send a raw 'Builder' using the environment's 'envSendBuilder' action.
sendBuilder :: Builder -> WebSockets p ()
sendBuilder builder = WebSockets $ do
    env <- ask
    liftIO $ envSendBuilder env builder
-- | Send a message through the connection's own 'Sink'.
send :: Protocol p => Message p -> WebSockets p ()
send msg = do
    sink <- getSink
    liftIO (sendSink sink msg)
-- | Send a message through a 'Sink'.
--
-- The sink's iteratee is taken out of its 'MVar', fed the single message, and
-- the resulting step is stored back for the next send. Going through
-- 'modifyMVar_' serialises concurrent senders on the same sink.
--
-- If the iteratee ends in an error, that error is raised with 'throwIO',
-- which is the 'IO'-sequenced form of 'throw'. 'modifyMVar_' then puts the
-- previous iteratee back into the 'MVar'.
sendSink :: Sink p -> Message p -> IO ()
sendSink sink msg = modifyMVar_ (unSink sink) $ \iter -> do
    step <- E.runIteratee $ singleton msg $$ iter
    case step of
        E.Error err -> throwIO err
        -- Force the wrapped iteratee so no thunk accumulates in the MVar.
        _           -> return $! E.returnI step
-- | Get the 'Sink' of the current connection from the environment. The sink
-- is a plain value usable with 'sendSink' in 'IO'.
getSink :: Protocol p => WebSockets p (Sink p)
getSink = WebSockets (fmap envSink ask)
-- | An enumerator that feeds exactly one element, as a single chunk, to an
-- iteratee that is still accepting input.
singleton :: Monad m => a -> Enumerator a m b
singleton c = E.checkContinue0 feed
  where
    feed _ k = k (E.Chunks [c]) >>== E.returnI
-- | Enumeratee that renders each 'Builder' to a lazy 'BL.ByteString' and
-- emits its strict chunks.
builderToByteString :: Monad m => E.Enumeratee Builder ByteString m a
builderToByteString = EL.concatMap render
  where
    render builder = BL.toChunks (BB.toLazyByteString builder)
-- | Get the 'WebSocketsOptions' the computation was started with.
getOptions :: WebSockets p WebSocketsOptions
getOptions = WebSockets $ envOptions <$> ask
-- | Get the protocol value stored in the environment.
getProtocol :: WebSockets p p
getProtocol = WebSockets $ do
    env <- ask
    return (envProtocol env)
-- | Get the result of applying 'version' to the current protocol.
getVersion :: Protocol p => WebSockets p String
getVersion = do
    proto <- getProtocol
    return (version proto)
-- | Throw an exception as an iteratee error inside the 'WebSockets' monad.
throwWsError :: (Exception e) => e -> WebSockets p a
throwWsError err = liftIteratee (E.throwError err)
-- | Run a 'WebSockets' action and, if it fails with an iteratee error, run
-- the handler on that error instead. Both run in the same environment.
catchWsError :: WebSockets p a
             -> (SomeException -> WebSockets p a)
             -> WebSockets p a
catchWsError act c = WebSockets $ ask >>= \env ->
    lift $ E.catchError (unwrap env act) (unwrap env . c)
  where
    -- Strip the newtype and reader layers, leaving the underlying iteratee.
    unwrap env ws = runReaderT (unWebSockets ws) env
-- | Lift an iteratee over messages into the 'WebSockets' monad.
liftIteratee :: Iteratee (Message p) IO a -> WebSockets p a
liftIteratee iter = WebSockets (lift iter)