{-# LANGUAGE OverloadedStrings #-}
module Discord.Internal.Gateway.EventLoop where
import Prelude hiding (log)
import Control.Monad (forever)
import Control.Monad.Random (getRandomR)
import Control.Concurrent.Async (race)
import Control.Concurrent.Chan
import Control.Concurrent (threadDelay, killThread, forkIO)
import Control.Exception.Safe (try, finally, handle, SomeException)
import Data.Monoid ((<>))
import Data.IORef
import Data.Aeson (eitherDecode, encode)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.ByteString.Lazy as BL
import Wuss (runSecureClient)
import Network.WebSockets (ConnectionException(..), Connection,
receiveData, sendTextData)
import Discord.Internal.Types
data GatewayException = GatewayExceptionCouldNotConnect T.Text
| GatewayExceptionEventParseError T.Text T.Text
| GatewayExceptionUnexpected GatewayReceivable T.Text
| GatewayExceptionConnection ConnectionException T.Text
deriving (Show)
data ConnLoopState = ConnStart
| ConnClosed
| ConnReconnect Auth T.Text Integer
deriving Show
connect :: (Connection -> IO a) -> IO a
connect = runSecureClient "gateway.discord.gg" 443 "/?v=6&encoding=json"
type DiscordHandleGateway = (Chan (Either GatewayException Event), Chan GatewaySendable)
connectionLoop :: Auth -> DiscordHandleGateway -> Chan T.Text -> IO ()
connectionLoop auth (events, userSend) log = loop ConnStart 0
where
loop :: ConnLoopState -> Int -> IO ()
loop s retries =
case s of
(ConnClosed) -> pure ()
(ConnStart) -> do
next <- try $ connect $ \conn -> do
msg <- getPayload conn log
case msg of
Right (Hello interval) -> do
sendTextData conn (encode (Identify auth False 50 (0, 1)))
msg2 <- getPayload conn log
case msg2 of
Right (Dispatch r@(Ready _ _ _ _ seshID) _) -> do
writeChan events (Right r)
startEventStream (ConnData conn seshID auth events) interval 0 userSend log
Right m -> do writeChan events (Left (GatewayExceptionUnexpected m
"Response to Identify must be Ready"))
pure ConnClosed
Left ce -> do writeChan events (Left (GatewayExceptionConnection ce
"Response to Identify"))
pure ConnClosed
Right m -> do writeChan log ("gateway - first message must be hello: " <> T.pack (show msg))
writeChan events (Left (GatewayExceptionUnexpected m
"Response to connecting must be hello"))
pure ConnClosed
Left ce -> do writeChan events (Left (GatewayExceptionConnection ce
"Response to connecting"))
pure ConnClosed
case next :: Either SomeException ConnLoopState of
Left _ -> do writeChan events (Left (GatewayExceptionCouldNotConnect
"SomeException in gateway Connection"))
loop ConnClosed 0
Right n -> loop n 0
(ConnReconnect (Auth tok) seshID seqID) -> do
next <- try $ connect $ \conn -> do
sendTextData conn (encode (Resume tok seshID seqID))
eitherPayload <- getPayload conn log
case eitherPayload of
Right (Hello interval) ->
startEventStream (ConnData conn seshID auth events) interval seqID userSend log
Right (InvalidSession retry) -> do
t <- getRandomR (1,5)
threadDelay (t * 10^6)
pure $ if retry
then ConnReconnect (Auth tok) seshID seqID
else ConnStart
Right payload -> do
writeChan events (Left (GatewayExceptionUnexpected payload
"Response to Resume must be Hello/Invalid Session"))
pure ConnClosed
Left e -> do
writeChan events (Left (GatewayExceptionConnection e
"Could not ConnReconnect"))
pure ConnClosed
case next :: Either SomeException ConnLoopState of
Left _ -> do t <- getRandomR (3,20)
threadDelay (t * 10^6)
writeChan log ("gateway - trying to reconnect after " <> T.pack (show retries)
<> " failures")
loop (ConnReconnect (Auth tok) seshID seqID) (retries + 1)
Right n -> loop n 1
getPayloadTimeout :: Connection -> Int -> Chan T.Text -> IO (Either ConnectionException GatewayReceivable)
getPayloadTimeout conn interval log = do
res <- race (threadDelay ((interval * 1000 * 3) `div` 2))
(getPayload conn log)
case res of
Left () -> pure (Right Reconnect)
Right other -> pure other
getPayload :: Connection -> Chan T.Text -> IO (Either ConnectionException GatewayReceivable)
getPayload conn log = try $ do
msg' <- receiveData conn
case eitherDecode msg' of
Right msg -> pure msg
Left err -> do writeChan log ("gateway - received parse Error - " <> T.pack err
<> " while decoding " <> TE.decodeUtf8 (BL.toStrict msg'))
pure (ParseError (T.pack err))
heartbeat :: Chan GatewaySendable -> Int -> IORef Integer -> IO ()
heartbeat send interval seqKey = do
threadDelay (1 * 10^6)
forever $ do
num <- readIORef seqKey
writeChan send (Heartbeat num)
threadDelay (interval * 1000)
setSequence :: IORef Integer -> Integer -> IO ()
setSequence key i = writeIORef key i
data ConnectionData = ConnData { connection :: Connection
, connSessionID :: T.Text
, connAuth :: Auth
, connChan :: Chan (Either GatewayException Event)
}
startEventStream :: ConnectionData -> Int -> Integer -> Chan GatewaySendable -> Chan T.Text -> IO ConnLoopState
startEventStream conndata interval seqN userSend log = do
seqKey <- newIORef seqN
let err :: SomeException -> IO ConnLoopState
err e = do writeChan log ("gateway - eventStream error: " <> T.pack (show e))
ConnReconnect (connAuth conndata) (connSessionID conndata) <$> readIORef seqKey
handle err $ do
gateSends <- newChan
sendsId <- forkIO $ sendableLoop (connection conndata) (Sendables userSend gateSends)
heart <- forkIO $ heartbeat gateSends interval seqKey
finally (eventStream conndata seqKey interval gateSends log)
(killThread heart >> killThread sendsId)
eventStream :: ConnectionData -> IORef Integer -> Int -> Chan GatewaySendable
-> Chan T.Text -> IO ConnLoopState
eventStream (ConnData conn seshID auth eventChan) seqKey interval send log = loop
where
loop :: IO ConnLoopState
loop = do
eitherPayload <- getPayloadTimeout conn interval log
case eitherPayload :: Either ConnectionException GatewayReceivable of
Left (CloseRequest code str) -> case code of
1000 -> ConnReconnect auth seshID <$> readIORef seqKey
1001 -> ConnReconnect auth seshID <$> readIORef seqKey
4000 -> ConnReconnect auth seshID <$> readIORef seqKey
4006 -> pure ConnStart
4007 -> ConnReconnect auth seshID <$> readIORef seqKey
4014 -> ConnReconnect auth seshID <$> readIORef seqKey
_ -> do writeChan eventChan (Left (GatewayExceptionConnection (CloseRequest code str)
"Normal event loop close request"))
pure ConnClosed
Left _ -> ConnReconnect auth seshID <$> readIORef seqKey
Right (Dispatch event sq) -> do setSequence seqKey sq
writeChan eventChan (Right event)
loop
Right (HeartbeatRequest sq) -> do setSequence seqKey sq
writeChan send (Heartbeat sq)
loop
Right (Reconnect) -> ConnReconnect auth seshID <$> readIORef seqKey
Right (InvalidSession retry) -> if retry
then ConnReconnect auth seshID <$> readIORef seqKey
else pure ConnStart
Right (HeartbeatAck) -> loop
Right (Hello e) -> do writeChan eventChan (Left (GatewayExceptionUnexpected (Hello e)
"Normal event loop"))
pure ConnClosed
Right (ParseError e) -> do writeChan eventChan (Left (GatewayExceptionEventParseError e
"Normal event loop"))
pure ConnClosed
data Sendables = Sendables {
userSends :: Chan GatewaySendable
, gatewaySends :: Chan GatewaySendable
}
sendableLoop :: Connection -> Sendables -> IO ()
sendableLoop conn sends = forever $ do
threadDelay (round (10^6 * (62 / 120)))
let e :: Either GatewaySendable GatewaySendable -> GatewaySendable
e = either id id
payload <- e <$> race (readChan (userSends sends)) (readChan (gatewaySends sends))
sendTextData conn (encode payload)