{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Client.Run where
import Control.Concurrent.STM (check)
import Control.Exception
import Data.ByteString.Builder (Builder)
import Data.IORef
import Network.Control (RxFlow (..), defaultMaxData)
import Network.Socket (SockAddr)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.STM
import Imports
import Network.HTTP2.Client.Types
import Network.HTTP2.Frame
import Network.HTTP2.H2
-- | Client-side parameters consumed by 'run', 'runIO' and 'setup'.
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ Value used for the @:scheme@ pseudo-header of new requests;
    -- 'sendRequest' leaves the header out when this is empty.
    , authority :: Authority
    -- ^ Value used for the @:authority@ pseudo-header of new requests;
    -- 'sendRequest' leaves the header out when this is empty.
    , cacheLimit :: Int
    -- ^ Passed to 'newContext' by 'setup'.
    -- NOTE(review): presumably bounds the cache of pushed streams -- confirm.
    , connectionWindowSize :: WindowSize
    -- ^ Passed to 'newContext' by 'setup'.
    -- NOTE(review): presumably the connection-level receive window -- confirm.
    , settings :: Settings
    -- ^ Passed to 'newContext' by 'setup'.
    }
    deriving (Eq, Show)
-- | Baseline client configuration: scheme @http@, authority @localhost@,
-- a cache limit of 64, 'defaultMaxData' as the connection window size and
-- 'defaultSettings' as the settings.
--
-- Override individual fields with record-update syntax.
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }
-- | Run an HTTP/2 client.
--
-- Builds the connection state with 'setup' and executes the client under
-- 'runH2'. After the client returns, this waits on 'waitCounter0', enqueues
-- a GOAWAY frame carrying 'NoError' on the control queue and yields the
-- client's result.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{scheme, authority} conf client = do
    (ctx, mgr) <- setup cconf conf
    runH2 conf ctx mgr (session ctx mgr)
  where
    -- Peer's concurrent-stream limit ('maxBound' when the peer has not
    -- announced one) minus the current 'oddConc' of the odd stream table.
    streamBudget ctx = do
        limit <-
            fromMaybe maxBound . maxConcurrentStreams
                <$> readIORef (peerSettings ctx)
        used <- oddConc <$> readTVarIO (oddStreamTable ctx)
        return (limit - used)

    -- Send one request, wait for its response and hand it to the handler.
    -- Left without a signature so that it stays polymorphic in the
    -- handler's result type.
    roundTrip ctx mgr req handler =
        sendRequest ctx mgr scheme authority req >>= getResponse >>= handler

    session ctx mgr = do
        result <-
            client
                (roundTrip ctx mgr)
                Aux{auxPossibleClientStreams = streamBudget ctx}
        waitCounter0 mgr
        let goaway = goawayFrame 0 NoError "graceful closing"
        -- NOTE(review): the MVar starts full, so the 'takeMVar' below can
        -- return without the sender having touched it -- confirm that this
        -- is the intended handshake with the 'CGoaway' consumer.
        done <- newMVar ()
        enqueueControl (controlQ ctx) (CGoaway goaway done)
        takeMVar done
        return result
-- | Lower-level variant of 'run'.
--
-- The supplied action receives a 'ClientIO' record of primitives bound to
-- the freshly set-up connection and returns the client computation, which
-- is then executed under 'runH2'. Unlike 'run', no GOAWAY frame is enqueued
-- here after the client finishes.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO
    cconf@ClientConfig{scheme, authority}
    conf@Config{confMySockAddr, confPeerSockAddr}
    action = do
        (ctx@Context{controlQ}, mgr) <- setup cconf conf
        let -- Send a request; report the stream together with its number.
            writeRequest req = do
                strm <- sendRequest ctx mgr scheme authority req
                return (streamNumber strm, strm)
            -- Enqueue one pre-encoded frame on the control queue.
            writeBytes bs = enqueueControl controlQ (CFrames Nothing [bs])
        body <-
            action
                ClientIO
                    { cioMySockAddr = confMySockAddr
                    , cioPeerSockAddr = confPeerSockAddr
                    , cioWriteRequest = writeRequest
                    , cioReadResponse = getResponse
                    , cioWriteBytes = writeBytes
                    , cioCreateStream = openOddStreamWait ctx
                    }
        runH2 conf ctx mgr body
-- | Wait for the response of a stream.
--
-- Blocks taking the stream's input 'MVar'. A 'Left' exception is rethrown
-- with 'throwIO'; a 'Right' value is wrapped in 'Response'.
getResponse :: Stream -> IO Response
getResponse strm =
    takeMVar (streamInput strm) >>= either throwIO (return . Response)
-- | Create the connection state shared by 'run' and 'runIO'.
--
-- Creates the 'Context' with 'newContext', starts the manager from the
-- configured timeout manager and enqueues the initial frames via
-- 'exchangeSettings'.
setup :: ClientConfig -> Config -> IO (Context, Manager)
setup
    ClientConfig{scheme, authority, cacheLimit, connectionWindowSize, settings}
    conf@Config{confTimeoutManager} = do
        let roleInfo' = newClientInfo scheme authority
        ctx <- newContext roleInfo' conf cacheLimit connectionWindowSize settings
        mgr <- start confTimeoutManager
        exchangeSettings ctx
        pure (ctx, mgr)
-- | Run the client computation alongside the frame receiver and sender.
--
-- The receiver and the sender run concurrently as one background job that
-- is raced against the client. Whatever the outcome, every stream is closed
-- with 'closeAllStreams' (given the exception, if there was one) before the
-- result is delivered:
--
-- * an exception from either side is rethrown;
-- * the client's result is returned when the client wins the race;
-- * if the background job ends first without an exception, an 'ErrorCall'
--   with a descriptive message is thrown. This branch used to evaluate a
--   bare 'undefined', which surfaced as an uninformative
--   @Prelude.undefined@.
runH2 :: Config -> Context -> Manager -> IO a -> IO a
runH2 conf ctx mgr runClient =
    stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) $
            either Just (const Nothing) res
        case res of
            Left err ->
                throwIO err
            Right (Left ()) ->
                -- Not expected to happen; fail with a message that names
                -- the situation instead of 'undefined'.
                throwIO $
                    ErrorCall
                        "Network.HTTP2.Client.Run.runH2: background threads terminated before the client"
            Right (Right x) ->
                return x
  where
    runReceiver = frameReceiver ctx conf
    runSender = frameSender ctx conf mgr
    runBackgroundThreads = concurrently_ runReceiver runSender
-- | Send a request and return the stream on which its response arrives.
--
-- The even-stream cache is consulted first with the request's @:method@ and
-- @:path@: on a hit the cached stream is removed from the cache and
-- returned, and nothing is sent. Otherwise a new odd stream is opened with
-- 'openOddStreamWait', the @:scheme@ and @:authority@ pseudo-headers are
-- prepended (each only when its value is non-empty) and the request is
-- scheduled for output.
--
-- Calls 'error' if the request lacks @:method@ or @:path@.
sendRequest
    :: Context
    -> Manager
    -> Scheme
    -> Authority
    -> Request
    -> IO Stream
sendRequest
    ctx@Context{evenStreamTable, outputQStreamID, outputQ}
    mgr
    scheme
    auth
    (Request req) = do
        let hdr0 = outObjHeaders req
            method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
            path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
        cached <- lookupEvenCache evenStreamTable method path
        case cached of
            Just strm0 -> strm0 <$ deleteEvenCache evenStreamTable method path
            Nothing -> openFresh hdr0
      where
        -- Prepend a pseudo-header unless its value is empty.
        withPseudo name value hdrs
            | value /= "" = (name, value) : hdrs
            | otherwise = hdrs

        openFresh hdr0 = do
            let hdr1 = withPseudo ":scheme" scheme hdr0
                hdr2 = withPseudo ":authority" auth hdr1
                req' = req{outObjHeaders = hdr2}
            (sid, newstrm) <- openOddStreamWait ctx
            case outObjBody req of
                OutBodyStreaming strmbdy ->
                    -- Adapt the plain streaming body to the unmask-aware
                    -- shape expected by 'sendStreaming'.
                    sendStreaming ctx mgr req' sid newstrm $ \unmask push flush ->
                        unmask (strmbdy push flush)
                OutBodyStreamingUnmask strmbdy ->
                    sendStreaming ctx mgr req' sid newstrm strmbdy
                _ -> atomically $ do
                    -- Wait for this stream's turn, then let the next odd
                    -- stream id proceed.
                    readTVar outputQStreamID >>= check . (== sid)
                    writeTVar outputQStreamID (sid + 2)
                    writeTQueue outputQ (Output newstrm req' OObj Nothing (return ()))
            return newstrm
-- | Start a streaming request body and schedule its 'Output'.
--
-- A bounded queue of capacity 10 is created and a managed thread runs the
-- body with @push@ and @flush@ callbacks that write chunks into it. When
-- the body ends, normally or by exception, a 'StreamingFinished' chunk
-- carrying @decCounter mgr@ is written.
--
-- The calling thread then blocks until 'outputQStreamID' equals @sid@ and
-- the queue has been marked as holding something, advances
-- 'outputQStreamID' by 2 and enqueues the 'Output' on 'outputQ'.
--
-- The marker is raised by @finished@ as well as by @push@. Previously only
-- @push@ raised it, so a body that ended without pushing anything left this
-- function blocked forever, and 'outputQStreamID' never advanced for the
-- requests queued behind it.
sendStreaming
    :: Context
    -> Manager
    -> OutObj
    -> StreamId
    -> Stream
    -> ((forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ())
    -> IO ()
sendStreaming Context{outputQStreamID, outputQ} mgr req sid newstrm strmbdy = do
    tbq <- newTBQueueIO 10
    tbqNonEmpty <- newTVarIO False
    forkManagedUnmask mgr $ \unmask -> do
        let push b = atomically $ do
                writeTBQueue tbq (StreamingBuilder b)
                writeTVar tbqNonEmpty True
            flush = atomically $ writeTBQueue tbq StreamingFlush
            finished = atomically $ do
                writeTBQueue tbq $ StreamingFinished (decCounter mgr)
                -- The terminating chunk also makes the queue non-empty;
                -- without this an empty body never releases the waiter
                -- below.
                writeTVar tbqNonEmpty True
        incCounter mgr
        strmbdy unmask push flush `finally` finished
    atomically $ do
        sidOK <- readTVar outputQStreamID
        ready <- readTVar tbqNonEmpty
        check (sidOK == sid && ready)
        writeTVar outputQStreamID (sid + 2)
        writeTQueue outputQ $ Output newstrm req OObj (Just tbq) (return ())
-- | Enqueue the client's opening frames on the control queue.
--
-- Reads 'rxfWindow' from the context's 'rxFlow', builds the negotiation
-- frames from our own settings and that value, sets 'myFirstSettings' to
-- 'True' and enqueues the connection preface followed by those frames as a
-- single 'CFrames' control message.
exchangeSettings :: Context -> IO ()
exchangeSettings Context{rxFlow, mySettings, myFirstSettings, controlQ} = do
    window <- rxfWindow <$> readIORef rxFlow
    let negotiation = makeNegotiationFrames mySettings window
        opening = CFrames Nothing (connectionPreface : negotiation)
    writeIORef myFirstSettings True
    enqueueControl controlQ opening
-- | Primitives handed to the action given to 'runIO', all bound to one
-- connection.
data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    -- ^ Local socket address, taken from the 'Config'.
    , cioPeerSockAddr :: SockAddr
    -- ^ Peer socket address, taken from the 'Config'.
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    -- ^ Send a request with 'sendRequest'; yields the stream and its number.
    , cioReadResponse :: Stream -> IO Response
    -- ^ Wait for the response of a stream ('getResponse').
    , cioWriteBytes :: ByteString -> IO ()
    -- ^ Enqueue the given bytes on the control queue as a single frame.
    , cioCreateStream :: IO (StreamId, Stream)
    -- ^ Open a new odd-numbered stream with 'openOddStreamWait'.
    }