{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker,
WorkerConf (..),
fromContext,
) where
import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Frame
import Network.HTTP2.H2
import Network.HTTP2.Server.Types
-- | Everything a worker needs from its connection, abstracted over the
--   stream representation @a@ so the worker code does not depend on
--   'Context' directly.
data WorkerConf a = WorkerConf
    { readInputQ :: IO (Input a)
    -- ^ Fetch the next 'Input' (a stream together with its request) to serve.
    , writeOutputQ :: Output a -> IO ()
    -- ^ Hand an 'Output' over for sending.
    , workerCleanup :: a -> IO ()
    -- ^ Clean up the stream that was being served when the handler was
    --   interrupted by an exception.
    , isPushable :: IO Bool
    -- ^ Whether server push may currently be used.
    , makePushStream :: a -> PushPromise -> IO (StreamId, a)
    -- ^ Given the parent stream and a promise, create the stream for the
    --   pushed response; the 'StreamId' is the one passed to 'OPush'.
    , mySockAddr :: SockAddr
    -- ^ Local address, forwarded to handlers through 'Aux'.
    , peerSockAddr :: SockAddr
    -- ^ Peer address, forwarded to handlers through 'Aux'.
    }
-- | Build the 'WorkerConf' of a server-side connection from its 'Context'.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} =
    WorkerConf
        { -- Block until the server-side input queue has an entry.
          readInputQ = atomically $ readTQueue $ inputQ $ toServerInfo roleInfo
        , writeOutputQ = enqueueOutput outputQ
        , -- Mark the stream as closed ('Killed') and queue a RST_STREAM
          -- frame carrying 'InternalError' for it.
          workerCleanup = \strm -> do
            closed ctx strm Killed
            let frame = resetFrame InternalError $ streamNumber strm
            enqueueControl controlQ $ CFrames Nothing [frame]
        , -- Push is allowed iff the peer's current settings enable it.
          isPushable = enablePush <$> readIORef peerSettings
        , -- The promise itself is not inspected here. A fresh even-numbered
          -- stream is opened for the pushed response, and the id returned
          -- alongside it is the *parent* stream's number.
          makePushStream = \pstrm _ -> do
            (_, newstrm) <- openEvenStreamWait ctx
            let pid = streamNumber pstrm
            return (pid, newstrm)
        , mySockAddr = mySockAddr
        , peerSockAddr = peerSockAddr
        }
-- | Enqueue one pushed response per 'PushPromise' and report how the parent
--   response should be emitted.
--
--   Returns 'OObj' when nothing was pushed: there are no promises, the peer
--   does not allow push, or the originating request carries neither the
--   scheme nor an authority/host header needed to build the promised
--   request. Otherwise returns 'OWait' with an action that blocks until the
--   counter incremented by each pushed 'Output' reaches the number of
--   promises.
pushStream
    :: WorkerConf a
    -> a -- ^ parent stream
    -> ValueTable -- ^ header table of the originating request
    -> [PushPromise]
    -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    case (pushable, promiseOrigin) of
        (True, Just origin) -> do
            tvar <- newTVarIO 0
            lim <- push origin tvar pps0 0
            if lim == 0
                then return OObj
                else return $ OWait (waiter lim tvar)
        -- Push disabled, or the promised request cannot be formed.
        _ -> return OObj
  where
    -- Scheme and authority shared by every promised request. Resolved up
    -- front so that a missing header skips pushing instead of leaving a
    -- bottom thunk inside a queued 'Output'.
    promiseOrigin =
        (,)
            <$> getHeaderValue tokenScheme reqvt
            <*> ( getHeaderValue tokenAuthority reqvt
                    <|> getHeaderValue tokenHost reqvt
                )
    increment tvar = atomically $ modifyTVar' tvar (+ 1)
    -- Retry until at least @lim@ increments have been recorded.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        checkSTM (n >= lim)
    -- Enqueue the promises one by one; the result is how many were queued.
    push _ _ [] n = return (n :: Int)
    push origin@(scheme, auth) tvar (pp : pps) n = do
        (pid, newstrm) <- makePushStream pstrm pp
        let path = promiseRequestPath pp
            promiseRequest =
                [ (tokenMethod, H.methodGet)
                , (tokenScheme, scheme)
                , (tokenAuthority, auth)
                , (tokenPath, path)
                ]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            -- The trailing action bumps the shared counter; presumably it
            -- is run once this output has been handled (confirm against
            -- the sender).
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out
        push origin tvar pps (n + 1)
-- | Turn a handler's 'Response' (plus any 'PushPromise's) into 'Output's
--   for the connection, and record through the 'ThreadContinue' whether the
--   calling worker should keep serving afterwards.
--
--   Non-streaming bodies are enqueued and the worker continues. A streaming
--   body is run to completion on the calling thread, feeding a bounded
--   queue, and the worker is told not to continue.
--   'OutBodyStreamingUnmask' is not supported and raises an 'error'.
response
    :: WorkerConf a
    -> Manager
    -> T.Handle
    -> ThreadContinue
    -> a
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
    case outObjBody rsp of
        OutBodyNone -> do
            -- No body: promises are not processed on this path.
            setThreadContinue tconf True
            writeOutputQ $ Output strm rsp OObj Nothing (return ())
        OutBodyBuilder _ -> pushThenEnqueue
        OutBodyFile _ -> pushThenEnqueue
        OutBodyStreaming strmbdy -> do
            otyp <- pushStream wc strm reqvt pps
            -- The streaming body below occupies this thread until it
            -- finishes, so ask the manager to spawn (presumably a
            -- replacement worker; confirm against 'Manager') and make this
            -- worker stop looping once the body is done.
            spawnAction mgr
            setThreadContinue tconf False
            -- Chunks travel to the output side through a bounded queue.
            tbq <- newTBQueueIO 10 -- fixme: hard-coded capacity
            writeOutputQ $ Output strm rsp otyp (Just tbq) (return ())
            let push b = do
                    -- Pause the timeout while possibly blocked on a full
                    -- queue.
                    T.pause th
                    atomically $ writeTBQueue tbq (StreamingBuilder b)
                    T.resume th
                flush = atomically $ writeTBQueue tbq StreamingFlush
                finished =
                    atomically $
                        writeTBQueue tbq $
                            StreamingFinished (decCounter mgr)
            incCounter mgr
            -- Always terminate the chunk queue, even if the body throws.
            strmbdy push flush `E.finally` finished
        OutBodyStreamingUnmask _ ->
            error "response: server does not support OutBodyStreamingUnmask"
  where
    (_, reqvt) = inpObjHeaders req
    -- Shared by the builder and file bodies: push first, then enqueue the
    -- parent response with whatever output type the push step decided on.
    pushThenEnqueue = do
        otyp <- pushStream wc strm reqvt pps
        setThreadContinue tconf True
        writeOutputQ $ Output strm rsp otyp Nothing (return ())
-- | The body of a worker thread: repeatedly take an 'Input' from the
--   connection, run the 'Server' handler on it, and let 'response' enqueue
--   the result.
--
--   The loop runs under 'timeoutKillThread', which supplies the timeout
--   handle. It ends when the thread is killed by the thread manager or when
--   the handler's response cleared the 'ThreadContinue' flag.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ go sinfo tcont
  where
    go sinfo tcont th = do
        setThreadContinue tcont True
        ex <- E.trySyncOrAsync $ do
            -- Waiting for work must not count against the timeout.
            T.pause th
            Input strm req <- readInputQ
            let req' = pauseRequestBody req th
            -- Remember the stream so 'cleanup' can find it on failure.
            setStreamInfo sinfo strm
            T.resume th
            T.tickle th
            let aux = Aux th mySockAddr peerSockAddr
            server (Request req') aux $
                response wc mgr th tcont strm (Request req')
        cont1 <- case ex of
            Right () -> return True
            Left e@(SomeException _)
                -- Killed by the thread manager: stop without cleanup.
                | Just KilledByHttp2ThreadManager{} <- E.fromException e ->
                    return False
                -- Timeout kills and every other exception are handled
                -- alike: clean up the in-flight stream and keep serving.
                | otherwise -> do
                    cleanup sinfo
                    return True
        cont2 <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (cont1 && cont2) $ go sinfo tcont th
    -- Wrap the request body reader so the timeout is paused while the
    -- worker waits for body data.
    pauseRequestBody req th = req{inpObjBody = readBody'}
      where
        readBody = inpObjBody req
        readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs
    -- Run 'workerCleanup' on the stream being served, if there is one.
    cleanup sinfo = do
        minp <- getStreamInfo sinfo
        case minp of
            Nothing -> return ()
            Just strm -> workerCleanup strm
-- | A mutable flag shared between a worker loop and the code producing its
--   response. The worker re-enters its loop only if the flag reads 'True'
--   after a request has been handled.
newtype ThreadContinue = ThreadContinue (IORef Bool)

-- | Create a flag that is initially 'True'.
{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue = ThreadContinue <$> newIORef True

-- | Overwrite the flag.
{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref) x = writeIORef ref x

-- | Read the current value of the flag.
{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref) = readIORef ref
-- | A mutable slot holding the stream a worker is currently serving, if
--   any, so that it can be cleaned up when the handler fails.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

-- | Create an empty slot.
{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = StreamInfo <$> newIORef Nothing

-- | Empty the slot.
{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing

-- | Store a value in the slot, replacing any previous one.
{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo ref) inp = writeIORef ref $ Just inp

-- | Read the slot without modifying it.
{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo ref) = readIORef ref