{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TemplateHaskell #-}

-- | The shard logic
module Calamity.Gateway.Shard (
  Shard (..),
  newShard,
) where

import Calamity.Gateway.DispatchEvents
import Calamity.Gateway.Intents
import Calamity.Gateway.Types
import Calamity.Internal.RunIntoIO
import Calamity.Internal.Utils
import Calamity.Metrics.Eff
import Calamity.Types.LogEff
import Calamity.Types.Token
import Control.Concurrent
import Control.Concurrent.Async
import qualified Control.Concurrent.Chan.Unagi as UC
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception
import qualified Control.Exception.Safe as Ex
import Control.Lens
import Control.Monad
import Control.Monad.State.Lazy
import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LBS
import Data.Default.Class (def)
import Data.Functor
import Data.IORef
import Data.Maybe
import qualified Data.Text as T
import DiPolysemy hiding (debug, error, info)
import qualified Network.Connection as NC
import qualified Network.TLS as NT
import qualified Network.TLS.Extra as NT
import Network.WebSockets (
  Connection,
  ConnectionException (..),
  receiveData,
  sendCloseCode,
  sendTextData,
 )
import qualified Network.WebSockets as NW
import qualified Network.WebSockets.Stream as NW
import Polysemy (Sem)
import qualified Polysemy as P
import qualified Polysemy.Async as P
import qualified Polysemy.AtomicState as P
import qualified Polysemy.Error as P
import qualified Polysemy.Resource as P
import PyF
import qualified System.X509 as X509
import TextShow (showt)
import Prelude hiding (error)

-- | Open a TLS connection to @host@ on port 443, perform the websocket
-- handshake for @path@ over it, and run @ma@ with the resulting websocket
-- 'Connection'.
--
-- The underlying network connection is closed via 'NC.connectionClose' when
-- the action finishes or throws.
--
-- Returns the result of @ma@ as produced by 'bindSemToIO'. If anything inside
-- the connection setup or the client action throws an exception that
-- 'Ex.handleAny' catches, the exception is logged at debug level and
-- 'Nothing' is returned instead of propagating it.
runWebsocket ::
  P.Members '[LogEff, P.Final IO, P.Embed IO] r =>
  T.Text ->
  T.Text ->
  (Connection -> P.Sem r a) ->
  P.Sem r (Maybe a)
runWebsocket host path ma = do
  -- Lower the Sem action to plain IO so the websockets client can run it.
  inner <- bindSemToIO ma

  -- The exception handler runs in IO, so the logging action has to be
  -- lowered to IO as well.
  -- TODO: see if this isn't needed
  let logExc e = debug [fmt|runWebsocket raised with {e:s}|]
  logExc' <- bindSemToIO logExc
  let handler e = do
        void $ logExc' e
        pure Nothing

  P.embed . Ex.handleAny handler $ do
    ctx <- NC.initConnectionContext
    certStore <- X509.getSystemCertificateStore
    let hostStr = T.unpack host
        -- Default client TLS parameters, using the default cipher suite and
        -- the system certificate store as the CA store.
        clientParams =
          (NT.defaultParamsClient hostStr "443")
            { NT.clientSupported = def{NT.supportedCiphers = NT.ciphersuite_default}
            , NT.clientShared =
                def
                  { NT.sharedCAStore = certStore
                  }
            }
        tlsSettings = NC.TLSSettings clientParams
        connParams = NC.ConnectionParams hostStr 443 (Just tlsSettings) Nothing

    Ex.bracket
      (NC.connectTo ctx connParams)
      NC.connectionClose
      ( \conn -> do
          -- Adapt the raw connection to a websockets 'NW.Stream': reads pull
          -- chunks from the connection, writes push strict bytes to it, and
          -- a 'Nothing' write is a no-op.
          stream <-
            NW.makeStream
              (Just <$> NC.connectionGetChunk conn)
              (maybe (pure ()) (NC.connectionPut conn . LBS.toStrict))
          NW.runClientWithStream stream hostStr (T.unpack path) NW.defaultConnectionOptions [] inner
      )

-- | Build the initial 'ShardState' for a shard: the given 'Shard' is stored
-- as-is, every optional field starts out as 'Nothing' and the boolean flag
-- starts out as 'False'.
newShardState :: Shard -> ShardState
newShardState shard = ShardState shard Nothing Nothing False Nothing Nothing Nothing

-- | Creates and launches a shard.
--
-- Arguments, in order: the gateway host, this shard's id, the total shard
-- count, the bot token, an optional initial presence, the gateway intents,
-- and the channel that is stored in the 'Shard' for outgoing
-- 'CalamityEvent's.
--
-- A fresh control channel is created; its read end is stored in the 'Shard'
-- and its write end is returned so the caller can send 'ControlMessage's.
-- The shard loop is started with 'P.async', with logging scoped under the
-- @calamity-shard@ segment and tagged with a @shard-id@ attribute.
--
-- Returns the control channel's write end together with the 'Async' handle
-- of the running shard.
newShard ::
  P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r =>
  T.Text ->
  Int ->
  Int ->
  Token ->
  Maybe StatusUpdateData ->
  Intents ->
  UC.InChan CalamityEvent ->
  Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard gateway shardNum count token presence intents evtIn = do
  (cmdIn, cmdOut) <- P.embed UC.newChan
  let shard = Shard shardNum count gateway evtIn cmdOut (rawToken token) presence intents
  stateVar <- P.embed . newIORef $ newShardState shard

  -- The shard's state lives in an IORef that backs the AtomicState effect.
  let runShard = P.runAtomicStateIORef stateVar shardLoop
  let action = push "calamity-shard" . attr "shard-id" shardNum $ runShard

  thread' <- P.async action

  pure (cmdIn, thread')

-- | Send a message to the gateway over the shard's current websocket.
--
-- The connection is read from the shard state. When one is present the
-- message is JSON-encoded, logged at debug level and sent as a text frame;
-- when there is none, a debug message is logged and nothing is sent.
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs data' = do
  wsConn' <- P.atomicGets wsConn
  case wsConn' of
    Just conn -> do
      let encodedData = A.encode data'
      debug [fmt|sending {data':s} encoded to {encodedData:s} to gateway|]
      P.embed . sendTextData conn $ encodedData
    Nothing -> debug "tried to send to closed WS"

-- | Write to a 'TBMQueue', retrying the transaction for as long as
-- 'tryWriteTBMQueue' reports that the write did not go through.
--
-- Returns 'True' once the value has been written, and 'False' when
-- 'tryWriteTBMQueue' returns 'Nothing' (presumably: the queue is closed --
-- confirm against the stm-chans documentation).
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' q v = do
  v' <- tryWriteTBMQueue q v
  case v' of
    -- Not written this time: retry the STM transaction.
    Just False -> retry
    Just True -> pure True
    Nothing -> pure False

-- | Unwrap a 'Maybe', requesting a shard restart when it is empty.
--
-- For @Just a@ the value is returned and the message is ignored. For
-- 'Nothing' the message is logged at error level and 'ShardFlowRestart' is
-- thrown through the 'P.Error' effect.
restartUnless :: P.Members '[LogEff, P.Error ShardFlowControl] r => T.Text -> Maybe a -> P.Sem r a
restartUnless _ (Just a) = pure a
restartUnless msg Nothing = do
  -- 'error' here is the logging function (Prelude's 'error' is hidden).
  error msg
  P.throw ShardFlowRestart

-- | The loop a shard will run on
shardLoop :: ShardC r => Sem r ()
shardLoop :: Sem r ()
shardLoop = do
  Gauge
activeShards <- Text -> [(Text, Text)] -> Sem r Gauge
forall (r :: EffectRow).
Member MetricEff r =>
Text -> [(Text, Text)] -> Sem r Gauge
registerGauge Text
"active_shards" [(Text, Text)]
forall a. Monoid a => a
mempty
  Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Double
1) Gauge
activeShards
  Sem r () -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
outerloop
  Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
subtract Double
1) Gauge
activeShards
  Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Shard shut down"
 where
  controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
  controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
outqueue = IO ()
inner
   where
    q :: OutChan ControlMessage
q = Shard
shard Shard
-> Getting (OutChan ControlMessage) Shard (OutChan ControlMessage)
-> OutChan ControlMessage
forall s a. s -> Getting a s a -> a
^. IsLabel
  "cmdOut"
  (Getting (OutChan ControlMessage) Shard (OutChan ControlMessage))
Getting (OutChan ControlMessage) Shard (OutChan ControlMessage)
#cmdOut
    inner :: IO ()
inner = do
      ControlMessage
v <- OutChan ControlMessage -> IO ControlMessage
forall a. OutChan a -> IO a
UC.readChan OutChan ControlMessage
q
      Bool
r <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
v)
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r IO ()
inner

  handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe T.Text) a)
  handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException SomeException
e = Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (ControlMessage, Maybe Text) a
 -> IO (Either (ControlMessage, Maybe Text) a))
-> Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall a b. (a -> b) -> a -> b
$ case SomeException -> Maybe ConnectionException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
    Just (CloseRequest Word16
code ByteString
_)
      | Word16
code Word16 -> [Word16] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Word16
4004, Word16
4010, Word16
4011, Word16
4012, Word16
4013, Word16
4014] ->
        (ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
ShutDownShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> (Word16 -> Text) -> Word16 -> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> Text
forall a. TextShow a => a -> Text
showt (Word16 -> Maybe Text) -> Word16 -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Word16
code)
    Maybe ConnectionException
e -> (ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
RestartShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text)
-> (Maybe ConnectionException -> Text)
-> Maybe ConnectionException
-> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Text)
-> (Maybe ConnectionException -> String)
-> Maybe ConnectionException
-> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe ConnectionException -> String
forall a. Show a => a -> String
show (Maybe ConnectionException -> Maybe Text)
-> Maybe ConnectionException -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Maybe ConnectionException
e)

  discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
  discordStream :: Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
outqueue = Sem r ()
inner
   where
    inner :: Sem r ()
inner = do
      Either (ControlMessage, Maybe Text) ByteString
msg <- IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Either (ControlMessage, Maybe Text) ByteString)
 -> Sem r (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall a b. (a -> b) -> a -> b
$ IO (Either (ControlMessage, Maybe Text) ByteString)
-> (SomeException
    -> IO (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
Ex.catchAny (ByteString -> Either (ControlMessage, Maybe Text) ByteString
forall a b. b -> Either a b
Right (ByteString -> Either (ControlMessage, Maybe Text) ByteString)
-> IO ByteString
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
ws) SomeException
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException

      case Either (ControlMessage, Maybe Text) ByteString
msg of
        Left (ControlMessage
c, Maybe Text
reason) -> do
          Maybe Text -> (Text -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Text
reason (\Text
r -> Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error [fmt|Shard closed with reason: {r}|])
          IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ()) -> (STM () -> IO ()) -> STM () -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> Sem r ()) -> STM () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
c)
        Right ByteString
msg' -> do
          -- debug [fmt|Got msg: {msg'}|]
          let decoded :: Either String ReceivedDiscordMessage
decoded = ByteString -> Either String ReceivedDiscordMessage
forall a. FromJSON a => ByteString -> Either String a
A.eitherDecode ByteString
msg'
          Bool
r <- case Either String ReceivedDiscordMessage
decoded of
            Right ReceivedDiscordMessage
a ->
              IO Bool -> Sem r Bool
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO Bool -> Sem r Bool)
-> (STM Bool -> IO Bool) -> STM Bool -> Sem r Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> Sem r Bool) -> STM Bool -> Sem r Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ReceivedDiscordMessage -> ShardMsg
Discord ReceivedDiscordMessage
a)
            Left String
e -> do
              Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error [fmt|Failed to decode: {e}|]
              Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
          Bool -> Sem r () -> Sem r ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r Sem r ()
inner
  outerloop :: ShardC r => Sem r ()
  outerloop :: Sem r ()
outerloop = Sem r Bool -> Sem r ()
forall (r :: EffectRow). Sem r Bool -> Sem r ()
whileMFinalIO (Sem r Bool -> Sem r ()) -> Sem r Bool -> Sem r ()
forall a b. (a -> b) -> a -> b
$ do
    Shard
shard :: Shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Getting Shard ShardState Shard -> Shard
forall s a. s -> Getting a s a -> a
^. IsLabel "shardS" (Getting Shard ShardState Shard)
Getting Shard ShardState Shard
#shardS)
    let host :: Text
host = Shard
shard Shard -> Getting Text Shard Text -> Text
forall s a. s -> Getting a s a -> a
^. IsLabel "gateway" (Getting Text Shard Text)
Getting Text Shard Text
#gateway
    let host' :: Text
host' = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
host (Maybe Text -> Text) -> Maybe Text -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Maybe Text
T.stripPrefix Text
"wss://" Text
host
    Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info [fmt|starting up shard {shardID shard} of {shardCount shard}|]

    Maybe ShardFlowControl
innerLoopVal <- Text
-> Text
-> (Connection -> Sem r ShardFlowControl)
-> Sem r (Maybe ShardFlowControl)
forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host' Text
"/?v=9&encoding=json" Connection -> Sem r ShardFlowControl
forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop

    case Maybe ShardFlowControl
innerLoopVal of
      Just ShardFlowControl
ShardFlowShutDown -> do
        Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Shutting down shard"
        Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
      Just ShardFlowControl
ShardFlowRestart -> do
        Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restaring shard"
        Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
      -- we restart normally when we loop

      Maybe ShardFlowControl
Nothing -> do
        -- won't happen unless innerloop starts using a non-deterministic effect or connecting to the ws dies
        Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restarting shard (abnormal reasons?)"
        Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

  innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
  innerloop :: Connection -> Sem r ShardFlowControl
innerloop Connection
ws = do
    Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Entering inner loop of shard"

    Shard
shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Getting Shard ShardState Shard -> Shard
forall s a. s -> Getting a s a -> a
^. IsLabel "shardS" (Getting Shard ShardState Shard)
Getting Shard ShardState Shard
#shardS)
    (ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (IsLabel
  "wsConn"
  (ASetter
     ShardState ShardState (Maybe Connection) (Maybe Connection))
ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
#wsConn ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
-> Connection -> ShardState -> ShardState
forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ Connection
ws)

    Maybe Int
seqNum' <- (ShardState -> Maybe Int) -> Sem r (Maybe Int)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Getting (Maybe Int) ShardState (Maybe Int) -> Maybe Int
forall s a. s -> Getting a s a -> a
^. IsLabel "seqNum" (Getting (Maybe Int) ShardState (Maybe Int))
Getting (Maybe Int) ShardState (Maybe Int)
#seqNum)
    Maybe Text
sessionID' <- (ShardState -> Maybe Text) -> Sem r (Maybe Text)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Getting (Maybe Text) ShardState (Maybe Text) -> Maybe Text
forall s a. s -> Getting a s a -> a
^. IsLabel "sessionID" (Getting (Maybe Text) ShardState (Maybe Text))
Getting (Maybe Text) ShardState (Maybe Text)
#sessionID)

    case (Maybe Int
seqNum', Maybe Text
sessionID') of
      (Just Int
n, Just Text
s) -> do
        Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug [fmt|Resuming shard (sessionID: {s}, seq: {n})|]
        SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
          ( ResumeData -> SentDiscordMessage
Resume
              ResumeData :: Text -> Text -> Int -> ResumeData
ResumeData
                { $sel:token:ResumeData :: Text
token = Shard
shard Shard -> Getting Text Shard Text -> Text
forall s a. s -> Getting a s a -> a
^. IsLabel "token" (Getting Text Shard Text)
Getting Text Shard Text
#token
                , $sel:sessionID:ResumeData :: Text
sessionID = Text
s
                , $sel:seq:ResumeData :: Int
seq = Int
n
                }
          )
      (Maybe Int, Maybe Text)
_noActiveSession -> do
        Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Identifying shard"
        SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
          ( IdentifyData -> SentDiscordMessage
Identify
              IdentifyData :: Text
-> IdentifyProps
-> Bool
-> Int
-> (Int, Int)
-> Maybe StatusUpdateData
-> Intents
-> IdentifyData
IdentifyData
                { $sel:token:IdentifyData :: Text
token = Shard
shard Shard -> Getting Text Shard Text -> Text
forall s a. s -> Getting a s a -> a
^. IsLabel "token" (Getting Text Shard Text)
Getting Text Shard Text
#token
                , $sel:properties:IdentifyData :: IdentifyProps
properties =
                    IdentifyProps :: Text -> Text -> IdentifyProps
IdentifyProps
                      { $sel:browser:IdentifyProps :: Text
browser = Text
"Calamity: https://github.com/simmsb/calamity"
                      , $sel:device:IdentifyProps :: Text
device = Text
"Calamity: https://github.com/simmsb/calamity"
                      }
                , $sel:compress:IdentifyData :: Bool
compress = Bool
False
                , $sel:largeThreshold:IdentifyData :: Int
largeThreshold = Int
250
                , $sel:shard:IdentifyData :: (Int, Int)
shard =
                    ( Shard
shard Shard -> Getting Int Shard Int -> Int
forall s a. s -> Getting a s a -> a
^. IsLabel "shardID" (Getting Int Shard Int)
Getting Int Shard Int
#shardID
                    , Shard
shard Shard -> Getting Int Shard Int -> Int
forall s a. s -> Getting a s a -> a
^. IsLabel "shardCount" (Getting Int Shard Int)
Getting Int Shard Int
#shardCount
                    )
                , $sel:presence:IdentifyData :: Maybe StatusUpdateData
presence = Shard
shard Shard
-> Getting (Maybe StatusUpdateData) Shard (Maybe StatusUpdateData)
-> Maybe StatusUpdateData
forall s a. s -> Getting a s a -> a
^. IsLabel
  "initialStatus"
  (Getting (Maybe StatusUpdateData) Shard (Maybe StatusUpdateData))
Getting (Maybe StatusUpdateData) Shard (Maybe StatusUpdateData)
#initialStatus
                , $sel:intents:IdentifyData :: Intents
intents = Shard
shard Shard -> Getting Intents Shard Intents -> Intents
forall s a. s -> Getting a s a -> a
^. IsLabel "intents" (Getting Intents Shard Intents)
Getting Intents Shard Intents
#intents
                }
          )

    ShardFlowControl
result <-
      Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Resource : r) a -> Sem r a
P.resourceToIOFinal (Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall a b. (a -> b) -> a -> b
$
        Sem (Resource : r) (TBMQueue ShardMsg)
-> (TBMQueue ShardMsg -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> Sem (Resource : r) ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl
forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
P.bracket
          (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg))
-> IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue ShardMsg)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
1)
          (IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> IO ())
-> TBMQueue ShardMsg
-> Sem (Resource : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (TBMQueue ShardMsg -> STM ()) -> TBMQueue ShardMsg -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBMQueue ShardMsg -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue)
          ( \TBMQueue ShardMsg
q -> do
              Text -> Sem (Resource : r) ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"handling events now"
              Async (Maybe ())
_controlThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> (IO () -> Sem (Resource : r) ())
-> IO ()
-> Sem (Resource : r) (Async (Maybe ()))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) (Async (Maybe ())))
-> IO () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
q
              Async (Maybe ())
_discordThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Connection -> TBMQueue ShardMsg -> Sem (Resource : r) ()
forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
q
              Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
P.raise (Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) () -> Sem r ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO (Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) ()
    -> Sem r (Maybe ShardFlowControl))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either ShardFlowControl () -> Maybe ShardFlowControl
forall e a. Either e a -> Maybe e
leftToMaybe (Either ShardFlowControl () -> Maybe ShardFlowControl)
-> Sem r (Either ShardFlowControl ())
-> Sem r (Maybe ShardFlowControl)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) (Sem r (Either ShardFlowControl ())
 -> Sem r (Maybe ShardFlowControl))
-> (Sem (Error ShardFlowControl : r) ()
    -> Sem r (Either ShardFlowControl ()))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r (Maybe ShardFlowControl)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error ShardFlowControl : r) ()
-> Sem r (Either ShardFlowControl ())
forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError (Sem (Error ShardFlowControl : r) ()
 -> Sem (Resource : r) ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall a b. (a -> b) -> a -> b
$ do
                -- only we close the queue
                Maybe ShardMsg
msg <- IO (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Maybe ShardMsg)
 -> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> (STM (Maybe ShardMsg) -> IO (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe ShardMsg) -> IO (Maybe ShardMsg)
forall a. STM a -> IO a
atomically (STM (Maybe ShardMsg)
 -> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> STM (Maybe ShardMsg)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue ShardMsg
q
                ShardMsg -> Sem (Error ShardFlowControl : r) ()
forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg (ShardMsg -> Sem (Error ShardFlowControl : r) ())
-> Sem (Error ShardFlowControl : r) ShardMsg
-> Sem (Error ShardFlowControl : r) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> Maybe ShardMsg -> Sem (Error ShardFlowControl : r) ShardMsg
forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
"shard message stream closed by someone other than the sink" Maybe ShardMsg
msg
          )

    Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Exiting inner loop of shard"

    (ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (IsLabel
  "wsConn"
  (ASetter
     ShardState ShardState (Maybe Connection) (Maybe Connection))
ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
#wsConn ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
-> Maybe Connection -> ShardState -> ShardState
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Maybe Connection
forall a. Maybe a
Nothing)
    Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat
    ShardFlowControl -> Sem r ShardFlowControl
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShardFlowControl
result
  -- React to a single message taken off the shard's queue.
  --
  -- Messages wrapped in 'Discord' came from the gateway; messages wrapped in
  -- 'Control' are instructions for the shard itself. Restarting or shutting
  -- down is signalled by throwing a 'ShardFlowControl' through the 'P.Error'
  -- effect, which the caller is expected to catch.
  handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
  handleMsg (Discord (EvtDispatch sn data')) = do
    -- trace $ "Handling event: ("+||data'||+")"
    -- Remember the latest sequence number; 'sendHeartBeat' reads it back.
    P.atomicModify' (#seqNum ?~ sn)

    -- A Ready event carries the session id, which is stored in the state.
    case data' of
      Ready readyInfo -> P.atomicModify' (#sessionID ?~ (readyInfo ^. #sessionID))
      _NotReady -> pure ()

    -- Forward the event, tagged with this shard's id, onto the event channel.
    shardInfo <- P.atomicGets (^. #shardS)
    let event = Dispatch (shardInfo ^. #shardID) data'
    P.embed (UC.writeChan (shardInfo ^. #evtIn) event)
  handleMsg (Discord HeartBeatReq) = do
    debug "Received heartbeat request"
    sendHeartBeat
  handleMsg (Discord Reconnect) = do
    debug "Being asked to restart by Discord"
    P.throw ShardFlowRestart
  handleMsg (Discord (InvalidSession resumable)) = do
    case resumable of
      True -> info "Received resumable invalid session"
      False -> do
        info "Received non-resumable invalid session, sleeping for 15 seconds then retrying"
        -- Forget the session id and sequence number before restarting.
        P.atomicModify' (#sessionID .~ Nothing)
        P.atomicModify' (#seqNum .~ Nothing)
        -- threadDelay takes microseconds: 15 * 1000 * 1000 is 15 seconds.
        P.embed (threadDelay (15 * 1000 * 1000))
    -- Both cases end with a restart of the shard.
    P.throw ShardFlowRestart
  handleMsg (Discord (Hello interval)) = do
    info [fmt|Received hello, beginning to heartbeat at an interval of {interval}ms|]
    startHeartBeatLoop interval
  handleMsg (Discord HeartBeatAck) = do
    debug "Received heartbeat ack"
    -- Mark the outstanding heartbeat as acknowledged; 'heartBeatLoop' checks this flag.
    P.atomicModify' (#hbResponse .~ True)
  handleMsg (Control (SendPresence data')) = do
    debug [fmt|Sending presence: ({data':s})|]
    sendToWs (StatusUpdate data')
  handleMsg (Control RestartShard) = P.throw ShardFlowRestart
  handleMsg (Control ShutDownShard) = P.throw ShardFlowShutDown

-- | (Re)start heartbeating at the given interval (in milliseconds).
--
-- Any heartbeat thread currently recorded in the shard state is halted
-- first. A fresh 'heartBeatLoop' is then spawned asynchronously and its
-- handle is stored in the @hbThread@ field of the state.
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop interval = do
  -- Never leave two heartbeat threads running at once.
  haltHeartBeat
  P.async (heartBeatLoop interval) >>= \hb -> P.atomicModify' (#hbThread ?~ hb)

-- | Stop the heartbeat thread, if one is recorded in the shard state.
--
-- The @hbThread@ field is read and reset to 'Nothing' in a single
-- 'P.atomicState' step. The handle that was taken out (if any) is then
-- cancelled. Does nothing when no thread is recorded.
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat = do
  -- Take the handle out and clear the field in one atomic step.
  running <- P.atomicState @ShardState $ \st -> (st & #hbThread .~ Nothing, st ^. #hbThread)
  forM_ running $ \hb -> do
    debug "Stopping heartbeat thread"
    P.embed (cancel hb)

-- | Send a single heartbeat carrying the last seen sequence number.
--
-- The @hbResponse@ flag in the shard state is cleared /before/ the heartbeat
-- is written to the websocket. Heartbeat acks are handled by another thread
-- (the message loop sets @hbResponse@ to 'True'). Clearing the flag after the
-- send would let an ack that is processed between the send and the reset be
-- overwritten, and 'heartBeatLoop' would then treat the connection as dead.
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat = do
  sn <- P.atomicGets (^. #seqNum)
  debug [fmt|Sending heartbeat (seq: {sn:s})|]
  -- Reset first, so any 'True' written after this point belongs to an ack
  -- received after this heartbeat was issued.
  P.atomicModify' (#hbResponse .~ False)
  sendToWs $ HeartBeat sn

-- | Body of the heartbeat thread.
--
-- Each round sends a heartbeat, sleeps for @interval@ milliseconds, and then
-- checks whether the @hbResponse@ flag was set in the meantime. If it was
-- not, the websocket (when one is recorded in the state) is closed with code
-- 4000 and the loop ends by throwing @()@ through a local 'P.Error' effect.
-- Presumably 'untilJustFinalIO' re-runs the round until that happens.
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop interval = untilJustFinalIO . fmap leftToMaybe . P.runError $ do
  sendHeartBeat
  -- threadDelay takes microseconds; the interval is in milliseconds.
  P.embed (threadDelay (interval * 1000))
  acked <- P.atomicGets (^. #hbResponse)
  unless acked $ do
    debug "No heartbeat response, restarting shard"
    -- With no connection recorded, 'P.note' ends the loop without sending a close frame.
    mconn <- P.atomicGets (^. #wsConn)
    conn <- P.note () mconn
    P.embed (sendCloseCode conn 4000 ("No heartbeat in time" :: T.Text))
    P.throw ()