-- | The shard logic
module Calamity.Gateway.Shard (
  Shard (..),
  newShard,
) where

import Calamity.Gateway.DispatchEvents (
  CalamityEvent (Dispatch),
  DispatchData (Ready),
 )
import Calamity.Gateway.Intents (Intents)
import Calamity.Gateway.Types (
  ControlMessage (..),
  IdentifyData (
    IdentifyData,
    compress,
    intents,
    largeThreshold,
    presence,
    properties,
    shard,
    token
  ),
  IdentifyProps (IdentifyProps, browser, device),
  ReceivedDiscordMessage (
    EvtDispatch,
    HeartBeatAck,
    HeartBeatReq,
    Hello,
    InvalidSession,
    Reconnect
  ),
  ResumeData (ResumeData, seq, sessionID, token),
  SentDiscordMessage (HeartBeat, Identify, Resume, StatusUpdate),
  Shard (..),
  ShardC,
  ShardFlowControl (..),
  ShardMsg (..),
  ShardState (ShardState, wsConn),
  StatusUpdateData,
 )
import Calamity.Internal.RunIntoIO (bindSemToIO)
import Calamity.Internal.Utils (
  debug,
  error,
  info,
  leftToMaybe,
  swap,
  unlessM,
  untilJustFinalIO,
  whenJust,
  whileMFinalIO,
 )
import Calamity.Metrics.Eff (
  MetricEff,
  modifyGauge,
  registerGauge,
 )
import Calamity.Types.LogEff (LogEff)
import Calamity.Types.Token (Token, rawToken)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, cancel)
import Control.Concurrent.Chan.Unagi qualified as UC
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TBMQueue (
  TBMQueue,
  closeTBMQueue,
  newTBMQueueIO,
  readTBMQueue,
  tryWriteTBMQueue,
  writeTBMQueue,
 )
import Control.Exception (
  Exception (fromException),
  SomeException,
 )
import Control.Exception.Safe qualified as Ex
import Control.Monad (void, when)
import Control.Monad.State.Lazy (runState)
import Data.Aeson qualified as A
import Data.ByteString.Lazy qualified as LBS
import Data.Default.Class (def)
import Data.IORef (newIORef)
import Data.Maybe (fromMaybe)
import Data.Text qualified as T
import DiPolysemy (attr, push)
import Network.Connection qualified as NC
import Network.TLS qualified as NT
import Network.TLS.Extra qualified as NT
import Network.WebSockets (
  Connection,
  ConnectionException (..),
  receiveData,
  sendCloseCode,
  sendTextData,
 )
import Network.WebSockets qualified as NW
import Network.WebSockets.Stream qualified as NW
import Optics
import Optics.State.Operators
import Polysemy (Sem)
import Polysemy qualified as P
import Polysemy.Async qualified as P
import Polysemy.AtomicState qualified as P
import Polysemy.Error qualified as P
import Polysemy.Resource qualified as P
import System.X509 qualified as X509
import TextShow (showt)
import Prelude hiding (error)

-- | Open a TLS websocket connection to @host@ on port 443, requesting @path@,
-- and run @ma@ with the resulting 'Connection'.
--
-- The action is lowered into 'IO' with 'bindSemToIO', which is why the result
-- is wrapped in 'Maybe'. Any exception caught by 'Ex.handleAny' while
-- connecting or while running the client is logged at debug level and turned
-- into 'Nothing'.
--
-- The underlying network connection is acquired with 'Ex.bracket', so
-- 'NC.connectionClose' runs whether the client finishes normally or throws.
runWebsocket ::
  P.Members '[LogEff, P.Final IO, P.Embed IO] r =>
  T.Text ->
  T.Text ->
  (Connection -> P.Sem r a) ->
  P.Sem r (Maybe a)
runWebsocket host path ma = do
  inner <- bindSemToIO ma

  -- The exception logger has to be lowered into IO by hand so that it can be
  -- used from inside the IO-level handler below.
  -- TODO: see if this isn't needed
  let logExc e = debug $ "runWebsocket raised with " <> (T.pack . show $ e)
  logExc' <- bindSemToIO logExc
  let handler e = do
        void $ logExc' e
        pure Nothing

  P.embed . Ex.handleAny handler $ do
    ctx <- NC.initConnectionContext
    certStore <- X509.getSystemCertificateStore
    -- TLS client parameters: default cipher suite, validated against the
    -- system certificate store.
    let clientParams =
          (NT.defaultParamsClient (T.unpack host) "443")
            { NT.clientSupported = def {NT.supportedCiphers = NT.ciphersuite_default}
            , NT.clientShared =
                def
                  { NT.sharedCAStore = certStore
                  }
            }
    let tlsSettings = NC.TLSSettings clientParams
        connParams = NC.ConnectionParams (T.unpack host) 443 (Just tlsSettings) Nothing

    Ex.bracket
      (NC.connectTo ctx connParams)
      NC.connectionClose
      ( \conn -> do
          -- Adapt the raw connection to the stream interface expected by the
          -- websockets library: reads pull a chunk, writes push a strict copy
          -- of the lazy bytestring, and a 'Nothing' write is a no-op.
          stream <-
            NW.makeStream
              (Just <$> NC.connectionGetChunk conn)
              (maybe (pure ()) (NC.connectionPut conn . LBS.toStrict))
          NW.runClientWithStream stream (T.unpack host) (T.unpack path) NW.defaultConnectionOptions [] inner
      )

-- | Build the initial 'ShardState' for a shard: the 'Shard' itself is stored,
-- every optional field starts out as 'Nothing' and the boolean flag starts as
-- 'False'.
newShardState :: Shard -> ShardState
newShardState shard = ShardState shard Nothing Nothing False Nothing Nothing Nothing

-- | Creates and launches a shard
--
-- A fresh control channel is created, a 'Shard' record is assembled from the
-- arguments (gateway host, shard id, shard count, raw token, initial presence
-- and intents, plus the event channel to publish to), and 'shardLoop' is
-- started asynchronously with its 'ShardState' held in an 'Data.IORef.IORef'.
--
-- The shard runs under the @calamity-shard@ log segment with a @shard-id@
-- attribute.
--
-- Returns the write end of the control channel together with the 'Async'
-- handle of the running shard.
newShard ::
  P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r =>
  T.Text ->
  Int ->
  Int ->
  Token ->
  Maybe StatusUpdateData ->
  Intents ->
  UC.InChan CalamityEvent ->
  Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard gateway id count token presence intents evtIn = do
  (cmdIn, cmdOut) <- P.embed UC.newChan
  let shard = Shard id count gateway evtIn cmdOut (rawToken token) presence intents
  stateVar <- P.embed . newIORef $ newShardState shard

  -- Interpret the shard's atomic state over the IORef created above.
  let runShard = P.runAtomicStateIORef stateVar shardLoop
  let action = push "calamity-shard" . attr "shard-id" id $ runShard

  thread' <- P.async action

  pure (cmdIn, thread')

-- | JSON-encode a 'SentDiscordMessage' and send it as a text frame over the
-- websocket connection currently stored in the shard state.
--
-- The message and its encoding are logged at debug level before sending. If
-- no connection is stored, nothing is sent and a debug message is logged
-- instead.
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs data' = do
  wsConn' <- P.atomicGets wsConn
  case wsConn' of
    Just conn -> do
      let encodedData = A.encode data'
      debug . T.pack $ "sending " <> show data' <> " encoded to " <> show encodedData <> " to gateway"
      P.embed . sendTextData conn $ encodedData
    Nothing -> debug "tried to send to closed WS"

-- | Write a value to a 'TBMQueue', reporting whether the write happened.
--
-- Built on 'tryWriteTBMQueue':
--
-- * @Just False@ (write rejected for now): 'retry' the transaction, so the
--   caller blocks until the write can go through or the result changes.
-- * @Just True@: the value was written; returns 'True'.
-- * @Nothing@: returns 'False' without writing (per the stm-chans
--   documentation this is the closed-queue case).
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' q v = do
  v' <- tryWriteTBMQueue q v
  case v' of
    Just False -> retry
    Just True -> pure True
    Nothing -> pure False

-- | Unwrap a 'Maybe', requesting a shard restart when it is empty.
--
-- On @Just a@ the value is returned and the message is ignored. On 'Nothing'
-- the message is logged with the project's 'error' logger (Prelude's 'error'
-- is hidden in this module) and 'ShardFlowRestart' is thrown through the
-- 'P.Error' effect.
restartUnless :: P.Members '[LogEff, P.Error ShardFlowControl] r => T.Text -> Maybe a -> P.Sem r a
restartUnless _ (Just a) = pure a
restartUnless msg Nothing = do
  error msg
  P.throw ShardFlowRestart

-- | The loop a shard will run on
shardLoop :: ShardC r => Sem r ()
shardLoop :: forall (r :: EffectRow). ShardC r => Sem r ()
shardLoop = do
  Gauge
activeShards <- Text -> [(Text, Text)] -> Sem r Gauge
forall (r :: EffectRow).
Member MetricEff r =>
Text -> [(Text, Text)] -> Sem r Gauge
registerGauge Text
"active_shards" [(Text, Text)]
forall a. Monoid a => a
mempty
  Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Double
1) Gauge
activeShards
  Sem r () -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
outerloop
  Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
subtract Double
1) Gauge
activeShards
  Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Shard shut down"
  where
    controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
    controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
outqueue = IO ()
inner
      where
        q :: OutChan ControlMessage
q = Shard
shard Shard
-> Optic' A_Lens NoIx Shard (OutChan ControlMessage)
-> OutChan ControlMessage
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard (OutChan ControlMessage)
#cmdOut
        inner :: IO ()
inner = do
          ControlMessage
v <- OutChan ControlMessage -> IO ControlMessage
forall a. OutChan a -> IO a
UC.readChan OutChan ControlMessage
q
          Bool
r <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
v)
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r IO ()
inner

    handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe T.Text) a)
    handleWSException :: forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException SomeException
e = Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (ControlMessage, Maybe Text) a
 -> IO (Either (ControlMessage, Maybe Text) a))
-> Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall a b. (a -> b) -> a -> b
$ case SomeException -> Maybe ConnectionException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
      Just (CloseRequest Word16
code ByteString
_)
        | Word16
code Word16 -> [Word16] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Word16
4004, Word16
4010, Word16
4011, Word16
4012, Word16
4013, Word16
4014] ->
            (ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
ShutDownShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> (Word16 -> Text) -> Word16 -> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> Text
forall a. TextShow a => a -> Text
showt (Word16 -> Maybe Text) -> Word16 -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Word16
code)
      Maybe ConnectionException
e -> (ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
RestartShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text)
-> (Maybe ConnectionException -> Text)
-> Maybe ConnectionException
-> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Text)
-> (Maybe ConnectionException -> String)
-> Maybe ConnectionException
-> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe ConnectionException -> String
forall a. Show a => a -> String
show (Maybe ConnectionException -> Maybe Text)
-> Maybe ConnectionException -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Maybe ConnectionException
e)

    discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
    discordStream :: forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
outqueue = Sem r ()
inner
      where
        inner :: Sem r ()
inner = do
          Either (ControlMessage, Maybe Text) ByteString
msg <- IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Either (ControlMessage, Maybe Text) ByteString)
 -> Sem r (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall a b. (a -> b) -> a -> b
$ IO (Either (ControlMessage, Maybe Text) ByteString)
-> (SomeException
    -> IO (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
Ex.catchAny (ByteString -> Either (ControlMessage, Maybe Text) ByteString
forall a b. b -> Either a b
Right (ByteString -> Either (ControlMessage, Maybe Text) ByteString)
-> IO ByteString
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
ws) SomeException
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException

          case Either (ControlMessage, Maybe Text) ByteString
msg of
            Left (ControlMessage
c, Maybe Text
reason) -> do
              Maybe Text -> (Text -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Text
reason (\Text
r -> Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Shard closed with reason: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
r)
              IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ()) -> (STM () -> IO ()) -> STM () -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> Sem r ()) -> STM () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
c)
            Right ByteString
msg' -> do
              -- debug [fmt|Got msg: {msg'}|]
              let decoded :: Either String ReceivedDiscordMessage
decoded = ByteString -> Either String ReceivedDiscordMessage
forall a. FromJSON a => ByteString -> Either String a
A.eitherDecode ByteString
msg'
              Bool
r <- case Either String ReceivedDiscordMessage
decoded of
                Right ReceivedDiscordMessage
a ->
                  IO Bool -> Sem r Bool
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO Bool -> Sem r Bool)
-> (STM Bool -> IO Bool) -> STM Bool -> Sem r Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> Sem r Bool) -> STM Bool -> Sem r Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ReceivedDiscordMessage -> ShardMsg
Discord ReceivedDiscordMessage
a)
                Left String
e -> do
                  Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to decode " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
e String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
": " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a. Show a => a -> String
show ByteString
msg'
                  Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
              Bool -> Sem r () -> Sem r ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r Sem r ()
inner
    outerloop :: ShardC r => Sem r ()
    outerloop :: forall (r :: EffectRow). ShardC r => Sem r ()
outerloop = Sem r Bool -> Sem r ()
forall (r :: EffectRow).
Member (Final IO) r =>
Sem r Bool -> Sem r ()
whileMFinalIO (Sem r Bool -> Sem r ()) -> Sem r Bool -> Sem r ()
forall a b. (a -> b) -> a -> b
$ do
      Shard
shard :: Shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Optic' A_Lens NoIx ShardState Shard -> Shard
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState Shard
#shardS)
      let host :: Text
host = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#gateway
      let host' :: Text
host' = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
host (Maybe Text -> Text) -> Maybe Text -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Maybe Text
T.stripPrefix Text
"wss://" Text
host
      Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"starting up shard " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show (Shard -> Int
shardID Shard
shard) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" of " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show (Shard -> Int
shardCount Shard
shard)

      Maybe ShardFlowControl
innerLoopVal <- Text
-> Text
-> (Connection -> Sem r ShardFlowControl)
-> Sem r (Maybe ShardFlowControl)
forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host' Text
"/?v=9&encoding=json" Connection -> Sem r ShardFlowControl
forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop

      case Maybe ShardFlowControl
innerLoopVal of
        Just ShardFlowControl
ShardFlowShutDown -> do
          Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Shutting down shard"
          Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
        Just ShardFlowControl
ShardFlowRestart -> do
          Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restaring shard"
          Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
        -- we restart normally when we loop

        Maybe ShardFlowControl
Nothing -> do
          -- won't happen unless innerloop starts using a non-deterministic effect or connecting to the ws dies
          Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restarting shard (abnormal reasons?)"
          Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

    innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
    innerloop :: forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop Connection
ws = do
      Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Entering inner loop of shard"

      Shard
shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Optic' A_Lens NoIx ShardState Shard -> Shard
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState Shard
#shardS)
      (ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic
  A_Lens
  NoIx
  ShardState
  ShardState
  (Maybe Connection)
  (Maybe Connection)
#wsConn Optic
  A_Lens
  NoIx
  ShardState
  ShardState
  (Maybe Connection)
  (Maybe Connection)
-> Connection -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Connection
ws)

      Maybe Int
seqNum' <- (ShardState -> Maybe Int) -> Sem r (Maybe Int)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic' A_Lens NoIx ShardState (Maybe Int) -> Maybe Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState (Maybe Int)
#seqNum)
      Maybe Text
sessionID' <- (ShardState -> Maybe Text) -> Sem r (Maybe Text)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic' A_Lens NoIx ShardState (Maybe Text) -> Maybe Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState (Maybe Text)
#sessionID)

      case (Maybe Int
seqNum', Maybe Text
sessionID') of
        (Just Int
n, Just Text
s) -> do
          Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> Text -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Text
"Resuming shard (sessionID: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
s Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", seq: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
n)
          SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
            ( ResumeData -> SentDiscordMessage
Resume
                ResumeData
                  { $sel:token:ResumeData :: Text
token = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#token
                  , $sel:sessionID:ResumeData :: Text
sessionID = Text
s
                  , $sel:seq:ResumeData :: Int
seq = Int
n
                  }
            )
        (Maybe Int, Maybe Text)
_noActiveSession -> do
          Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Identifying shard"
          SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
            ( IdentifyData -> SentDiscordMessage
Identify
                IdentifyData
                  { $sel:token:IdentifyData :: Text
token = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#token
                  , $sel:properties:IdentifyData :: IdentifyProps
properties =
                      IdentifyProps
                        { $sel:browser:IdentifyProps :: Text
browser = Text
"Calamity: https://github.com/simmsb/calamity"
                        , $sel:device:IdentifyProps :: Text
device = Text
"Calamity: https://github.com/simmsb/calamity"
                        }
                  , $sel:compress:IdentifyData :: Bool
compress = Bool
False
                  , $sel:largeThreshold:IdentifyData :: Maybe Int
largeThreshold = Maybe Int
forall a. Maybe a
Nothing
                  , $sel:shard:IdentifyData :: Maybe (Int, Int)
shard =
                      (Int, Int) -> Maybe (Int, Int)
forall a. a -> Maybe a
Just (Shard
shard Shard -> Optic' A_Lens NoIx Shard Int -> Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Int
#shardID, Shard
shard Shard -> Optic' A_Lens NoIx Shard Int -> Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Int
#shardCount)
                  , $sel:presence:IdentifyData :: Maybe StatusUpdateData
presence = Shard
shard Shard
-> Optic' A_Lens NoIx Shard (Maybe StatusUpdateData)
-> Maybe StatusUpdateData
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard (Maybe StatusUpdateData)
#initialStatus
                  , $sel:intents:IdentifyData :: Intents
intents = Shard
shard Shard -> Optic' A_Lens NoIx Shard Intents -> Intents
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Intents
#intents
                  }
            )

      ShardFlowControl
result <-
        Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Resource : r) a -> Sem r a
P.resourceToIOFinal (Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall a b. (a -> b) -> a -> b
$
          Sem (Resource : r) (TBMQueue ShardMsg)
-> (TBMQueue ShardMsg -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> Sem (Resource : r) ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl
forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
P.bracket
            (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg))
-> IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue ShardMsg)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
1)
            (IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> IO ())
-> TBMQueue ShardMsg
-> Sem (Resource : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (TBMQueue ShardMsg -> STM ()) -> TBMQueue ShardMsg -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBMQueue ShardMsg -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue)
            ( \TBMQueue ShardMsg
q -> do
                Text -> Sem (Resource : r) ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"handling events now"
                Async (Maybe ())
_controlThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> (IO () -> Sem (Resource : r) ())
-> IO ()
-> Sem (Resource : r) (Async (Maybe ()))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) (Async (Maybe ())))
-> IO () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
q
                Async (Maybe ())
_discordThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Connection -> TBMQueue ShardMsg -> Sem (Resource : r) ()
forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
q
                Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
P.raise (Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) () -> Sem r ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO (Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) ()
    -> Sem r (Maybe ShardFlowControl))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either ShardFlowControl () -> Maybe ShardFlowControl
forall e a. Either e a -> Maybe e
leftToMaybe (Either ShardFlowControl () -> Maybe ShardFlowControl)
-> Sem r (Either ShardFlowControl ())
-> Sem r (Maybe ShardFlowControl)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) (Sem r (Either ShardFlowControl ())
 -> Sem r (Maybe ShardFlowControl))
-> (Sem (Error ShardFlowControl : r) ()
    -> Sem r (Either ShardFlowControl ()))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r (Maybe ShardFlowControl)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error ShardFlowControl : r) ()
-> Sem r (Either ShardFlowControl ())
forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError (Sem (Error ShardFlowControl : r) ()
 -> Sem (Resource : r) ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall a b. (a -> b) -> a -> b
$ do
                  -- only we close the queue
                  Maybe ShardMsg
msg <- IO (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Maybe ShardMsg)
 -> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> (STM (Maybe ShardMsg) -> IO (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe ShardMsg) -> IO (Maybe ShardMsg)
forall a. STM a -> IO a
atomically (STM (Maybe ShardMsg)
 -> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> STM (Maybe ShardMsg)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue ShardMsg
q
                  ShardMsg -> Sem (Error ShardFlowControl : r) ()
forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg (ShardMsg -> Sem (Error ShardFlowControl : r) ())
-> Sem (Error ShardFlowControl : r) ShardMsg
-> Sem (Error ShardFlowControl : r) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> Maybe ShardMsg -> Sem (Error ShardFlowControl : r) ShardMsg
forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
"shard message stream closed by someone other than the sink" Maybe ShardMsg
msg
            )

      Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Exiting inner loop of shard"

      (ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic
  A_Lens
  NoIx
  ShardState
  ShardState
  (Maybe Connection)
  (Maybe Connection)
#wsConn Optic
  A_Lens
  NoIx
  ShardState
  ShardState
  (Maybe Connection)
  (Maybe Connection)
-> Maybe Connection -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Maybe Connection
forall a. Maybe a
Nothing)
      Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat
      ShardFlowControl -> Sem r ShardFlowControl
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShardFlowControl
result
    -- Handle one message taken from the shard's queue.
    --
    -- Messages wrapped in 'Discord' were received from the gateway, messages
    -- wrapped in 'Control' are instructions for the shard itself. A restart or
    -- shutdown is requested by throwing the matching 'ShardFlowControl' value
    -- through the 'P.Error' effect.
    handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
    handleMsg (Discord msg) = case msg of
      EvtDispatch sn data' -> do
        -- trace $ "Handling event: ("+||data'||+")"
        -- Remember the latest sequence number: it is sent back with every
        -- heartbeat and when resuming.
        P.atomicModify' (#seqNum ?~ sn)

        -- A Ready event carries the session ID that a later resume needs.
        case data' of
          Ready rdata' ->
            P.atomicModify' (#sessionID ?~ (rdata' ^. #sessionID))
          _NotReady -> pure ()

        -- Forward the event on the shard's event channel, tagged with the shard ID.
        shard <- P.atomicGets (^. #shardS)
        P.embed $ UC.writeChan (shard ^. #evtIn) (Dispatch (shard ^. #shardID) data')
      HeartBeatReq -> do
        debug "Received heartbeat request"
        sendHeartBeat
      Reconnect -> do
        debug "Being asked to restart by Discord"
        P.throw ShardFlowRestart
      InvalidSession resumable -> do
        if resumable
          then info "Received resumable invalid session"
          else do
            info "Received non-resumable invalid session, sleeping for 15 seconds then retrying"
            -- Forget the session so that the next connection identifies
            -- instead of attempting to resume.
            P.atomicModify' (#sessionID .~ Nothing)
            P.atomicModify' (#seqNum .~ Nothing)
            -- threadDelay takes microseconds: 15 * 1000 * 1000 is 15 seconds.
            P.embed $ threadDelay (15 * 1000 * 1000)
        -- Both the resumable and the non-resumable case restart the shard.
        P.throw ShardFlowRestart
      Hello interval -> do
        info . T.pack $ "Received hello, beginning to heartbeat at an interval of " <> show interval <> "ms"
        startHeartBeatLoop interval
      HeartBeatAck -> do
        debug "Received heartbeat ack"
        -- Mark the outstanding heartbeat as answered for the heartbeat loop.
        P.atomicModify' (#hbResponse .~ True)
    handleMsg (Control msg) = case msg of
      SendPresence data' -> do
        debug . T.pack $ "Sending presence: (" <> show data' <> ")"
        sendToWs $ StatusUpdate data'
      RestartShard -> P.throw ShardFlowRestart
      ShutDownShard -> P.throw ShardFlowShutDown

-- | Start heartbeating on a background thread.
--
-- Any heartbeat thread currently recorded in the shard state is stopped
-- first. A new thread running 'heartBeatLoop' with the given interval is then
-- spawned and stored under @hbThread@, which is where 'haltHeartBeat' looks
-- for the thread to cancel.
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop interval = do
  haltHeartBeat -- cancel any currently running hb thread
  worker <- P.async $ heartBeatLoop interval
  P.atomicModify' (#hbThread ?~ worker)

-- | Stop the heartbeat thread, if one is recorded in the shard state.
--
-- The thread handle is read and cleared from @hbThread@ in a single atomic
-- state update, and the thread is cancelled afterwards. Does nothing when no
-- heartbeat thread is recorded.
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat = do
  -- 'runState' yields (result, state) while 'P.atomicState' expects
  -- (state, result), hence the 'swap'.
  running <- P.atomicState @ShardState . (swap .) . runState $ do
    current <- use #hbThread
    #hbThread .= Nothing
    pure current
  case running of
    Nothing -> pure ()
    Just hb -> do
      debug "Stopping heartbeat thread"
      P.embed (void $ cancel hb)

-- | Send a single heartbeat carrying the last sequence number seen, if any.
--
-- The @hbResponse@ flag is cleared /before/ the heartbeat is sent. The flag
-- is set back to 'True' when a heartbeat ack is handled; clearing it only
-- after sending would let an ack that is handled in between be overwritten,
-- making the heartbeat loop believe the heartbeat was never answered.
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat = do
  sn <- P.atomicGets (^. #seqNum)
  debug . T.pack $ "Sending heartbeat (seq: " <> show sn <> ")"
  -- Mark the heartbeat as unanswered first, so that an ack cannot be lost.
  P.atomicModify' (#hbResponse .~ False)
  sendToWs $ HeartBeat sn

-- | Heartbeat repeatedly until a heartbeat goes unanswered.
--
-- Each iteration sends a heartbeat and then sleeps for @interval@
-- milliseconds. If @hbResponse@ is still unset after the sleep, the
-- websocket is closed with code 4000 and the loop ends. The loop also ends
-- when no websocket connection is recorded at that point.
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop interval = untilJustFinalIO . (leftToMaybe <$>) . P.runError $ do
  sendHeartBeat
  -- threadDelay takes microseconds, the interval is in milliseconds.
  P.embed . threadDelay $ interval * 1000
  unlessM (P.atomicGets (^. #hbResponse)) $ do
    debug "No heartbeat response, restarting shard"
    -- Throwing () is what makes the surrounding loop terminate.
    conn <- P.note () =<< P.atomicGets (^. #wsConn)
    P.embed $ sendCloseCode conn 4000 ("No heartbeat in time" :: T.Text)
    P.throw ()