{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.ThreadManager as T
import Text.Read (readMaybe)

import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Client configuration
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ https or http.  Sent as the @:scheme@ pseudo-header of each
    -- request unless it is empty.
    , authority :: Authority
    -- ^ Server name.  Sent as the @:authority@ pseudo-header of each
    -- request unless it is empty; an IPv6 literal is wrapped in brackets.
    , cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net
    , connectionWindowSize :: WindowSize
    -- ^ The window size of connection.
    , settings :: Settings
    -- ^ Settings
    }
    deriving (Eq, Show)

-- | The default client config.
--
-- The @authority@ field will be used to set the HTTP2 @:authority@
-- pseudo-header. In most cases you will want to override it to be equal to
-- @host@.
--
-- Further background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2) also
-- allows @host:port@, and most servers will accept this too. However, when
-- using TLS, many servers will expect the TLS SNI server name and the
-- @:authority@ pseudo-header to be equal, and for TLS SNI the server name
-- should not include the port. Note that HTTP2 explicitly /disallows/ using
-- @userinfo\@@ as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 16777216, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing, pingRateLimit = 10, emptyFrameRateLimit = 4, settingsRateLimit = 4, rstRateLimit = 4}}
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }

-- | Running HTTP/2 client.
--
-- Builds a 'Context' with 'setup', then runs the supplied 'Client' under
-- 'runH2'.  The client receives a request-sending function ('clientCore'
-- below) and an 'Aux' record, and is wrapped in 'wrapClient' so that the
-- connection is closed gracefully once it returns.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{scheme, authority} conf client = do
    ctx <- setup cconf conf
    runH2 conf ctx $ runClient ctx
  where
    -- The peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS, or 'maxBound'
    -- when the peer has not set one.
    serverMaxStreams :: Context -> IO Int
    serverMaxStreams ctx =
        fromMaybe maxBound . maxConcurrentStreams
            <$> readIORef (peerSettings ctx)

    -- Peer limit minus the current 'oddConc' of the odd stream table.
    possibleClientStream :: Context -> IO Int
    possibleClientStream ctx = do
        limit <- serverMaxStreams ctx
        inUse <- oddConc <$> readTVarIO (oddStreamTable ctx)
        return (limit - inUse)

    aux :: Context -> Aux
    aux ctx =
        Aux
            { auxPossibleClientStreams = possibleClientStream ctx
            }

    -- Send one request and hand its response to the continuation.
    -- 'makeStream' returns 'Nothing' for the 'OutObj' when it found an
    -- already-cached stream, in which case nothing is sent.
    clientCore :: Context -> Request -> (Response -> IO r) -> IO r
    clientCore ctx req processResponse = do
        (strm, moutobj) <- makeStream ctx scheme authority req
        case moutobj of
            Nothing -> return ()
            Just outobj -> sendRequest conf ctx strm outobj False
        rsp <- getResponse strm
        x <- processResponse rsp
        -- Only reached when the continuation returns normally.
        adjustRxWindow ctx strm
        return x

    -- No local signature here: without ScopedTypeVariables it could not
    -- refer to the outer type variable @a@.
    runClient ctx = wrapClient ctx $ client (clientCore ctx) $ aux ctx

-- | Run a client action and then shut the connection down gracefully.
--
-- After the action returns, this waits until the context's thread manager
-- reports that all managed threads are gone, enqueues a GOAWAY frame
-- ('NoError', \"graceful closing\") followed by @'CFinish' 'GoAwayIsSent'@ on
-- the control queue, and finally blocks until the 'senderDone' flag becomes
-- 'True'.  The action's result is returned.
--
-- If the action throws, none of the shutdown steps are executed here.
wrapClient :: Context -> IO a -> IO a
wrapClient ctx client = do
    x <- client
    T.waitUntilAllGone $ threadManager ctx
    let frame = goawayFrame 0 NoError "graceful closing"
    enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
    enqueueControl (controlQ ctx) $ CFinish GoAwayIsSent
    -- 'check' retries the transaction until the flag is set.
    atomically $ readTVar (senderDone ctx) >>= check
    return x

-- | Launching a receiver and a sender.
--
-- Lower-level variant of 'run': instead of a 'Client', the caller receives
-- a 'ClientIO' record (local and peer socket addresses, a request sender, a
-- response getter, a raw frame sender and a stream creator) and returns
-- the action to run.  That action is wrapped in 'wrapClient' and executed
-- under 'runH2'.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO
    cconf@ClientConfig{scheme, authority}
    conf@Config{confMySockAddr, confPeerSockAddr}
    action = do
        ctx@Context{controlQ} <- setup cconf conf
        let -- Enqueue a pre-built frame on the control queue.
            putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
            -- Open (or reuse) a stream for the request and send it; the
            -- final 'True' selects the 'makeOutputIO' path of 'sendRequest'.
            putR req = do
                (strm, moutobj) <- makeStream ctx scheme authority req
                case moutobj of
                    Nothing -> return ()
                    Just outobj -> sendRequest conf ctx strm outobj True
                return (streamNumber strm, strm)
            get = getResponse
            create = openOddStreamWait ctx
        runClient <-
            wrapClient ctx
                <$> action
                    (ClientIO confMySockAddr confPeerSockAddr putR get putB create)
        runH2 conf ctx runClient

-- | Block until the stream's input 'MVar' is filled, then either rethrow
-- the stored exception or wrap the received 'InpObj' as a 'Response'.
getResponse :: Stream -> IO Response
getResponse strm =
    takeMVar (streamInput strm) >>= either throwIO (return . Response)

-- | Create the client-side 'Context' for a connection and start the
-- settings exchange.
--
-- The context is built by 'newContext' from the client role information
-- (scheme and authority), the transport 'Config', and the cache limit,
-- connection window size and settings taken from the 'ClientConfig'.
-- 'exchangeSettings' is called on it before it is returned.
setup :: ClientConfig -> Config -> IO Context
setup
    ClientConfig{scheme, authority, cacheLimit, connectionWindowSize, settings}
    conf@Config{confTimeoutManager} = do
        let clientInfo = newClientInfo scheme authority
        ctx <-
            newContext
                clientInfo
                conf
                cacheLimit
                connectionWindowSize
                settings
                confTimeoutManager
        exchangeSettings ctx
        return ctx

-- | Run the frame receiver and frame sender alongside the given client
-- action, under the context's thread manager.
--
-- 'T.stopAfter' runs everything and then invokes the finaliser, which
-- passes its @Maybe SomeException@ argument to 'closeAllStreams' together
-- with both stream tables.
runH2 :: Config -> Context -> IO a -> IO a
runH2 conf ctx runClient =
    T.stopAfter mgr runAll $
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx)
  where
    mgr = threadManager ctx
    runReceiver = frameReceiver ctx conf
    runSender = frameSender ctx conf
    runBackgroundThreads = do
        labelMe "H2 runBackgroundThreads"
        concurrently_ runReceiver runSender

    -- Run the background threads and client concurrently. If the client
    -- finishes first, cancel the background threads. If the background
    -- threads finish first, wait for the client.
    runAll =
        withAsync runBackgroundThreads $ \runningBackgroundThreads ->
            withAsync runClient $ \runningClient -> do
                result <- waitEither runningBackgroundThreads runningClient
                case result of
                    Right clientResult -> do
                        cancel runningBackgroundThreads
                        return clientResult
                    Left () -> wait runningClient

-- | Obtain the stream on which a request's response will arrive.
--
-- The request's @:method@ and @:path@ are first looked up in the even
-- stream cache.  On a hit, the cache entry is deleted and the cached stream
-- is returned with 'Nothing', meaning there is nothing to send.  On a miss,
-- the @:scheme@ and @:authority@ pseudo-headers are prepended (each only if
-- non-empty), a new odd stream is opened with 'openOddStreamWait', and it
-- is returned together with the amended request.
--
-- The request headers must contain @:method@ and @:path@; otherwise
-- evaluating them raises 'error'.
makeStream
    :: Context
    -> Scheme
    -> Authority
    -> Request
    -> IO (Stream, Maybe OutObj)
makeStream ctx@Context{evenStreamTable} scheme auth (Request req) = do
    -- Checking push promises
    let hdr0 = outObjHeaders req
        method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
        path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
    mstrm0 <- lookupEvenCache evenStreamTable method path
    case mstrm0 of
        Just strm0 -> do
            deleteEvenCache evenStreamTable method path
            return (strm0, Nothing)
        Nothing -> do
            -- Arch/Sender is originally implemented for servers where
            -- the ordering of responses can be out-of-order.
            -- But for clients, the ordering must be maintained.
            -- To implement this, 'outputQStreamID' is used.
            let -- An authority that parses as a bare IPv6 address is
                -- wrapped in brackets.
                isIPv6 = isJust (readMaybe auth :: Maybe IPv6)
                auth'
                    | isIPv6 = "[" <> UTF8.fromString auth <> "]"
                    | otherwise = UTF8.fromString auth
            let hdr1, hdr2 :: [Header]
                hdr1
                    | scheme /= "" = (":scheme", scheme) : hdr0
                    | otherwise = hdr0
                hdr2
                    | auth /= "" = (":authority", auth') : hdr1
                    | otherwise = hdr1
                req' = req{outObjHeaders = hdr2}
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (_sid, newstrm) <- openOddStreamWait ctx
            return (newstrm, Just req')

-- | Queue a request for transmission on the given stream.
--
-- The body of the 'OutObj' decides how data is produced:
--
-- * 'OutBodyNone': headers only.
-- * 'OutBodyFile': a position reader is opened with the configured
--   'confPositionReadMaker' and read through 'fillFileBodyGetNext'.
-- * 'OutBodyBuilder': the builder is drained by 'fillBuilderBodyGetNext'.
-- * 'OutBodyStreaming' / 'OutBodyStreamingIface': a streaming producer is
--   started with 'sendStreaming' and its queue drained by
--   'nextForStreaming'.
--
-- When the final 'Bool' is 'True' the output is built with 'makeOutputIO'
-- and simply enqueued.  When it is 'False' the output is built with
-- 'makeOutput', enqueued, and a managed thread running 'syncWithSender''
-- is forked for the stream.
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest
    Config{confPositionReadMaker}
    ctx@Context{threadManager, outputQ, outputQStreamID}
    strm
    OutObj{outObjHeaders, outObjBody, outObjTrailers}
    io = do
        let sid = streamNumber strm
        -- mnext: optional body producer; mtbq: the streaming queue, if any.
        (mnext, mtbq) <- case outObjBody of
            OutBodyNone -> return (Nothing, Nothing)
            OutBodyFile (FileSpec path fileoff bytecount) -> do
                (pread, sentinel) <- confPositionReadMaker path
                let next = fillFileBodyGetNext pread fileoff bytecount sentinel
                return (Just next, Nothing)
            OutBodyBuilder builder -> do
                let next = fillBuilderBodyGetNext builder
                return (Just next, Nothing)
            OutBodyStreaming strmbdy -> do
                q <- sendStreaming ctx strm $ \iface ->
                    outBodyUnmask iface $
                        strmbdy (outBodyPush iface) (outBodyFlush iface)
                let next = nextForStreaming q
                return (Just next, Just q)
            OutBodyStreamingIface strmbdy -> do
                q <- sendStreaming ctx strm strmbdy
                let next = nextForStreaming q
                return (Just next, Just q)
        let ot = OHeader outObjHeaders mnext outObjTrailers
        if io
            then do
                let out = makeOutputIO ctx strm ot
                pushOutput sid out
            else do
                (pop, out) <- makeOutput strm ot
                pushOutput sid out
                lc <- newLoopCheck strm mtbq
                T.forkManaged threadManager label $ syncWithSender' ctx pop lc
  where
    label = "H2 request sender for stream " ++ show (streamNumber strm)
    -- Enqueue in stream-id order: the transaction retries (via 'check')
    -- until 'outputQStreamID' equals this stream's id, then advances it by
    -- 2 and enqueues the output.
    pushOutput sid out = atomically $ do
        sidOK <- readTVar outputQStreamID
        check (sidOK == sid)
        writeTVar outputQStreamID (sid + 2)
        enqueueOutputSTM outputQ out

-- | Start a streaming request body for a stream.
--
-- Creates a bounded queue of 'StreamingChunk's, forks a managed thread
-- (labelled with the stream number) that runs the caller-supplied body
-- writer through 'withOutBodyIface' on that queue, and returns the queue
-- to the caller straight after forking.
sendStreaming
    :: Context
    -- ^ Connection context; only its 'threadManager' is used here.
    -> Stream
    -- ^ Stream whose number is used to label the forked thread.
    -> (OutBodyIface -> IO ())
    -- ^ Body writer, handed to 'withOutBodyIface' in the forked thread.
    -> IO (TBQueue StreamingChunk)
    -- ^ The queue that was passed to 'withOutBodyIface'.
sendStreaming Context{threadManager} strm strmbdy = do
    tbq <- newTBQueueIO chunkQueueCapacity
    -- The forked action receives an @unmask@ function from the thread
    -- manager and passes it on to 'withOutBodyIface' unchanged.
    T.forkManagedUnmask threadManager label $ \unmask ->
        withOutBodyIface tbq unmask strmbdy
    return tbq
  where
    -- fixme: hard coding: 10
    chunkQueueCapacity = 10
    label :: String
    label =
        "H2 request streaming sender for stream " ++ show (streamNumber strm)

-- | Queue the client's opening control frames.
--
-- Steps, in order:
--
-- 1. Read the current 'rxfBufSize' from the context's 'rxFlow'.
-- 2. Build the negotiation frames from 'mySettings' and that size with
--    'makeNegotiationFrames'.
-- 3. Set 'myFirstSettings' to 'True'.
-- 4. Enqueue a single 'CFrames' control, consisting of
--    'connectionPreface' followed by the negotiation frames, on 'controlQ'.
exchangeSettings :: Context -> IO ()
exchangeSettings Context{rxFlow, mySettings, myFirstSettings, controlQ} = do
    connRxWS <- rxfBufSize <$> readIORef rxFlow
    let frames = makeNegotiationFrames mySettings connRxWS
        -- The preface goes first, ahead of the negotiation frames.
        setframe = CFrames Nothing (connectionPreface : frames)
    -- The flag is set before the control is enqueued; keep this order.
    writeIORef myFirstSettings True
    enqueueControl controlQ setframe

-- | A record of addresses and actions for driving a client connection.
--
-- NOTE(review): the per-field descriptions below are inferred from the
-- field names and types only; confirm against the code that builds this
-- record.
data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    -- ^ Socket address; presumably the local end of the connection.
    , cioPeerSockAddr :: SockAddr
    -- ^ Socket address; presumably the remote (server) end.
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    -- ^ Takes a 'Request' and yields a stream identifier with its 'Stream'.
    , cioReadResponse :: Stream -> IO Response
    -- ^ Takes a 'Stream' and yields a 'Response'.
    , cioWriteBytes :: ByteString -> IO ()
    -- ^ Takes a 'ByteString'; presumably writes it to the connection.
    , cioCreateStream :: IO (StreamId, Stream)
    -- ^ Yields a stream identifier with its 'Stream'; presumably newly
    -- created.
    }