{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Morpheus.Types.Internal.Subscription
( connectionThread
, toOutStream
, runStreamWS
, runStreamHTTP
, Stream
, Scope(..)
, Input(..)
, WS
, HTTP
, acceptApolloRequest
, publish
, Store(..)
, initDefaultStore
, publishEventWith
)
where
import Control.Exception ( finally )
import Control.Monad ( forever )
import Control.Concurrent ( readMVar
, newMVar
, modifyMVar_
)
import Data.UUID.V4 ( nextRandom )
import Control.Monad.IO.Class ( MonadIO(..) )
import Control.Monad.IO.Unlift ( MonadUnliftIO
, withRunInIO
)
import Data.Morpheus.Types.Internal.Operation
(empty)
import Data.Morpheus.Types.Internal.Resolving
( GQLChannel(..) )
import Data.Morpheus.Types.Internal.Subscription.Apollo
( acceptApolloRequest )
import Data.Morpheus.Types.Internal.Subscription.ClientConnectionStore
( delete
, publish
, ClientConnectionStore
)
import Data.Morpheus.Types.Internal.Subscription.Stream
( toOutStream
, runStreamWS
, runStreamHTTP
, Stream
, Scope(..)
, Input(..)
, WS
, HTTP
)
connect :: MonadIO m => m (Input WS)
connect = Init <$> liftIO nextRandom
disconnect:: Scope WS e m -> Input WS -> m ()
disconnect ScopeWS { update } (Init clientID) = update (delete clientID)
data Store e m = Store
{ readStore :: m (ClientConnectionStore e m)
, writeStore :: (ClientConnectionStore e m -> ClientConnectionStore e m) -> m ()
}
publishEventWith ::
( MonadIO m
, (Eq (StreamChannel event))
, (GQLChannel event)
) => Store event m -> event -> m ()
publishEventWith store event = readStore store >>= publish event
initDefaultStore ::
( MonadIO m
, (Eq (StreamChannel event))
, (GQLChannel event)
)
=> IO (Store event m)
initDefaultStore = do
store <- newMVar empty
pure Store
{ readStore = liftIO $ readMVar store
, writeStore = \changes -> liftIO $ modifyMVar_ store (return . changes)
}
finallyM :: MonadUnliftIO m => m () -> m () -> m ()
finallyM loop end = withRunInIO $ \runIO -> finally (runIO loop) (runIO end)
connectionThread
:: ( MonadUnliftIO m
)
=> (Input WS -> Stream WS e m)
-> Scope WS e m
-> m ()
connectionThread api scope = do
input <- connect
finallyM
(connectionLoop api scope input)
(disconnect scope input)
connectionLoop
:: Monad m
=> (Input WS -> Stream WS e m)
-> Scope WS e m
-> Input WS
-> m ()
connectionLoop api scope input
= forever
$ runStreamWS scope
$ api input