module Network.Nakadi.Internal.Http
( module Network.HTTP.Simple
, module Network.HTTP.Types.Status
, module Network.Nakadi.Internal.Types
, httpJsonBody
, httpJsonNoBody
, httpJsonBodyStream
, errorClientNotAuthenticated
, errorUnprocessableEntity
, errorAccessForbidden
, errorTooManyRequests
, errorBadRequest
, errorSubscriptionNotFound
, errorCursorAlreadyCommitted
, errorCursorResetInProgress
, errorEventTypeNotFound
, errorSubscriptionExistsAlready
, errorBatchPartiallySubmitted
, errorBatchNotSubmitted
, setRequestQueryParameters
) where
import Network.Nakadi.Internal.Prelude
import Conduit
import Control.Arrow
import Control.Lens
import Control.Monad (void)
import Control.Monad.Reader
import Control.Monad.Trans.Resource
import Data.Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import qualified Data.Conduit.Binary as Conduit
import qualified Data.Text as Text
import Network.HTTP.Client (BodyReader,
HttpException (..),
HttpExceptionContent (..),
checkResponse, responseStatus)
import Network.HTTP.Client.Conduit (bodyReaderSource)
import Network.HTTP.Simple
import Network.HTTP.Types
import Network.HTTP.Types.Status
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Retry
import Network.Nakadi.Internal.Types
import Network.Nakadi.Internal.Util
-- | Conduit stage that JSON-decodes every incoming 'ByteString' chunk.
--
-- Successfully decoded values are yielded downstream. A chunk that
-- fails to decode is not yielded; instead the deserialization failure
-- callback stored in the 'Config' (if one is set) is invoked with the
-- raw chunk and the decoder's error message, and processing continues
-- with the next chunk.
conduitDecode ::
  (MonadIO m, FromJSON a)
  => Config
  -> ConduitM ByteString a m ()
conduitDecode Config { .. } = awaitForever decodeChunk
  where
    -- Decode a single chunk, yielding on success and reporting on failure.
    decodeChunk rawChunk =
      case eitherDecodeStrict rawChunk of
        Left errMsg   -> liftIO (reportFailure rawChunk (Text.pack errMsg))
        Right decoded -> yield decoded

    -- Fall back to a no-op when no failure callback has been configured.
    reportFailure = case _deserializationFailureCallback of
      Just userCallback -> userCallback
      Nothing           -> \_ _ -> return ()
-- | Response check used as the @checkResponse@ hook of requests built
-- by 'httpBuildRequest'.
--
-- Throws an 'HttpExceptionRequest' carrying a 'StatusCodeException'
-- (with the response body stripped and an empty body excerpt) when the
-- response status is in the 5xx range; does nothing for any other
-- status.
checkNakadiResponse :: Request -> Response BodyReader -> IO ()
checkNakadiResponse request response =
  when (statusIsServerError (responseStatus response)) (throwIO serverFailure)
  where
    serverFailure =
      HttpExceptionRequest request (StatusCodeException (void response) mempty)
-- | Build the concrete 'Request' for an API call.
--
-- The caller-supplied @requestDef@ is applied to the request template
-- from the 'Config'; the configured manager is attached; the
-- @checkResponse@ hook is replaced by 'checkNakadiResponse'; and
-- finally the configured request modifier is run via 'modifyRequest'.
httpBuildRequest ::
  (MonadIO m, MonadCatch m)
  => Config
  -> (Request -> Request)
  -> m Request
httpBuildRequest Config { .. } requestDef =
  modifyRequest _requestModifier preparedRequest
  where
    -- The response check is set last so that @requestDef@ cannot override it.
    preparedRequest =
      (setRequestManager _manager (requestDef _requestTemplate))
        { checkResponse = checkNakadiResponse }
-- | Run a user-supplied request modifier in 'IO'.
--
-- Returns the modified request on success. Any exception caught by
-- 'tryAny' while running the modifier is rethrown wrapped in
-- 'RequestModificationException'.
modifyRequest :: (MonadIO m, MonadCatch m) => (Request -> IO Request) -> Request -> m Request
modifyRequest rm request = do
  outcome <- tryAny (liftIO (rm request))
  case outcome of
    Left exn              -> throwIO (RequestModificationException exn)
    Right modifiedRequest -> return modifiedRequest
-- | Build a request with 'httpBuildRequest' and execute it, returning
-- the response with its body as a lazy 'ByteString.Lazy.ByteString'.
--
-- The request is sent through the @httpLbs@ function of the HTTP
-- backend stored in the 'Config', wrapped in 'retryAction'.
httpExecRequest ::
  (MonadIO m, MonadMask m)
  => Config
  -> (Request -> Request)
  -> m (Response ByteString.Lazy.ByteString)
httpExecRequest config requestDef =
  httpBuildRequest config requestDef >>= \builtRequest ->
    retryAction config builtRequest sendRequest
  where
    -- Lift the backend's IO-based sender into the caller's monad.
    sendRequest outgoing = liftIO ((config^.L.http.L.httpLbs) outgoing)
-- | Like 'httpExecRequest', but additionally returns the response
-- status alongside the response itself.
httpExecRequestWithStatus ::
  (MonadIO m, MonadMask m)
  => Config
  -> (Request -> Request)
  -> m (Response ByteString.Lazy.ByteString, Status)
httpExecRequestWithStatus config requestDef = do
  response <- httpExecRequest config requestDef
  return (response, getResponseStatus response)
-- | Execute a request and decode its JSON response body.
--
-- * If the response status equals @successStatus@, the body is decoded
--   with 'decodeThrow' and returned.
-- * Otherwise the status is looked up in @exceptionMap@ followed by
--   'defaultExceptionMap' (caller entries take precedence); a matching
--   handler turns the body into a 'NakadiException', which is thrown.
-- * If no handler matches, 'UnexpectedResponse' is thrown with the
--   body-less response.
httpJsonBody ::
  (MonadMask m, MonadIO m, FromJSON a)
  => Config
  -> Status
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> m a
httpJsonBody config successStatus exceptionMap requestDef = do
  (response, status) <- httpExecRequestWithStatus config requestDef
  let body = getResponseBody response
  case lookup status handlers of
    -- The success check comes first; the lookup is only consulted on failure.
    _ | status == successStatus -> decodeThrow body
    Just mkExn                  -> mkExn body >>= throwIO
    Nothing                     -> throwIO (UnexpectedResponse (void response))
  where
    handlers = exceptionMap ++ defaultExceptionMap
-- | Execute a request whose successful response carries no body of
-- interest.
--
-- Returns @()@ when the response status equals @successStatus@.
-- Otherwise the status is looked up in @exceptionMap@ followed by
-- 'defaultExceptionMap'; a matching handler converts the response body
-- into a 'NakadiException', which is thrown. With no matching handler,
-- 'UnexpectedResponse' is thrown with the body-less response.
httpJsonNoBody ::
  (MonadMask m, MonadIO m)
  => Config
  -> Status
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> m ()
httpJsonNoBody config successStatus exceptionMap requestDef =
  httpExecRequestWithStatus config requestDef >>= \(response, status) ->
    if status == successStatus
      then return ()
      else case lookup status handlers of
        Nothing    -> throwIO (UnexpectedResponse (void response))
        Just mkExn -> do
          exn <- mkExn (getResponseBody response)
          throwIO exn
  where
    handlers = exceptionMap ++ defaultExceptionMap
-- | Open a streaming response and expose its body as a conduit of
-- JSON-decoded values.
--
-- The response is opened through the HTTP backend's @responseOpen@
-- (wrapped in 'retryAction') and registered with 'allocate' together
-- with the backend's @responseClose@.
--
-- On a status equal to @successStatus@:
--
-- * the stream-connect callback from the 'Config' (if set) is invoked
--   with the configured log function and the body-less response;
-- * @f@ is applied to the body-less response; a 'Left' is thrown via
--   'throwString', a 'Right' value is returned together with a conduit
--   that splits the body into lines and decodes each with
--   'conduitDecode'.
--
-- On any other status the handler found in @exceptionMap@ followed by
-- 'defaultExceptionMap' receives the drained body and its exception is
-- thrown; without a matching handler 'UnexpectedResponse' is thrown.
httpJsonBodyStream ::
  (MonadMask m, MonadIO m, FromJSON a, MonadBaseControl IO m, MonadResource m)
  => Config
  -> Status
  -> (Response () -> Either Text b)
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> m (b, ConduitM () a (ReaderT r m) ())
httpJsonBodyStream config successStatus f exceptionMap requestDef = do
  request <- httpBuildRequest config requestDef
  let closeResponse = config^.L.http.L.responseClose
      openWithRetry = retryAction config request $ \outgoing ->
        (config^.L.http.L.responseOpen) outgoing (config^.L.manager)
  (_, response) <- allocate openWithRetry closeResponse
  let strippedResponse = void response
      rawBody          = bodyReaderSource (getResponseBody response)
      status           = getResponseStatus response
  if status == successStatus
    then do
      notifyConnected strippedResponse
      case f strippedResponse of
        Right b     -> return (b, readerC (const (decodedLines rawBody)))
        Left errMsg -> throwString (Text.unpack errMsg)
    else case lookup status handlers of
      Nothing    -> throwIO (UnexpectedResponse strippedResponse)
      Just mkExn -> do
        errorBody <- conduitDrainToLazyByteString rawBody
        exn       <- mkExn errorBody
        throwIO exn
  where
    -- Caller-provided handlers shadow the defaults for the same status.
    handlers = exceptionMap ++ defaultExceptionMap

    -- Split the raw body into lines and decode each line as JSON.
    decodedLines source = source .| Conduit.lines .| conduitDecode config

    -- Invoke the optional stream-connect callback, if configured.
    notifyConnected connectedResponse =
      case config^.L.streamConnectCallback of
        Nothing -> return ()
        Just cb -> liftIO (cb (config^.L.logFunc) connectedResponse)
-- | Set the query string of a request from a list of key/value pairs.
--
-- Every value is wrapped in 'Just' before being handed to
-- 'setRequestQueryString'.
setRequestQueryParameters ::
  [(ByteString, ByteString)]
  -> Request
  -> Request
setRequestQueryParameters parameters =
  setRequestQueryString [ (key, Just value) | (key, value) <- parameters ]
-- | Fallback mapping from HTTP error statuses to the functions that
-- turn a response body into the corresponding 'NakadiException'.
--
-- The request helpers in this module append this map after the
-- caller-supplied one, so caller entries win for the same status.
defaultExceptionMap ::
  MonadThrow m
  => [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
defaultExceptionMap =
  [ (unauthorized401,        errorClientNotAuthenticated)
  , (forbidden403,           errorAccessForbidden)
  , (badRequest400,          errorBadRequest)
  , (unprocessableEntity422, errorUnprocessableEntity)
  , (conflict409,            errorConflict)
  ]
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'ClientNotAuthenticated'.
errorClientNotAuthenticated :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorClientNotAuthenticated = fmap ClientNotAuthenticated . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'Conflict'.
errorConflict :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorConflict = fmap Conflict . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'UnprocessableEntity'.
errorUnprocessableEntity :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorUnprocessableEntity = fmap UnprocessableEntity . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'AccessForbidden'.
errorAccessForbidden :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorAccessForbidden = fmap AccessForbidden . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'TooManyRequests'.
errorTooManyRequests :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorTooManyRequests = fmap TooManyRequests . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'BadRequest'.
errorBadRequest :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBadRequest = fmap BadRequest . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'SubscriptionNotFound'.
errorSubscriptionNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionNotFound = fmap SubscriptionNotFound . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'CursorAlreadyCommitted'.
errorCursorAlreadyCommitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorAlreadyCommitted = fmap CursorAlreadyCommitted . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'CursorResetInProgress'.
errorCursorResetInProgress :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorResetInProgress = fmap CursorResetInProgress . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'EventTypeNotFound'.
errorEventTypeNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorEventTypeNotFound = fmap EventTypeNotFound . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'SubscriptionExistsAlready'.
errorSubscriptionExistsAlready :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionExistsAlready = fmap SubscriptionExistsAlready . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'BatchPartiallySubmitted'.
errorBatchPartiallySubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchPartiallySubmitted = fmap BatchPartiallySubmitted . decodeThrow
-- | Decode the response body with 'decodeThrow' and wrap the result in
-- 'BatchNotSubmitted'.
errorBatchNotSubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchNotSubmitted = fmap BatchNotSubmitted . decodeThrow