{-# language FlexibleContexts    #-}
{-# language OverloadedStrings   #-}
{-# language ScopedTypeVariables #-}
{-
This module implements the protocol as specified in
https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
-}
module Mu.GraphQL.Subscription.Protocol where

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Monad               (forM_)
import           Control.Monad.IO.Class      (MonadIO (liftIO))
import           Data.Aeson                  ((.:), (.:?), (.=))
import qualified Data.Aeson                  as A
import           Data.Conduit
import qualified Data.HashMap.Strict         as HM
import qualified Data.Text                   as T
import           Language.GraphQL.AST
import qualified ListT                       as L
import           Network.WebSockets
import qualified StmContainers.Map           as M

import qualified Mu.GraphQL.Quasi.LostParser as P
import           Mu.GraphQL.Query.Parse

protocol :: ( Maybe T.Text -> VariableMapC -> [Definition]
              -> ConduitT A.Value Void IO ()
              -> IO () )
         -> Connection -> IO ()
protocol :: (Maybe Text
 -> VariableMapC
 -> [Definition]
 -> ConduitT Value Void IO ()
 -> IO ())
-> Connection -> IO ()
protocol Maybe Text
-> VariableMapC
-> [Definition]
-> ConduitT Value Void IO ()
-> IO ()
f Connection
conn = IO ()
start
  where
    -- listen for GQL_CONNECTION_INIT
    start :: IO ()
start = do
      Maybe ClientMessage
msg <- Connection -> IO (Maybe ClientMessage)
forall a. FromJSON a => Connection -> IO (Maybe a)
receiveJSON Connection
conn
      case Maybe ClientMessage
msg of
        Just (GQLConnectionInit Maybe Value
_)
          -> do -- send GQL_CONNECTION_ACK
                Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn ServerMessage
GQLConnectionAck
                Map Text (Async ())
vars <- IO (Map Text (Async ()))
forall key value. IO (Map key value)
M.newIO
                -- send GQL_KEEP_ALIVE each 1s.
                IO Any -> (Async Any -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync IO Any
forall b. IO b
keepAlive ((Async Any -> IO ()) -> IO ()) -> (Async Any -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async Any
ka ->
                -- start listening for incoming messages
                  Async Any -> Map Text (Async ()) -> IO ()
forall a. Async a -> Map Text (Async ()) -> IO ()
listen Async Any
ka Map Text (Async ())
vars
        Maybe ClientMessage
_ -> IO ()
start  -- Keep waiting
    -- keep-alive
    keepAlive :: IO b
keepAlive = do
      Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn ServerMessage
GQLKeepAlive
      Int -> IO ()
threadDelay Int
1000000
      IO b
keepAlive
    -- listen for messages from client
    listen :: Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars = do
      Maybe ClientMessage
msg <- Connection -> IO (Maybe ClientMessage)
forall a. FromJSON a => Connection -> IO (Maybe a)
receiveJSON Connection
conn
      case Maybe ClientMessage
msg of
        Just (GQLStart Text
i Text
q VariableMapC
v Maybe Text
o)  -- start handling
          -> IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (Text -> Text -> VariableMapC -> Maybe Text -> IO ()
handle Text
i Text
q VariableMapC
v Maybe Text
o IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM () -> IO ()
forall a. STM a -> IO a
atomically (Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
M.delete Text
i Map Text (Async ())
vars)) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
t -> do
             STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Async () -> Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
value -> key -> Map key value -> STM ()
M.insert Async ()
t Text
i Map Text (Async ())
vars
             Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars
        Just (GQLStop Text
i)  -- stop with handling that query
          -> do Maybe (Async ())
r <- STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. STM a -> IO a
atomically (STM (Maybe (Async ())) -> IO (Maybe (Async ())))
-> STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a b. (a -> b) -> a -> b
$ Text -> Map Text (Async ()) -> STM (Maybe (Async ()))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
M.lookup Text
i Map Text (Async ())
vars
                case Maybe (Async ())
r of
                  Maybe (Async ())
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                  Just Async ()
a  -> do Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
a
                                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
M.delete Text
i Map Text (Async ())
vars
                Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars
        Just ClientMessage
GQLTerminate  -- terminate all queries
          -> do Async a -> Map Text (Async ()) -> IO ()
forall a a a. Async a -> Map a (Async a) -> IO ()
cancelAll Async a
ka Map Text (Async ())
vars
                Connection -> Text -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
sendClose Connection
conn (Text
"GraphQL session terminated" :: T.Text)
        Maybe ClientMessage
_ -> Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars  -- Keep going
    -- Handle a single query
    handle :: Text -> Text -> VariableMapC -> Maybe Text -> IO ()
handle Text
i Text
q VariableMapC
v Maybe Text
o
      = case Text -> Either Text [Definition]
P.parseDoc Text
q of
          Left Text
err -> Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> Value -> ServerMessage
GQLError Text
i (Text -> Value
forall a. ToJSON a => a -> Value
A.toJSON Text
err))
          Right [Definition]
d  -> do
            Maybe Text
-> VariableMapC
-> [Definition]
-> ConduitT Value Void IO ()
-> IO ()
f Maybe Text
o VariableMapC
v [Definition]
d (Text -> ConduitT Value Void IO ()
forall (m :: * -> *) o. MonadIO m => Text -> ConduitT Value o m ()
cndt Text
i)
            Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> ServerMessage
GQLComplete Text
i)
    -- Conduit which sends the results via the wire
    cndt :: Text -> ConduitT Value o m ()
cndt Text
i = do
      Maybe Value
msg <- ConduitT Value o m (Maybe Value)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
      case Maybe Value
msg of
        Maybe Value
Nothing -> () -> ConduitT Value o m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Just Value
v  -> do IO () -> ConduitT Value o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT Value o m ()) -> IO () -> ConduitT Value o m ()
forall a b. (a -> b) -> a -> b
$ Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> Value -> ServerMessage
GQLData Text
i Value
v)
                      Text -> ConduitT Value o m ()
cndt Text
i
    -- Cancel all pending subscriptions
    cancelAll :: Async a -> Map a (Async a) -> IO ()
cancelAll Async a
ka Map a (Async a)
vars
      = do Async a -> IO ()
forall a. Async a -> IO ()
cancel Async a
ka
           [(a, Async a)]
vs <- STM [(a, Async a)] -> IO [(a, Async a)]
forall a. STM a -> IO a
atomically (STM [(a, Async a)] -> IO [(a, Async a)])
-> STM [(a, Async a)] -> IO [(a, Async a)]
forall a b. (a -> b) -> a -> b
$ ListT STM (a, Async a) -> STM [(a, Async a)]
forall (m :: * -> *) a. Monad m => ListT m a -> m [a]
L.toList (ListT STM (a, Async a) -> STM [(a, Async a)])
-> ListT STM (a, Async a) -> STM [(a, Async a)]
forall a b. (a -> b) -> a -> b
$ Map a (Async a) -> ListT STM (a, Async a)
forall key value. Map key value -> ListT STM (key, value)
M.listT Map a (Async a)
vars
           [Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (((a, Async a) -> Async a) -> [(a, Async a)] -> [Async a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Async a) -> Async a
forall a b. (a, b) -> b
snd [(a, Async a)]
vs) Async a -> IO ()
forall a. Async a -> IO ()
cancel

receiveJSON :: A.FromJSON a => Connection -> IO (Maybe a)
receiveJSON :: Connection -> IO (Maybe a)
receiveJSON Connection
conn = do
  ByteString
d <- Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
conn
  Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)) -> Maybe a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe a
forall a. FromJSON a => ByteString -> Maybe a
A.decode ByteString
d

sendJSON :: A.ToJSON a => Connection -> a -> IO ()
sendJSON :: Connection -> a -> IO ()
sendJSON Connection
conn a
v
  = Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
sendTextData Connection
conn (a -> ByteString
forall a. ToJSON a => a -> ByteString
A.encode a
v)

data ClientMessage
  = GQLConnectionInit { ClientMessage -> Maybe Value
initPayload :: Maybe A.Value }
  | GQLStart { ClientMessage -> Text
clientMsgId   :: T.Text
             , ClientMessage -> Text
query         :: T.Text
             , ClientMessage -> VariableMapC
variables     :: VariableMapC
             , ClientMessage -> Maybe Text
operationName :: Maybe T.Text}
  | GQLStop { clientMsgId :: T.Text }
  | GQLTerminate
  deriving Int -> ClientMessage -> ShowS
[ClientMessage] -> ShowS
ClientMessage -> String
(Int -> ClientMessage -> ShowS)
-> (ClientMessage -> String)
-> ([ClientMessage] -> ShowS)
-> Show ClientMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ClientMessage] -> ShowS
$cshowList :: [ClientMessage] -> ShowS
show :: ClientMessage -> String
$cshow :: ClientMessage -> String
showsPrec :: Int -> ClientMessage -> ShowS
$cshowsPrec :: Int -> ClientMessage -> ShowS
Show

data ServerMessage
  = GQLConnectionError { ServerMessage -> Maybe Value
errorPayload :: Maybe A.Value }
  | GQLConnectionAck
  | GQLData     { ServerMessage -> Text
serverMsgId :: T.Text
                , ServerMessage -> Value
payload     :: A.Value }
  | GQLError    { serverMsgId :: T.Text
                , payload     :: A.Value }
  | GQLComplete { serverMsgId :: T.Text}
  | GQLKeepAlive
  deriving Int -> ServerMessage -> ShowS
[ServerMessage] -> ShowS
ServerMessage -> String
(Int -> ServerMessage -> ShowS)
-> (ServerMessage -> String)
-> ([ServerMessage] -> ShowS)
-> Show ServerMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ServerMessage] -> ShowS
$cshowList :: [ServerMessage] -> ShowS
show :: ServerMessage -> String
$cshow :: ServerMessage -> String
showsPrec :: Int -> ServerMessage -> ShowS
$cshowsPrec :: Int -> ServerMessage -> ShowS
Show

-- NOTE: using https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/message-types.ts
-- as source of truth for the message types

instance A.FromJSON ClientMessage where
  parseJSON :: Value -> Parser ClientMessage
parseJSON = String
-> (Object -> Parser ClientMessage)
-> Value
-> Parser ClientMessage
forall a. String -> (Object -> Parser a) -> Value -> Parser a
A.withObject String
"ClientMessage" ((Object -> Parser ClientMessage) -> Value -> Parser ClientMessage)
-> (Object -> Parser ClientMessage)
-> Value
-> Parser ClientMessage
forall a b. (a -> b) -> a -> b
$ \Object
v -> do
     String
ty :: String <- Object
v Object -> Text -> Parser String
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"type"
     case String
ty of
       String
"connection_init"
         -> Maybe Value -> ClientMessage
GQLConnectionInit (Maybe Value -> ClientMessage)
-> Parser (Maybe Value) -> Parser ClientMessage
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser (Maybe Value)
forall a. FromJSON a => Object -> Text -> Parser (Maybe a)
.:? Text
"payload"
       String
"start"
         -> do Text
i <- Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"id"
               (Text
q,VariableMapC
vrs,Maybe Text
opN) <- Object
v Object -> Text -> Parser Value
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"payload" Parser Value
-> (Value -> Parser (Text, VariableMapC, Maybe Text))
-> Parser (Text, VariableMapC, Maybe Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Value -> Parser (Text, VariableMapC, Maybe Text)
parsePayload
               ClientMessage -> Parser ClientMessage
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientMessage -> Parser ClientMessage)
-> ClientMessage -> Parser ClientMessage
forall a b. (a -> b) -> a -> b
$ Text -> Text -> VariableMapC -> Maybe Text -> ClientMessage
GQLStart Text
i Text
q VariableMapC
vrs Maybe Text
opN
       String
"stop"
         -> Text -> ClientMessage
GQLStop (Text -> ClientMessage) -> Parser Text -> Parser ClientMessage
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"id"
       String
"terminate"
         -> ClientMessage -> Parser ClientMessage
forall (f :: * -> *) a. Applicative f => a -> f a
pure ClientMessage
GQLTerminate
       String
_ -> Parser ClientMessage
forall (f :: * -> *) a. Alternative f => f a
empty
    where
      parsePayload :: Value -> Parser (Text, VariableMapC, Maybe Text)
parsePayload = String
-> (Object -> Parser (Text, VariableMapC, Maybe Text))
-> Value
-> Parser (Text, VariableMapC, Maybe Text)
forall a. String -> (Object -> Parser a) -> Value -> Parser a
A.withObject String
"ClientMessage/GQL_START" ((Object -> Parser (Text, VariableMapC, Maybe Text))
 -> Value -> Parser (Text, VariableMapC, Maybe Text))
-> (Object -> Parser (Text, VariableMapC, Maybe Text))
-> Value
-> Parser (Text, VariableMapC, Maybe Text)
forall a b. (a -> b) -> a -> b
$
        \Object
v -> (,,) (Text
 -> VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser Text
-> Parser
     (VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"query"
                   Parser
  (VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser VariableMapC
-> Parser (Maybe Text -> (Text, VariableMapC, Maybe Text))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (Object
v Object -> Text -> Parser VariableMapC
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"variables" Parser VariableMapC -> Parser VariableMapC -> Parser VariableMapC
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> VariableMapC -> Parser VariableMapC
forall (f :: * -> *) a. Applicative f => a -> f a
pure VariableMapC
forall k v. HashMap k v
HM.empty)
                   Parser (Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser (Maybe Text) -> Parser (Text, VariableMapC, Maybe Text)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Text -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Text -> Parser (Maybe a)
.:? Text
"operationName"

theType :: (A.KeyValue kv) => T.Text -> kv
theType :: Text -> kv
theType Text
t = Text
"type" Text -> Text -> kv
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
t

instance A.ToJSON ServerMessage where
  toJSON :: ServerMessage -> Value
toJSON (GQLConnectionError Maybe Value
e)
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"connection_error", Text
"payload" Text -> Maybe Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Maybe Value
e]
  toJSON ServerMessage
GQLConnectionAck
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"connection_ack"]
  toJSON (GQLData Text
i Value
p)
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"data", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i, Text
"payload" Text -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Value
p]
  toJSON (GQLError Text
i Value
p)
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"error", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i, Text
"payload" Text -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Value
p]
  toJSON (GQLComplete Text
i)
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"complete", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i]
  toJSON ServerMessage
GQLKeepAlive
    = [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"ka"]