-- | The shard logic
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE TemplateHaskell #-}

module Calamity.Gateway.Shard
    ( Shard(..)
    , newShard ) where

import           Calamity.Gateway.DispatchEvents
import           Calamity.Gateway.Types
import           Calamity.Internal.Utils
import           Calamity.LogEff
import           Calamity.Metrics.Eff
import           Calamity.Types.Token

import           Control.Concurrent
import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TBMQueue
import           Control.Exception
import           Control.Lens
import           Control.Monad
import           Control.Monad.State.Lazy

import qualified Data.Aeson                      as A
import           Data.Functor
import           Data.Maybe
import           Data.Text.Lazy                  ( Text, stripPrefix )
import           Data.Text.Lazy.Lens
import           Data.Void

import           DiPolysemy                      hiding ( debug, error, info )

import           Fmt

import           Network.WebSockets              ( Connection, ConnectionException(..), receiveData, sendCloseCode
                                                 , sendTextData )

import           Polysemy                        ( Sem )
import qualified Polysemy                        as P
import qualified Polysemy.Async                  as P
import qualified Polysemy.AtomicState            as P
import qualified Polysemy.Error                  as P
import qualified Polysemy.Resource               as P

import           Prelude                         hiding ( error )

import           TextShow

import           Wuss

-- | Effect describing "run an action with an open websocket connection".
data Websocket m a where
  -- | @RunWebsocket host path action@: the two 'Text' arguments are the host
  -- and the request path; the callback receives the live 'Connection'.
  RunWebsocket :: Text -> Text -> (Connection -> m a) -> Websocket m a

-- Template Haskell: generates the smart constructor for the effect
-- (used elsewhere in this module as @runWebsocket@).
P.makeSem ''Websocket

-- | Interpret the 'Websocket' effect in 'IO'.
--
-- A @RunWebsocket host path action@ request is served by opening a secure
-- websocket client connection to @host@ on port 443 at @path@ (via
-- 'runSecureClient') and running @action@ with the resulting 'Connection'.
-- The callback is a higher-order 'Sem' action, so it is lowered to 'IO' with
-- 'P.withLowerToIO'; any 'Websocket' effects it uses are interpreted by
-- recursing into 'websocketToIO'.
websocketToIO :: forall r a. P.Member (P.Embed IO) r => Sem (Websocket ': r) a -> Sem r a
websocketToIO = P.interpretH
  (\case
     RunWebsocket host path a -> do
       -- Grab the interpreter's initial functor state and turn the callback
       -- into a function over that state, so the connection can be fed in.
       istate <- P.getInitialStateT
       ma <- P.bindT a

       P.withLowerToIO $ \lower finish -> do
         -- Lower a computation that may still use 'Websocket' down to IO.
         let done :: Sem (Websocket ': r) x -> IO x
             done = lower . P.raise . websocketToIO

         runSecureClient (host ^. unpacked) 443 (path ^. unpacked)
           (\x -> do
              -- Inject the connection into the state and run the callback.
              res <- done (ma $ istate $> x)
              -- Signal that the lowering function is no longer needed.
              finish
              pure res))

-- | Build the initial 'ShardState' for a shard.
--
-- Only the owning 'Shard' is filled in; every optional field starts out as
-- 'Nothing' and the single boolean flag starts out as 'False'.
newShardState :: Shard -> ShardState
newShardState shard = ShardState shard Nothing Nothing False Nothing Nothing Nothing

-- | Creates and launches a shard.
--
-- Arguments, in order: the gateway address, the shard id, the shard count,
-- the bot token and the queue that dispatch events are delivered to.
--
-- Returns the new 'Shard' together with the 'Async' handle of the thread
-- running 'shardLoop' for it.
newShard :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r
         => Text
         -> Int
         -> Int
         -> Token
         -> TQueue DispatchMessage
         -> Sem r (Shard, Async (Maybe ()))
newShard gateway id count token evtQueue = do
  -- The shard and its state variable refer to each other (the 'Shard' holds
  -- the 'TVar', the initial 'ShardState' holds the 'Shard'), so the knot is
  -- tied lazily with @mdo@.
  (shard, stateVar) <- P.embed $ mdo
    cmdQueue' <- newTQueueIO
    stateVar <- newTVarIO (newShardState shard)
    let shard = Shard id count gateway evtQueue cmdQueue' stateVar (rawToken token)
    pure (shard, stateVar)

  -- Run the shard loop against the shared state variable, with the shard id
  -- attached to the logging context.
  let runShard = P.runAtomicStateTVar stateVar shardLoop
  let action = attr "shard-id" id . push "calamity-shard" $ runShard

  thread' <- P.async action

  pure (shard, thread')

-- | Send a message to the gateway over the shard's current websocket.
--
-- The message is JSON-encoded and sent as a text frame. If the shard state
-- holds no connection, nothing is sent and a debug message is logged
-- instead.
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs data' = do
  mconn <- P.atomicGets wsConn
  case mconn of
    Just conn -> do
      let encodedData = A.encode data'
      debug $ "sending " +|| data' ||+ " encoded to " +|| encodedData ||+ " to gateway"
      P.embed . sendTextData conn $ encodedData
    Nothing -> debug "tried to send to closed WS"

-- | Extract the value from an 'Either' whose 'Right' side is uninhabited.
--
-- The 'Right' case can never hold a value, so it is discharged with
-- 'absurd'; the function is total.
fromEitherVoid :: Either a Void -> a
fromEitherVoid = either id absurd

-- | Catches ws close events and decides if we can restart or not.
--
-- Runs the action and wraps its result in 'Right'. If the action throws a
-- 'CloseRequest', the exception is printed and converted to a control
-- message in 'Left': 'ShutDownShard' for close codes 1000, 4004, 4010 and
-- 4011, 'RestartShard' for every other code. Any other
-- 'ConnectionException' is rethrown unchanged.
checkWSClose :: IO a -> IO (Either ControlMessage a)
checkWSClose m = (Right <$> m) `catch` \case
  e@(CloseRequest code _) -> do
    print e
    -- NOTE(review): these look like Discord's non-recoverable close codes
    -- (normal closure, auth failed, invalid shard, sharding required) --
    -- confirm against the current gateway documentation.
    pure . Left $ if code `elem` [1000, 4004, 4010, 4011]
      then ShutDownShard
      else RestartShard

  e -> throwIO e

-- | Blocking write to a 'TBMQueue' that reports whether the queue was open.
--
-- * queue full   -> 'retry' (the transaction blocks until there is room)
-- * value stored -> 'True'
-- * queue closed -> 'False'
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' q v = tryWriteTBMQueue q v >>= \case
  Just False -> retry
  Just True  -> pure True
  Nothing    -> pure False

-- | The loop a shard will run on.
--
-- Increments the @active_shards@ gauge, runs the outer (re)connect loop
-- until it finishes, then decrements the gauge again and logs the shutdown.
shardLoop :: ShardC r => Sem r ()
shardLoop = do
  activeShards <- registerGauge "active_shards" mempty
  void $ modifyGauge succ activeShards
  void outerloop
  void $ modifyGauge pred activeShards
  debug "Shard shut down"
 where
  -- | Forward control messages from the shard's command queue to the
  -- merged shard message queue, wrapping each one in 'Control'.
  --
  -- Keeps looping until the output queue has been closed.
  controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
  controlStream shard outqueue = inner
    where
      q = shard ^. #cmdQueue
      inner = do
        v <- atomically $ readTQueue q
        -- 'False' means the output queue is closed, so stop forwarding.
        r <- atomically $ tryWriteTBMQueue' outqueue (Control v)
        when r inner

  -- | Read messages from the websocket and forward them to the merged
  -- shard message queue.
  --
  -- * A close request (see 'checkWSClose') is forwarded as a 'Control'
  --   message and ends the stream.
  -- * A payload that decodes successfully is forwarded as a 'Discord'
  --   message; the stream ends if the output queue has been closed.
  -- * A payload that fails to decode is logged as an error and skipped.
  discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
  discordStream ws outqueue = inner
    where inner = do
            msg <- P.embed . checkWSClose $ receiveData ws

            -- trace $ "Received from stream: "+||msg||+""

            case msg of
              Left c ->
                P.embed . atomically $ writeTBMQueue outqueue (Control c)

              Right msg' -> do
                let decoded = A.eitherDecode msg'
                r <- case decoded of
                  Right a ->
                    P.embed . atomically $ tryWriteTBMQueue' outqueue (Discord a)
                  Left e -> do
                    error $ "Failed to decode: "+|e|+""
                    -- A bad payload is not fatal: keep reading.
                    pure True
                when r inner

  -- mergedStream ::  Log -> Shard -> Connection -> ExceptT ShardException ShardM ShardMsg
  -- mergedStream logEnv shard ws =
  --   liftIO (fromEither <$> race (controlStream shard) (discordStream logEnv ws))

  -- | The outer loop: sets up the websocket connection and handles
  -- reconnecting.
  --
  -- Each iteration opens a connection to the shard's gateway host (with
  -- any @wss://@ prefix stripped) and runs 'innerloop' on it. When the
  -- inner loop asks for a restart we simply go round again; when it asks
  -- for a shutdown the exception is thrown, which leaves the 'forever'
  -- loop and is returned as 'Left'.
  --
  -- Currently, if this goes to the error path we just exit the forever loop
  -- and the shard stops. We might want some extra logic to reboot the
  -- shard, or maybe to force a resharding.
  outerloop :: ShardC r => Sem r (Either ShardException ())
  outerloop = P.runError . forever $ do
    shard :: Shard <- P.atomicGets (^. #shardS)
    let host = shard ^. #gateway
    let host' = fromMaybe host $ stripPrefix "wss://" host
    info $ "starting up shard "+| (shard ^. #shardID) |+" of "+| (shard ^. #shardCount) |+""

    innerLoopVal <- websocketToIO $ runWebsocket host' "/?v=7&encoding=json" innerloop

    case innerLoopVal of
      ShardExcShutDown -> do
        info "Shutting down shard"
        P.throw ShardExcShutDown

      ShardExcRestart ->
        info "Restarting shard"
        -- we restart normally when we loop

  -- | The inner loop, handles receiving a message from discord or a command message
  -- and then decides what to do with it
  innerloop :: ShardC r => Connection -> Sem r ShardException
  innerloop :: Connection -> Sem r ShardException
innerloop ws :: Connection
ws = do
    Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug "Entering inner loop of shard"

    Shard
shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: [(* -> *) -> * -> *]).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Getting Shard ShardState Shard -> Shard
forall s a. s -> Getting a s a -> a
^. IsLabel "shardS" (Getting Shard ShardState Shard)
Getting Shard ShardState Shard
#shardS)
    (ShardState -> ShardState) -> Sem r ()
forall s (r :: [(* -> *) -> * -> *]).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify (IsLabel
  "wsConn"
  (ASetter
     ShardState ShardState (Maybe Connection) (Maybe Connection))
ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
#wsConn ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
-> Connection -> ShardState -> ShardState
forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ Connection
ws)

    Maybe Int
seqNum'    <- (ShardState -> Maybe Int) -> Sem r (Maybe Int)
forall s s' (r :: [(* -> *) -> * -> *]).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Getting (Maybe Int) ShardState (Maybe Int) -> Maybe Int
forall s a. s -> Getting a s a -> a
^. IsLabel "seqNum" (Getting (Maybe Int) ShardState (Maybe Int))
Getting (Maybe Int) ShardState (Maybe Int)
#seqNum)
    Maybe Text
sessionID' <- (ShardState -> Maybe Text) -> Sem r (Maybe Text)
forall s s' (r :: [(* -> *) -> * -> *]).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Getting (Maybe Text) ShardState (Maybe Text) -> Maybe Text
forall s a. s -> Getting a s a -> a
^. IsLabel "sessionID" (Getting (Maybe Text) ShardState (Maybe Text))
Getting (Maybe Text) ShardState (Maybe Text)
#sessionID)

    case (Maybe Int
seqNum', Maybe Text
sessionID') of
      (Just n :: Int
n, Just s :: Text
s) -> do
        Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug (Text -> Sem r ()) -> Text -> Sem r ()
forall a b. (a -> b) -> a -> b
$ "Resuming shard (sessionID: "Builder -> Builder -> Text
forall b. FromBuilder b => Builder -> Builder -> b
+|Text
sText -> Builder -> Builder
forall a b. (Buildable a, FromBuilder b) => a -> Builder -> b
|+", seq: "Builder -> Builder -> Builder
forall b. FromBuilder b => Builder -> Builder -> b
+|Int
nInt -> Builder -> Builder
forall a b. (Buildable a, FromBuilder b) => a -> Builder -> b
|+")"
        SentDiscordMessage -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
ShardC r =>
SentDiscordMessage -> Sem r ()
sendToWs (ResumeData -> SentDiscordMessage
Resume ResumeData :: Text -> Text -> Int -> ResumeData
ResumeData
                  { $sel:token:ResumeData :: Text
token = Shard
shard Shard -> Getting Text Shard Text -> Text
forall s a. s -> Getting a s a -> a
^. IsLabel "token" (Getting Text Shard Text)
Getting Text Shard Text
#token
                  , $sel:sessionID:ResumeData :: Text
sessionID = Text
s
                  , $sel:seq:ResumeData :: Int
seq = Int
n
                  })
      _ -> do
        Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug "Identifying shard"
        SentDiscordMessage -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
ShardC r =>
SentDiscordMessage -> Sem r ()
sendToWs (IdentifyData -> SentDiscordMessage
Identify IdentifyData :: Text
-> IdentifyProps
-> Bool
-> Int
-> (Int, Int)
-> Maybe StatusUpdateData
-> IdentifyData
IdentifyData
                  { $sel:token:IdentifyData :: Text
token = Shard
shard Shard -> Getting Text Shard Text -> Text
forall s a. s -> Getting a s a -> a
^. IsLabel "token" (Getting Text Shard Text)
Getting Text Shard Text
#token
                  , $sel:properties:IdentifyData :: IdentifyProps
properties = IdentifyProps :: Text -> Text -> IdentifyProps
IdentifyProps
                                 { $sel:browser:IdentifyProps :: Text
browser = "Calamity: https://github.com/nitros12/calamity"
                                 , $sel:device:IdentifyProps :: Text
device = "Calamity: https://github.com/nitros12/calamity"
                                 }
                  , $sel:compress:IdentifyData :: Bool
compress = Bool
False
                  , $sel:largeThreshold:IdentifyData :: Int
largeThreshold = 250
                  , $sel:shard:IdentifyData :: (Int, Int)
shard = (Shard
shard Shard -> Getting Int Shard Int -> Int
forall s a. s -> Getting a s a -> a
^. IsLabel "shardID" (Getting Int Shard Int)
Getting Int Shard Int
#shardID,
                             Shard
shard Shard -> Getting Int Shard Int -> Int
forall s a. s -> Getting a s a -> a
^. IsLabel "shardCount" (Getting Int Shard Int)
Getting Int Shard Int
#shardCount)
                  , $sel:presence:IdentifyData :: Maybe StatusUpdateData
presence = Maybe StatusUpdateData
forall a. Maybe a
Nothing
                  })

    Counter
receivedMessages <- Text -> [(Text, Text)] -> Sem r Counter
forall (r :: [(* -> *) -> * -> *]).
MemberWithError MetricEff r =>
Text -> [(Text, Text)] -> Sem r Counter
registerCounter "received_messages" [("shard", Int -> Text
forall a. TextShow a => a -> Text
showt (Int -> Text) -> Int -> Text
forall a b. (a -> b) -> a -> b
$ Shard
shard Shard -> Getting Int Shard Int -> Int
forall s a. s -> Getting a s a -> a
^. IsLabel "shardID" (Getting Int Shard Int)
Getting Int Shard Int
#shardID)]

    ShardException
result <- Sem (Resource : r) ShardException -> Sem r ShardException
forall (r :: [(* -> *) -> * -> *]) a.
Sem (Resource : r) a -> Sem r a
P.runResource (Sem (Resource : r) ShardException -> Sem r ShardException)
-> Sem (Resource : r) ShardException -> Sem r ShardException
forall a b. (a -> b) -> a -> b
$ Sem (Resource : r) (TBMQueue ShardMsg)
-> (TBMQueue ShardMsg -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> Sem (Resource : r) ShardException)
-> Sem (Resource : r) ShardException
forall (r :: [(* -> *) -> * -> *]) a c b.
MemberWithError Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
P.bracket (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg))
-> IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue ShardMsg)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO 1)
      (IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> IO ())
-> TBMQueue ShardMsg
-> Sem (Resource : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (TBMQueue ShardMsg -> STM ()) -> TBMQueue ShardMsg -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBMQueue ShardMsg -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue)
      (\q :: TBMQueue ShardMsg
q -> do
        Text -> Sem (Resource : r) ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug "handling events now"
        Async (Maybe ())
_controlThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> (IO () -> Sem (Resource : r) ())
-> IO ()
-> Sem (Resource : r) (Async (Maybe ()))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) (Async (Maybe ())))
-> IO () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
q
        Async (Maybe ())
_discordThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: [(* -> *) -> * -> *]) a.
MemberWithError Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Connection -> TBMQueue ShardMsg -> Sem (Resource : r) ()
forall (r :: [(* -> *) -> * -> *]).
Members '[LogEff, MetricEff, Embed IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
q
        (Either ShardException Void -> ShardException
forall a. Either a Void -> a
fromEitherVoid (Either ShardException Void -> ShardException)
-> Sem (Resource : r) (Either ShardException Void)
-> Sem (Resource : r) ShardException
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) (Sem (Resource : r) (Either ShardException Void)
 -> Sem (Resource : r) ShardException)
-> (Sem (Error ShardException : r) ()
    -> Sem (Resource : r) (Either ShardException Void))
-> Sem (Error ShardException : r) ()
-> Sem (Resource : r) ShardException
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem r (Either ShardException Void)
-> Sem (Resource : r) (Either ShardException Void)
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
Sem r a -> Sem (e : r) a
P.raise (Sem r (Either ShardException Void)
 -> Sem (Resource : r) (Either ShardException Void))
-> (Sem (Error ShardException : r) ()
    -> Sem r (Either ShardException Void))
-> Sem (Error ShardException : r) ()
-> Sem (Resource : r) (Either ShardException Void)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error ShardException : r) Void
-> Sem r (Either ShardException Void)
forall e (r :: [(* -> *) -> * -> *]) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError (Sem (Error ShardException : r) Void
 -> Sem r (Either ShardException Void))
-> (Sem (Error ShardException : r) ()
    -> Sem (Error ShardException : r) Void)
-> Sem (Error ShardException : r) ()
-> Sem r (Either ShardException Void)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error ShardException : r) ()
-> Sem (Error ShardException : r) Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Sem (Error ShardException : r) ()
 -> Sem (Resource : r) ShardException)
-> Sem (Error ShardException : r) ()
-> Sem (Resource : r) ShardException
forall a b. (a -> b) -> a -> b
$ do
          -- only we close the queue
          Sem (Error ShardException : r) Int
-> Sem (Error ShardException : r) ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem (Error ShardException : r) Int
 -> Sem (Error ShardException : r) ())
-> Sem (Error ShardException : r) Int
-> Sem (Error ShardException : r) ()
forall a b. (a -> b) -> a -> b
$ Int -> Counter -> Sem (Error ShardException : r) Int
forall (r :: [(* -> *) -> * -> *]).
MemberWithError MetricEff r =>
Int -> Counter -> Sem r Int
addCounter 1 Counter
receivedMessages
          Maybe ShardMsg
msg <- IO (Maybe ShardMsg)
-> Sem (Error ShardException : r) (Maybe ShardMsg)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Maybe ShardMsg)
 -> Sem (Error ShardException : r) (Maybe ShardMsg))
-> (STM (Maybe ShardMsg) -> IO (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardException : r) (Maybe ShardMsg)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe ShardMsg) -> IO (Maybe ShardMsg)
forall a. STM a -> IO a
atomically (STM (Maybe ShardMsg)
 -> Sem (Error ShardException : r) (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardException : r) (Maybe ShardMsg)
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> STM (Maybe ShardMsg)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue ShardMsg
q
          ShardMsg -> Sem (Error ShardException : r) ()
forall (r :: [(* -> *) -> * -> *]).
(ShardC r, Member (Error ShardException) r) =>
ShardMsg -> Sem r ()
handleMsg (ShardMsg -> Sem (Error ShardException : r) ())
-> ShardMsg -> Sem (Error ShardException : r) ()
forall a b. (a -> b) -> a -> b
$ Maybe ShardMsg -> ShardMsg
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ShardMsg
msg)

    Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member LogEff r =>
Text -> Sem r ()
debug "Exiting inner loop of shard"

    (ShardState -> ShardState) -> Sem r ()
forall s (r :: [(* -> *) -> * -> *]).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify (IsLabel
  "wsConn"
  (ASetter
     ShardState ShardState (Maybe Connection) (Maybe Connection))
ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
#wsConn ASetter ShardState ShardState (Maybe Connection) (Maybe Connection)
-> Maybe Connection -> ShardState -> ShardState
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Maybe Connection
forall a. Maybe a
Nothing)
    Sem r ()
forall (r :: [(* -> *) -> * -> *]). ShardC r => Sem r ()
haltHeartBeat
    ShardException -> Sem r ShardException
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShardException
result

  -- | Process a single message taken off the shard's queue.
  --
  -- Messages come in two flavours:
  --
  -- * 'Discord' messages received from the gateway: dispatches update the
  --   stored sequence number (and the session id on 'Ready') and are forwarded
  --   to the shard's event queue; heartbeat requests, hello and heartbeat acks
  --   drive the heartbeat machinery; reconnects and invalid sessions abort the
  --   inner loop.
  -- * 'Control' messages: either a presence update to send over the websocket,
  --   or a request to restart / shut down the shard.
  --
  -- Aborting is done by throwing a 'ShardException' ('ShardExcRestart' or
  -- 'ShardExcShutDown') through the 'P.Error' effect.
  handleMsg :: (ShardC r, P.Member (P.Error ShardException) r) => ShardMsg -> Sem r ()
  handleMsg (Discord msg) = case msg of
    Dispatch sn data' -> do
      -- trace $ "Handling event: ("+||data'||+")"
      -- remember the latest sequence number; heartbeats send it back
      P.atomicModify (#seqNum ?~ sn)

      case data' of
        -- Ready carries the session id of the freshly identified session
        Ready rdata' ->
          P.atomicModify (#sessionID ?~ (rdata' ^. #sessionID))

        _ -> pure ()

      -- hand the event over to whoever consumes the shard's event queue
      shard <- P.atomicGets (^. #shardS)
      P.embed . atomically $ writeTQueue (shard ^. #evtQueue) (DispatchData' data')
      -- sn' <- P.atomicGets (^. #seqNum)
      -- trace $ "Done handling event, seq is now: "+||sn'||+""

    HeartBeatReq -> do
      debug "Received heartbeat request"
      sendHeartBeat

    Reconnect -> do
      debug "Being asked to restart by Discord"
      P.throw ShardExcRestart

    InvalidSession resumable -> do
      -- The flag says whether the session may still be resumed. Only a
      -- non-resumable session has to be forgotten (session id and sequence
      -- number cleared) before restarting; a resumable one must keep both.
      if resumable
      then
        info "Received resumable invalid session"
      else do
        info "Received non-resumable invalid session, sleeping for 15 seconds then retrying"
        P.atomicModify (#sessionID .~ Nothing)
        P.atomicModify (#seqNum .~ Nothing)
        -- threadDelay takes microseconds: 15 * 1000 * 1000 = 15 seconds
        P.embed $ threadDelay (15 * 1000 * 1000)
      P.throw ShardExcRestart

    Hello interval -> do
      info $ "Received hello, beginning to heartbeat at an interval of "+|interval|+"ms"
      startHeartBeatLoop interval

    HeartBeatAck -> do
      debug "Received heartbeat ack"
      -- mark the last heartbeat as answered; heartBeatLoop checks this flag
      P.atomicModify (#hbResponse .~ True)

  handleMsg (Control msg) = case msg of
    SendPresence data' -> do
      debug $ "Sending presence: ("+||data'||+")"
      sendToWs $ StatusUpdate data'

    RestartShard       -> P.throw ShardExcRestart
    ShutDownShard      -> P.throw ShardExcShutDown

-- | (Re)start the heartbeat thread.
--
-- Any heartbeat thread that is currently running is cancelled first, then a
-- new 'heartBeatLoop' is spawned with the given interval and its handle is
-- stored in the shard state's @hbThread@ field so it can be cancelled later.
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop interval =
  haltHeartBeat
    >> P.async (heartBeatLoop interval)
    >>= \hb -> P.atomicModify (#hbThread ?~ hb)

-- | Stop the heartbeat thread, if one is registered.
--
-- The handle is read from the shard state's @hbThread@ field and the field is
-- reset to 'Nothing' within a single atomic state update; the thread (if any)
-- is then cancelled.
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat = do
  -- take the current handle and clear the slot in one atomic step
  running <- P.atomicState @ShardState $ \st ->
    ((#hbThread .~ Nothing) st, st ^. #hbThread)
  forM_ running $ \hb -> do
    debug "Stopping heartbeat thread"
    P.embed (cancel hb)

-- | Send a heartbeat carrying the last sequence number seen by this shard.
--
-- The @hbResponse@ flag is reset to 'False' as part of sending; it is set back
-- to 'True' when a heartbeat ack is handled, and 'heartBeatLoop' inspects it to
-- decide whether the connection is still alive.
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat = do
  sn <- P.atomicGets (^. #seqNum)
  debug $ "Sending heartbeat (seq: " +|| sn ||+ ")"
  -- Reset the ack flag *before* the heartbeat goes out: acks are handled by a
  -- different thread, so resetting afterwards could overwrite an ack that was
  -- processed in between and make the heartbeat look unanswered.
  P.atomicModify (#hbResponse .~ False)
  sendToWs $ HeartBeat sn

-- | Heartbeat until the gateway stops answering.
--
-- Each iteration sends a heartbeat, sleeps for @interval@ (multiplied by 1000
-- for 'threadDelay', which takes microseconds), and then checks the
-- @hbResponse@ flag. If no ack was recorded, the websocket is closed with code
-- 4000 and the loop ends by throwing @()@ into the local 'P.Error' effect,
-- whose result is discarded.
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop interval = void . P.runError . forever $ do
  sendHeartBeat
  P.embed . threadDelay $ interval * 1000
  unlessM (P.atomicGets (^. #hbResponse)) $ do
    debug "No heartbeat response, restarting shard"
    mconn <- P.atomicGets (^. #wsConn)
    case mconn of
      Just conn -> P.embed $ sendCloseCode conn 4000 ("No heartbeat in time" :: Text)
      -- The connection may already have been cleared from the state while the
      -- shard is tearing down; there is nothing left to close in that case.
      Nothing   -> pure ()
    P.throw ()