{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Client.Run where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.ThreadManager as T
import Text.Read (readMaybe)
import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2
-- | Caller-supplied parameters for one HTTP/2 client connection.
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ Value used for the @:scheme@ pseudo header; 'makeStream' omits
    -- the header when this is empty.
    , authority :: Authority
    -- ^ Value used for the @:authority@ pseudo header; 'makeStream' omits
    -- the header when this is empty and brackets it when it parses as an
    -- IPv6 address.
    , cacheLimit :: Int
    -- ^ Forwarded unchanged to 'newContext' by 'setup'.
    , connectionWindowSize :: WindowSize
    -- ^ Forwarded unchanged to 'newContext' by 'setup'.
    , settings :: Settings
    -- ^ Forwarded unchanged to 'newContext' by 'setup'.
    }
    deriving (Eq, Show)
-- | Baseline 'ClientConfig': scheme @http@, authority @localhost@, a cache
-- limit of 64, 'defaultMaxData' as the connection window size and
-- 'defaultSettings' for the settings.  Override individual fields with
-- record-update syntax.
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }
-- | Run an HTTP/2 client.
--
-- A 'Context' is built with 'setup'; the client action is then executed
-- under 'runH2', wrapped in 'wrapClient' so that the connection is closed
-- gracefully once the action has returned.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{scheme, authority} conf client = do
    ctx <- setup cconf conf
    runH2 conf ctx (session ctx)
  where
    -- The user's client, given its request sender and auxiliary record.
    session ctx = wrapClient ctx (client (sendAndReceive ctx) (helpers ctx))

    -- Peer's advertised concurrent-stream limit; 'maxBound' when the peer
    -- settings carry no limit.
    peerStreamLimit ctx =
        fromMaybe maxBound . maxConcurrentStreams <$> readIORef (peerSettings ctx)

    -- Limit minus the current 'oddConc' count of the odd stream table.
    remainingStreams ctx = do
        limit <- peerStreamLimit ctx
        inUse <- oddConc <$> readTVarIO (oddStreamTable ctx)
        return (limit - inUse)

    helpers ctx = Aux{auxPossibleClientStreams = remainingStreams ctx}

    -- Obtain a stream for the request, send the request when 'makeStream'
    -- hands back something to send, wait for the response, let the caller
    -- consume it, and finally adjust the receive window for the stream.
    sendAndReceive ctx req processResponse = do
        (strm, moutobj) <- makeStream ctx scheme authority req
        maybe
            (return ())
            (\outobj -> sendRequest conf ctx strm outobj False)
            moutobj
        rsp <- getResponse strm
        result <- processResponse rsp
        adjustRxWindow ctx strm
        return result
-- | Run the client action and then close the connection gracefully.
--
-- After the action returns, this waits until the thread manager reports
-- that all of its threads are gone, enqueues a GOAWAY frame carrying
-- 'NoError' followed by a 'CFinish' control message, and blocks until the
-- context's @senderDone@ flag becomes 'True'.  The action's result is
-- returned.  Nothing here runs if the action throws.
wrapClient :: Context -> IO a -> IO a
wrapClient ctx client = do
    result <- client
    T.waitUntilAllGone (threadManager ctx)
    sendControl (CFrames Nothing [goawayFrame 0 NoError "graceful closing"])
    sendControl (CFinish GoAwayIsSent)
    -- 'check' retries the transaction until the flag is set.
    atomically (readTVar (senderDone ctx) >>= check)
    return result
  where
    sendControl = enqueueControl (controlQ ctx)
-- | Lower-level variant of 'run'.
--
-- The supplied function receives a 'ClientIO' record of primitive
-- operations bound to a freshly set-up 'Context' and returns the client
-- action proper.  That action is then executed under 'runH2', wrapped in
-- 'wrapClient' for a graceful shutdown.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO
    cconf@ClientConfig{scheme, authority}
    conf@Config{confMySockAddr, confPeerSockAddr}
    action = do
        ctx <- setup cconf conf
        let -- Obtain a stream and, when 'makeStream' returns something to
            -- send, queue it with the IO flag of 'sendRequest' set.
            writeRequest req = do
                (strm, moutobj) <- makeStream ctx scheme authority req
                maybe
                    (return ())
                    (\outobj -> sendRequest conf ctx strm outobj True)
                    moutobj
                return (streamNumber strm, strm)
            -- Queue raw bytes as a single-element control frame list.
            writeBytes bs = enqueueControl (controlQ ctx) (CFrames Nothing [bs])
            cio =
                ClientIO
                    { cioMySockAddr = confMySockAddr
                    , cioPeerSockAddr = confPeerSockAddr
                    , cioWriteRequest = writeRequest
                    , cioReadResponse = getResponse
                    , cioWriteBytes = writeBytes
                    , cioCreateStream = openOddStreamWait ctx
                    }
        act <- action cio
        runH2 conf ctx (wrapClient ctx act)
-- | Block until the stream's input 'MVar' is filled, then either rethrow
-- the stored exception or wrap the received object as a 'Response'.
--
-- The 'MVar' is emptied by 'takeMVar', so the value is consumed.
getResponse :: Stream -> IO Response
getResponse strm =
    takeMVar (streamInput strm) >>= either throwIO (return . Response)
-- | Create the connection 'Context' for a client and queue the opening
-- settings exchange.
--
-- The context is created by 'newContext' from the client role info, the
-- transport 'Config' and the limits taken from the 'ClientConfig';
-- 'exchangeSettings' is run on it before it is returned.
setup :: ClientConfig -> Config -> IO Context
setup
    ClientConfig{scheme, authority, cacheLimit, connectionWindowSize, settings}
    conf@Config{confTimeoutManager} = do
        ctx <-
            newContext
                (newClientInfo scheme authority)
                conf
                cacheLimit
                connectionWindowSize
                settings
                confTimeoutManager
        exchangeSettings ctx
        return ctx
-- | Run a client action alongside the frame receiver and frame sender.
--
-- Receiver and sender run concurrently in one background 'Async'; the
-- client runs in another.  Whichever finishes first decides what happens:
--
-- * client first: the background threads are cancelled and the client's
--   result is returned;
-- * background first: the client is waited for and its result returned.
--
-- The whole thing is passed to 'T.stopAfter' together with a handler that
-- calls 'closeAllStreams' on both stream tables with the handler's
-- @Maybe SomeException@ argument.
runH2 :: Config -> Context -> IO a -> IO a
runH2 conf ctx runClient =
    T.stopAfter (threadManager ctx) supervise closeStreams
  where
    closeStreams res =
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) res

    background = do
        labelMe "H2 runBackgroundThreads"
        concurrently_ (frameReceiver ctx conf) (frameSender ctx conf)

    supervise =
        withAsync background $ \bgThreads ->
            withAsync runClient $ \clientThread -> do
                winner <- waitEither bgThreads clientThread
                case winner of
                    Left () -> wait clientThread
                    Right clientResult -> do
                        cancel bgThreads
                        return clientResult
-- | Pick the stream on which a request will be answered.
--
-- The even stream cache is consulted first, keyed by the request's
-- @:method@ and @:path@.  On a hit the cached stream is removed from the
-- cache and returned together with 'Nothing', so the caller sends
-- nothing.  On a miss a new odd stream is opened and returned together
-- with the request, whose headers have been extended with @:scheme@ and
-- @:authority@ (each only when its value is non-empty).
--
-- Calls 'error' if the request lacks a @:method@ or @:path@ header.
makeStream
    :: Context
    -> Scheme
    -> Authority
    -> Request
    -> IO (Stream, Maybe OutObj)
makeStream ctx@Context{evenStreamTable} scheme auth (Request req) = do
    cached <- lookupEvenCache evenStreamTable method path
    case cached of
        Just cachedStrm -> do
            deleteEvenCache evenStreamTable method path
            return (cachedStrm, Nothing)
        Nothing -> do
            (_sid, newstrm) <- openOddStreamWait ctx
            return (newstrm, Just req{outObjHeaders = fullHeaders})
  where
    baseHeaders = outObjHeaders req
    method = fromMaybe (error "makeStream:method") (lookup ":method" baseHeaders)
    path = fromMaybe (error "makeStream:path") (lookup ":path" baseHeaders)

    -- An authority that parses as an IPv6 address is wrapped in brackets.
    authBytes
        | isJust (readMaybe auth :: Maybe IPv6) =
            "[" <> UTF8.fromString auth <> "]"
        | otherwise = UTF8.fromString auth

    -- @:authority@ ends up in front of @:scheme@, both ahead of the
    -- caller's own headers.
    fullHeaders =
        [(":authority", authBytes) | auth /= ""]
            ++ [(":scheme", scheme) | scheme /= ""]
            ++ baseHeaders
-- | Queue a request for transmission on the given stream.
--
-- The request body determines the optional chunk producer and, for the
-- two streaming body kinds, the queue fed by a thread started with
-- 'sendStreaming'.  An 'OHeader' output is then built and enqueued.
--
-- When the final flag is 'True' the output comes from 'makeOutputIO' and
-- nothing else happens.  When it is 'False' the output comes from
-- 'makeOutput', and a managed thread running 'syncWithSender'' is forked
-- after the output has been enqueued.
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest
    Config{confPositionReadMaker}
    ctx@Context{threadManager, outputQStreamID, outputQ}
    strm
    OutObj{outObjHeaders, outObjBody, outObjTrailers}
    io = do
        (mnext, mtbq) <- bodySource outObjBody
        let ot = OHeader outObjHeaders mnext outObjTrailers
        if io
            then enqueueInOrder (makeOutputIO ctx strm ot)
            else do
                (pop, out) <- makeOutput strm ot
                enqueueInOrder out
                lc <- newLoopCheck strm mtbq
                T.forkManaged threadManager label (syncWithSender' ctx pop lc)
  where
    sid = streamNumber strm
    label = "H2 request sender for stream " ++ show sid

    -- One clause per body kind; each yields (chunk producer, stream queue).
    bodySource OutBodyNone = return (Nothing, Nothing)
    bodySource (OutBodyFile (FileSpec path fileoff bytecount)) = do
        (pread, sentinel) <- confPositionReadMaker path
        return (Just (fillFileBodyGetNext pread fileoff bytecount sentinel), Nothing)
    bodySource (OutBodyBuilder builder) =
        return (Just (fillBuilderBodyGetNext builder), Nothing)
    bodySource (OutBodyStreaming strmbdy) =
        -- Adapt the push/flush style body to the interface style, running
        -- it under the interface's unmask function.
        streamed $ \iface ->
            outBodyUnmask iface (strmbdy (outBodyPush iface) (outBodyFlush iface))
    bodySource (OutBodyStreamingIface strmbdy) = streamed strmbdy

    streamed body = do
        q <- sendStreaming ctx strm body
        return (Just (nextForStreaming q), Just q)

    -- Block (via 'check') until @outputQStreamID@ equals this stream's
    -- number, then advance it by 2 and enqueue the output, all in one
    -- transaction.
    enqueueInOrder out = atomically $ do
        expected <- readTVar outputQStreamID
        check (expected == sid)
        writeTVar outputQStreamID (sid + 2)
        enqueueOutputSTM outputQ out
-- | Start a managed thread that runs a streaming request body and return
-- the queue it writes to.
--
-- A bounded queue of capacity 10 is created; the body is run through
-- 'withOutBodyIface' on that queue in a thread forked with
-- 'T.forkManagedUnmask', which supplies the unmask function.
sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{threadManager} strm strmbdy = do
    chunkQ <- newTBQueueIO 10
    T.forkManagedUnmask
        threadManager
        label
        (\unmask -> withOutBodyIface chunkQ unmask strmbdy)
    return chunkQ
  where
    label =
        "H2 request streaming sender for stream " ++ show (streamNumber strm)
-- | Queue the client's opening bytes: the connection preface followed by
-- the frames produced by 'makeNegotiationFrames' from this side's
-- settings and the current receive-flow buffer size.
--
-- @myFirstSettings@ is set to 'True' before the control message is
-- enqueued.
exchangeSettings :: Context -> IO ()
exchangeSettings Context{rxFlow, mySettings, myFirstSettings, controlQ} = do
    connRxWS <- rxfBufSize <$> readIORef rxFlow
    writeIORef myFirstSettings True
    enqueueControl controlQ $
        CFrames Nothing (connectionPreface : makeNegotiationFrames mySettings connRxWS)
-- | Primitive operations handed to the function given to 'runIO'.
data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    -- ^ Filled by 'runIO' from the 'Config' field @confMySockAddr@.
    , cioPeerSockAddr :: SockAddr
    -- ^ Filled by 'runIO' from the 'Config' field @confPeerSockAddr@.
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    -- ^ Queue a request; returns the stream used and its number.
    , cioReadResponse :: Stream -> IO Response
    -- ^ Wait for the response on a stream ('getResponse').
    , cioWriteBytes :: ByteString -> IO ()
    -- ^ Queue raw bytes on the control queue.
    , cioCreateStream :: IO (StreamId, Stream)
    -- ^ Open a new odd-numbered stream ('openOddStreamWait').
    }