module Network.Nakadi.Subscriptions.Events
( subscriptionSource
, subscriptionSourceR
, runSubscription
, runSubscriptionR
, subscriptionSink
) where
import Network.Nakadi.Internal.Prelude
import Conduit
import Control.Lens
import Control.Monad.Reader
import Data.Aeson
import Network.HTTP.Simple
import Network.HTTP.Types
import Data.Void
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import Network.Nakadi.Internal.Lenses (HasNakadiSubscriptionCursor)
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Subscriptions.Cursors
subscriptionSource ::
(MonadNakadi m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
=> Config
-> Maybe ConsumeParameters
-> SubscriptionId
-> m ( SubscriptionEventStream
, ConduitM ()
(SubscriptionEventStreamBatch a)
(ReaderT SubscriptionEventStreamContext m)
() )
subscriptionSource config maybeParams subscriptionId = do
let consumeParams = fromMaybe (config^.L.consumeParameters) maybeParams
queryParams = buildSubscriptionConsumeQueryParameters consumeParams
addFlowId = case _flowId consumeParams of
Just flowId -> setRequestHeader "X-Flow-Id" [encodeUtf8 flowId]
Nothing -> identity
httpJsonBodyStream config ok200 buildSubscriptionEventStream
[(status404, errorSubscriptionNotFound)]
(setRequestPath path . addFlowId . setRequestQueryParameters queryParams)
where buildSubscriptionEventStream response =
case listToMaybe (getResponseHeader "X-Nakadi-StreamId" response) of
Just streamId ->
Right SubscriptionEventStream
{ _streamId = StreamId (decodeUtf8 streamId)
, _subscriptionId = subscriptionId }
Nothing ->
Left "X-Nakadi-StreamId not found"
path = "/subscriptions/"
<> subscriptionIdToByteString subscriptionId
<> "/events"
subscriptionSourceR ::
(MonadNakadiEnv r m, MonadResource m, MonadBaseControl IO m, MonadMask m, FromJSON a)
=> Maybe ConsumeParameters
-> SubscriptionId
-> m ( SubscriptionEventStream
, ConduitM ()
(SubscriptionEventStreamBatch a)
(ReaderT SubscriptionEventStreamContext m)
() )
subscriptionSourceR maybeParams subscriptionId = do
config <- asks (view L.nakadiConfig)
subscriptionSource config maybeParams subscriptionId
runSubscription ::
(Monad m, MonadBaseControl IO m, MonadResource m)
=> Config
-> SubscriptionEventStream
-> ConduitM ()
Void
(ReaderT SubscriptionEventStreamContext m)
r
-> m r
runSubscription config SubscriptionEventStream { .. } =
let subscriptionStreamContext = SubscriptionEventStreamContext
{ _streamId = _streamId
, _subscriptionId = _subscriptionId
, _ctxConfig = config }
in runConduit . runReaderC subscriptionStreamContext
runSubscriptionR ::
(Monad m, MonadBaseControl IO m, MonadResource m, MonadReader r m, L.HasNakadiConfig r Config)
=> SubscriptionEventStream
-> ConduitM ()
Void
(ReaderT SubscriptionEventStreamContext m)
r
-> m r
runSubscriptionR subscriptionEventStream conduit = do
config <- asks (view L.nakadiConfig)
runSubscription config subscriptionEventStream conduit
subscriptionSink ::
(MonadNakadi m, HasNakadiSubscriptionCursor a )
=> ConduitM a Void (ReaderT SubscriptionEventStreamContext m) ()
subscriptionSink = awaitForever $ lift . subscriptionCommit . (: [])