{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE BangPatterns, CPP #-}
{-# LANGUAGE NamedFieldPuns #-}
module Network.Wai.Handler.Warp.HTTP2.Sender (frameSender) where
import Control.Concurrent.STM
import qualified Control.Exception as E
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Data.IORef
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (poke)
import Network.HPACK (setLimitForEncoding, toHeaderTable)
import Network.HTTP2
import Network.HTTP2.Priority (isEmptySTM, dequeueSTM, Precedence)
import Network.Wai
#ifdef WINDOWS
import qualified System.IO as IO
#else
import Network.Wai.Handler.Warp.FdCache
import Network.Wai.Handler.Warp.SendFile (positionRead)
import qualified Network.Wai.Handler.Warp.Timeout as T
#endif
import Network.Wai.Handler.Warp.Buffer
import Network.Wai.Handler.Warp.HTTP2.EncodeFrame
import Network.Wai.Handler.Warp.HTTP2.HPACK
import Network.Wai.Handler.Warp.HTTP2.Manager (Manager)
import Network.Wai.Handler.Warp.HTTP2.Types
import Network.Wai.Handler.Warp.Imports hiding (readInt)
import qualified Network.Wai.Handler.Warp.Settings as S
import Network.Wai.Handler.Warp.Types
-- | Work left over from a previous attempt to fill the write buffer.
data Leftover = LZero
              -- ^ Nothing is pending.
              | LOne B.BufferWriter
              -- ^ A buffer writer that still has to be resumed.
              | LTwo ByteString B.BufferWriter
              -- ^ A 'ByteString' that must be copied first, followed by a
              --   buffer writer to resume.
{-# INLINE getStreamWindowSize #-}
-- | Read the current value of the stream's window 'TVar'.
getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize Stream{streamWindow} = readTVarIO streamWindow
{-# INLINE waitStreamWindowSize #-}
-- | Block (via STM 'check') until the stream's window is strictly positive.
waitStreamWindowSize :: Stream -> IO ()
waitStreamWindowSize Stream{streamWindow} =
    atomically $ readTVar streamWindow >>= \window -> check (window > 0)
{-# INLINE waitStreaming #-}
-- | Block (via STM 'check') until the queue holds at least one element.
--   The queue itself is left untouched.
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq =
    atomically $ isEmptyTBQueue tbq >>= \nothingQueued -> check (not nothingQueued)
-- | What the sender loop should handle next.
data Switch = C Control
            -- ^ A message taken from the control queue.
            | O (StreamId,Precedence,Output)
            -- ^ An entry dequeued from the output queue.
            | Flush
            -- ^ Both queues are empty but the write buffer holds unsent bytes.
-- | The sender loop of an HTTP/2 connection.
--
-- Control messages are taken from @controlQ@ with priority over stream
-- output taken from @outputQ@.  Frames are assembled in the connection's
-- write buffer; the loop threads through the offset of the first unused
-- byte of that buffer (@off@) and flushes the buffer with 'connSendAll'
-- when needed.
--
-- Any exception escaping the loop terminates the sender silently (see
-- @ignore@); exceptions raised while producing output for one stream are
-- turned into a stream reset instead (see @resetStream@).
frameSender :: Context -> Connection -> S.Settings -> Manager -> IO ()
frameSender ctx@Context{outputQ,controlQ,connectionWindow,encodeDynamicTable}
            conn@Connection{connWriteBuffer,connBufferSize,connSendAll}
            settings mgr = loop 0 `E.catch` ignore
  where
    -- Choose the next thing to do.  Control messages win.  Otherwise an
    -- output is taken, but only while the connection window is positive.
    -- When there is nothing to do and the buffer holds unsent bytes
    -- (off /= 0), ask for a flush instead of blocking.
    dequeue off = do
        isEmpty <- isEmptyTQueue controlQ
        if isEmpty then do
            w <- readTVar connectionWindow
            check (w > 0)
            emp <- isEmptySTM outputQ
            if emp then
                if off /= 0 then return Flush else retry
              else
                O <$> dequeueSTM outputQ
          else
            C <$> readTQueue controlQ

    -- Main loop.  A negative offset returned by 'control' stops the loop.
    loop off = do
        x <- atomically $ dequeue off
        case x of
            C ctl -> do
                when (off /= 0) $ flushN off
                -- The buffer has just been flushed (or was already
                -- empty), so the control handler must start from offset
                -- 0.  Passing the stale offset would make CSettings0
                -- write behind already-sent bytes and send them again on
                -- the next flush.
                off' <- control ctl 0
                when (off' >= 0) $ loop off'
            O (_,pre,out) -> do
                let strm = outputStream out
                writeIORef (streamPrecedence strm) pre
                off' <- outputOrEnqueueAgain out off
                case off' of
                    0 -> loop 0
                    -- fixme: hard-coded flush threshold
                    _ | off' > 15872 -> flushN off' >> loop 0
                      | otherwise    -> loop off'
            Flush -> flushN off >> loop 0

    -- Handle a control message.  Returns the new buffer offset, or -1 to
    -- terminate the sender loop.
    control CFinish         _ = return (-1)
    control (CGoaway frame) _ = connSendAll frame >> return (-1)
    control (CFrame frame)  _ = connSendAll frame >> return 0
    control (CSettings frame alist) _ = do
        connSendAll frame
        setLimit alist
        return 0
    -- The two frames are copied into the write buffer rather than sent
    -- immediately, so that they go out together with whatever follows.
    control (CSettings0 frame1 frame2 alist) off = do
        let !buf = connWriteBuffer `plusPtr` off
            !off' = off + BS.length frame1 + BS.length frame2
        buf' <- copy buf frame1
        void $ copy buf' frame2
        setLimit alist
        return off'

    {-# INLINE setLimit #-}
    -- Apply a SETTINGS_HEADER_TABLE_SIZE value, if present, to the encoder.
    setLimit alist = case lookup SettingsHeaderTableSize alist of
        Nothing  -> return ()
        Just siz -> setLimitForEncoding siz encodeDynamicTable

    -- Produce frames for one output, starting at buffer offset off0 and
    -- sending at most lim bytes of DATA payload.  Returns the new offset.
    output out@(Output strm _ _ tell getH2D (ONext curr)) off0 lim = do
        -- Continuation of a body: curr fills the DATA payload.
        let !buf = connWriteBuffer `plusPtr` off0
            !siz = connBufferSize - off0
        Next datPayloadLen mnext <- curr buf siz lim
        off <- fillDataHeader strm off0 datPayloadLen mnext tell getH2D
        maybeEnqueueNext out mnext
        return off
    output out@(Output strm rspn ii tell getH2D ORspn) off0 lim = do
        -- Headers first.
        let !sid = streamNumber strm
            !endOfStream = case rspn of
                RspnNobody _ _ -> True
                _              -> False
        ths <- addNecessaryHeaders ctx rspn ii settings
        -- headerContinue returns the offset just past the last header
        -- frame it left in the buffer (it may have flushed in between).
        offHdr <- headerContinue sid ths endOfStream off0
        off <- sendHeadersIfNecessary offHdr
        -- Then the first chunk of the body, if any.
        case rspn of
            RspnNobody _ _ -> do
                closed ctx strm Finished
                return off
            RspnFile _ _ path mpart -> do
                let payloadOff = off + frameHeaderLength
                Next datPayloadLen mnext <-
                    fillFileBodyGetNext conn ii payloadOff lim path mpart
                off' <- fillDataHeader strm off datPayloadLen mnext tell getH2D
                maybeEnqueueNext out mnext
                return off'
            RspnBuilder _ _ builder -> do
                let payloadOff = off + frameHeaderLength
                Next datPayloadLen mnext <-
                    fillBuilderBodyGetNext conn ii payloadOff lim builder
                off' <- fillDataHeader strm off datPayloadLen mnext tell getH2D
                maybeEnqueueNext out mnext
                return off'
            RspnStreaming _ _ tbq -> do
                let payloadOff = off + frameHeaderLength
                Next datPayloadLen mnext <-
                    fillStreamBodyGetNext conn payloadOff lim tbq strm
                off' <- fillDataHeader strm off datPayloadLen mnext tell getH2D
                maybeEnqueueNext out mnext
                return off'
    output out@(Output strm _ _ _ _ (OPush ths pid)) off0 lim = do
        -- PUSH_PROMISE first, then the pushed response itself.
        let !sid = streamNumber strm
        len <- pushPromise pid sid ths off0
        off <- sendHeadersIfNecessary $ off0 + frameHeaderLength + len
        output out{ outputType = ORspn } off lim
    -- Only OWait remains, and outputOrEnqueueAgain deals with it before
    -- calling output.
    output _ _ _ = error "frameSender.output: unexpected output type"

    -- Either produce frames for the output now, or arrange for it to be
    -- enqueued again once it can make progress.  Any exception resets
    -- the stream.
    outputOrEnqueueAgain out off = E.handle resetStream $ do
        state <- readIORef $ streamState strm
        if isClosed state then
            return off
          else case out of
            Output _ _ _ wait _ OWait -> do
                let out' = out { outputHook = return ()
                               , outputType = ORspn
                               }
                forkAndEnqueueWhenReady wait outputQ out' mgr
                return off
            _ -> case mtbq of
                Just tbq -> checkStreaming tbq
                _        -> checkStreamWindowSize
      where
        strm = outputStream out
        mtbq = outputMaybeTBQueue out
        -- A streaming body with nothing queued yet: wait for data.
        checkStreaming tbq = do
            isEmpty <- atomically $ isEmptyTBQueue tbq
            if isEmpty then do
                forkAndEnqueueWhenReady (waitStreaming tbq) outputQ out mgr
                return off
              else
                checkStreamWindowSize
        -- A stream window of zero: wait for it to open.  Otherwise send,
        -- limited by the smaller of the connection and stream windows.
        checkStreamWindowSize = do
            sws <- getStreamWindowSize strm
            if sws == 0 then do
                forkAndEnqueueWhenReady (waitStreamWindowSize strm) outputQ out mgr
                return off
              else do
                cws <- atomically $ readTVar connectionWindow
                let !lim = min cws sws
                output out off lim
        resetStream e = do
            closed ctx strm (ResetByMe e)
            let !rst = resetFrame InternalError $ streamNumber strm
            enqueueControl controlQ $ CFrame rst
            return off

    {-# INLINE flushN #-}
    -- Send the first n bytes of the write buffer.
    flushN :: Int -> IO ()
    flushN n = bufferIO connWriteBuffer n connSendAll

    -- Encode a HEADERS frame at offset off, followed by as many
    -- CONTINUATION frames as are needed for headers that did not fit.
    -- Returns the offset just past the last frame left in the buffer.
    headerContinue sid ths endOfStream off = do
        let !offkv = off + frameHeaderLength
            !bufkv = connWriteBuffer `plusPtr` offkv
            !limkv = connBufferSize - offkv
        (hs,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let flag0 = case hs of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
            flag = if endOfStream then setEndStream flag0 else flag0
            buf = connWriteBuffer `plusPtr` off
        fillFrameHeader FrameHeaders kvlen sid flag buf
        continue sid (offkv + kvlen) hs

    !bufHeaderPayload = connWriteBuffer `plusPtr` frameHeaderLength
    !headerPayloadLim = connBufferSize - frameHeaderLength

    -- off is the number of bytes currently pending in the write buffer.
    continue _   off [] = return off
    continue sid off ths = do
        -- Send everything pending, including frames that were queued in
        -- the buffer before the HEADERS frame, so that the CONTINUATION
        -- frame can be built at the start of the buffer.
        flushN off
        (ths', kvlen') <- hpackEncodeHeaderLoop ctx bufHeaderPayload headerPayloadLim ths
        -- No progress at all: this header can never be encoded.
        when (ths == ths') $ E.throwIO $ ConnectionError CompressionError "cannot compress the header"
        let flag = case ths' of
                [] -> setEndHeader defaultFlags
                _  -> defaultFlags
        fillFrameHeader FrameContinuation kvlen' sid flag connWriteBuffer
        continue sid (frameHeaderLength + kvlen') ths'

    {-# INLINE maybeEnqueueNext #-}
    -- Re-enqueue the output when the body has a continuation.
    maybeEnqueueNext :: Output -> Maybe DynaNext -> IO ()
    maybeEnqueueNext _   Nothing     = return ()
    maybeEnqueueNext out (Just next) = enqueueOutput outputQ out'
      where
        !out' = out { outputType = ONext next }

    {-# INLINE sendHeadersIfNecessary #-}
    -- Flush unless there is still room for more than a frame header
    -- after off.
    sendHeadersIfNecessary off
      | off + frameHeaderLength < connBufferSize = return off
      | otherwise = do
          flushN off
          return 0

    -- Write the DATA frame header for a payload that is already in the
    -- buffer, emit trailers and close the stream when the body is
    -- complete, and charge the payload to both flow-control windows.
    fillDataHeader strm@Stream{streamWindow,streamNumber}
                   off datPayloadLen mnext tell getH2D = do
        mh2d <- getH2D
        let (!trailers,!noTrailers) = case http2dataTrailers <$> mh2d of
                Nothing -> ([], True)
                Just ts -> (ts, null ts)
            !buf = connWriteBuffer `plusPtr` off
            !off' = off + frameHeaderLength + datPayloadLen
            !noMoreBody = isNothing mnext
            flag | noMoreBody && noTrailers = setEndStream defaultFlags
                 | otherwise                = defaultFlags
        fillFrameHeader FrameData datPayloadLen streamNumber flag buf
        off'' <- handleEndOfBody noMoreBody off' noTrailers trailers
        atomically $ modifyTVar' connectionWindow (subtract datPayloadLen)
        atomically $ modifyTVar' streamWindow (subtract datPayloadLen)
        return off''
      where
        handleTrailers True off0 _        = return off0
        handleTrailers _    off0 trailers = do
            (ths,_) <- toHeaderTable trailers
            offEnd <- headerContinue streamNumber ths True off0
            sendHeadersIfNecessary offEnd
        handleEndOfBody True off0 noTrailers trailers = do
            off1 <- handleTrailers noTrailers off0 trailers
            void tell
            closed ctx strm Finished
            return off1
        handleEndOfBody False off0 _ _ = return off0

    -- Write a PUSH_PROMISE frame at offset off: the frame header, the
    -- 4-byte promised stream id, then the encoded header block.  Returns
    -- the payload length.
    -- NOTE(review): headers that do not fit are discarded here (the
    -- leftover list from hpackEncodeHeader is ignored) while END_HEADERS
    -- is always set -- confirm that this is intended.
    pushPromise pid sid ths off = do
        let !offsid = off + frameHeaderLength
            !bufsid = connWriteBuffer `plusPtr` offsid
        poke32 bufsid $ fromIntegral sid
        let !offkv = offsid + 4
            !bufkv = connWriteBuffer `plusPtr` offkv
            !limkv = connBufferSize - offkv
        (_,kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
        let !flag = setEndHeader defaultFlags
            !buf = connWriteBuffer `plusPtr` off
            !len = kvlen + 4
        fillFrameHeader FramePushPromise len pid flag buf
        return len

    {-# INLINE fillFrameHeader #-}
    fillFrameHeader ftyp len sid flag buf = encodeFrameHeaderBuf ftyp hinfo buf
      where
        hinfo = FrameHeader len flag sid

    {-# INLINE ignore #-}
    -- Deliberately best-effort: an exception ends the sender quietly.
    ignore :: E.SomeException -> IO ()
    ignore _ = return ()
-- | Run a 'Builder' into the connection's write buffer, starting at
--   offset @off@ and using at most @lim@ bytes (and never more than the
--   space left in the buffer).  The result says how many bytes were
--   written and how to continue, if the builder is not finished.
fillBuilderBodyGetNext :: Connection -> InternalInfo -> Int -> WindowSize -> Builder -> IO Next
fillBuilderBodyGetNext Connection{connWriteBuffer,connBufferSize}
                       _ off lim bb =
    B.runBuilder bb payloadPtr space >>= \(filled, signal) ->
        return (nextForBuilder filled signal)
  where
    payloadPtr = connWriteBuffer `plusPtr` off
    space      = min (connBufferSize - off) lim
-- | Read the first chunk of a file body into the connection's write
--   buffer at offset @off@, limited by @lim@, by the space left in the
--   buffer and by the byte count of the requested file part.
fillFileBodyGetNext :: Connection -> InternalInfo -> Int -> WindowSize -> FilePath -> Maybe FilePart -> IO Next
#ifdef WINDOWS
-- NOTE(review): the handle opened here is not closed in this function.
fillFileBodyGetNext Connection{connWriteBuffer,connBufferSize}
                    _ off lim path mpart = do
    (start, bytes) <- fileStartEnd path mpart
    h <- IO.openBinaryFile path IO.ReadMode
    IO.hSeek h IO.AbsoluteSeek start
    nread <- IO.hGetBufSome h payloadPtr (mini space bytes)
    return $ nextForFile nread h (bytes - fromIntegral nread) (return ())
  where
    payloadPtr = connWriteBuffer `plusPtr` off
    space      = min (connBufferSize - off) lim
#else
fillFileBodyGetNext Connection{connWriteBuffer,connBufferSize}
                    ii off lim path mpart = do
    (cached, cacheRefresh) <- getFd ii path
    (fd, refresh) <- maybe openFresh (\cfd -> return (cfd, cacheRefresh)) cached
    (start, bytes) <- fileStartEnd path mpart
    nread <- positionRead fd payloadPtr (mini space bytes) start
    refresh
    let consumed = fromIntegral nread
    return $ nextForFile nread fd (start + consumed) (bytes - consumed) refresh
  where
    payloadPtr = connWriteBuffer `plusPtr` off
    space      = min (connBufferSize - off) lim
    -- No descriptor from getFd: open the file and register closeFile
    -- with the timeout manager; refreshing tickles that registration.
    openFresh = do
        fd' <- openFile path
        th <- T.register (timeoutManager ii) (closeFile fd')
        return (fd', T.tickle th)
#endif
-- | Offset and byte count of the requested file part.  Calling this
--   without a 'FilePart' is an error.
fileStartEnd :: FilePath -> Maybe FilePart -> IO (Integer, Integer)
fileStartEnd _ mpart = case mpart of
    Just part -> return (filePartOffset part, filePartByteCount part)
    Nothing   -> error "fileStartEnd"
-- | Drain the streaming queue into the connection's write buffer at
--   offset @off@, limited by @lim@ and by the space left in the buffer.
fillStreamBodyGetNext :: Connection -> Int -> WindowSize -> TBQueue Sequence -> Stream -> IO Next
fillStreamBodyGetNext Connection{connWriteBuffer,connBufferSize}
                      off lim sq strm =
    runStreamBuilder payloadPtr space sq >>= \(leftover, cont, filled) ->
        return (nextForStream sq strm leftover cont filled)
  where
    payloadPtr = connWriteBuffer `plusPtr` off
    space      = min (connBufferSize - off) lim
-- | Continue a builder body from its 'Leftover'.  The payload is written
--   after the space reserved for a frame header at the start of @buf0@,
--   using at most @lim@ bytes.  'LZero' is not a valid input.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = case leftover of
    LZero -> error "fillBufBuilder: LZero"
    LOne writer -> resume writer payloadPtr space 0
    LTwo bs writer
      | BS.length bs <= space -> do
            -- The pending chunk fits: copy it, then resume the writer
            -- in the remaining space.
            afterChunk <- copy payloadPtr bs
            let pending = BS.length bs
            resume writer afterChunk (space - pending) pending
      | otherwise -> do
            -- The pending chunk alone fills the space: copy what fits
            -- and keep the rest for the next round.
            let (now, later) = BS.splitAt space bs
            void $ copy payloadPtr now
            return $ nextForBuilder space (B.Chunk later writer)
  where
    payloadPtr = buf0 `plusPtr` frameHeaderLength
    space      = min (siz0 - frameHeaderLength) lim
    -- Run the writer and add the bytes already placed before it.
    resume w ptr avail sofar = do
        (n, signal) <- w ptr avail
        return $ nextForBuilder (sofar + n) signal
-- | Turn the signal of a builder step into a 'Next': no continuation when
--   the builder is done, otherwise a continuation carrying the leftover.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal = case signal of
    B.Done            -> result Nothing
    B.More _ writer   -> result $ Just (fillBufBuilder (LOne writer))
    B.Chunk bs writer -> result $ Just (fillBufBuilder (LTwo bs writer))
  where
    result = Next len
-- | Take items off the queue without blocking and run their builders
--   into the buffer.  Returns the leftover of a builder that did not
--   finish, whether the stream continues ('False' only after 'SFinish'),
--   and the total number of bytes written.
runStreamBuilder :: Buffer -> BufSize -> TBQueue Sequence
                 -> IO (Leftover, Bool, BytesFilled)
runStreamBuilder buf0 room0 sq = go buf0 room0 0
  where
    go !ptr !avail !acc = atomically (tryReadTBQueue sq) >>= step
      where
        step Nothing        = return (LZero, True, acc)
        step (Just SFlush)  = return (LZero, True, acc)
        step (Just SFinish) = return (LZero, False, acc)
        step (Just (SBuilder builder)) = do
            (n, signal) <- B.runBuilder builder ptr avail
            let !acc' = acc + n
            case signal of
                -- This builder is finished: keep draining the queue.
                B.Done            -> go (ptr `plusPtr` n) (avail - n) acc'
                B.More _ writer   -> return (LOne writer, True, acc')
                B.Chunk bs writer -> return (LTwo bs writer, True, acc')
-- | Continue a streaming body: first deal with the 'Leftover' of the
--   previous round, then keep draining the queue.  The payload is written
--   after the space reserved for a frame header at the start of @buf0@,
--   using at most @lim0@ bytes.
fillBufStream :: Leftover -> TBQueue Sequence -> Stream -> DynaNext
fillBufStream leftover0 sq strm buf0 siz0 lim0 = case leftover0 of
    LZero -> do
        (leftover, cont, filled) <- runStreamBuilder payloadPtr space sq
        finish leftover cont filled
    LOne writer -> resume writer payloadPtr space 0
    LTwo bs writer
      | BS.length bs <= space -> do
            -- The pending chunk fits: copy it, then resume the writer.
            afterChunk <- copy payloadPtr bs
            let pending = BS.length bs
            resume writer afterChunk (space - pending) pending
      | otherwise -> do
            -- The pending chunk alone fills the space: copy what fits
            -- and keep the rest as the new leftover.
            let (now, later) = BS.splitAt space bs
            void $ copy payloadPtr now
            finish (LTwo later writer) True space
  where
    payloadPtr = buf0 `plusPtr` frameHeaderLength
    space      = min (siz0 - frameHeaderLength) lim0
    finish l cont filled = return $ nextForStream sq strm l cont filled
    -- Run a writer; if it completes, go on draining the queue behind it.
    resume w ptr avail sofar = do
        (n, signal) <- w ptr avail
        case signal of
            B.Done -> do
                (leftover, cont, extra) <-
                    runStreamBuilder (ptr `plusPtr` n) (avail - n) sq
                let !total = sofar + n + extra
                finish leftover cont total
            B.More _ writer -> do
                let !total = sofar + n
                finish (LOne writer) True total
            B.Chunk bs writer -> do
                let !total = sofar + n
                finish (LTwo bs writer) True total
-- | Build the 'Next' of a streaming body: a continuation is attached only
--   while the stream continues (the 'Bool' argument).
nextForStream :: TBQueue Sequence -> Stream
              -> Leftover -> Bool -> BytesFilled
              -> Next
nextForStream sq strm leftOrZero continues len
  | continues = Next len $ Just (fillBufStream leftOrZero sq strm)
  | otherwise = Next len Nothing
#ifdef WINDOWS
-- | Read the next chunk of a file body from the handle.  The payload is
--   written after the space reserved for a frame header at the start of
--   @buf@; @bytes@ is the number of bytes of the file part still to send.
fillBufFile :: IO.Handle -> Integer -> IO () -> DynaNext
fillBufFile h bytes refresh buf siz lim = do
    let payloadBuf = buf `plusPtr` frameHeaderLength
        room = min (siz - frameHeaderLength) lim
    -- Never read more than what is left of the requested part; otherwise
    -- a partial-file response would include bytes past the range and the
    -- remaining count would go negative instead of reaching zero.
    len <- IO.hGetBufSome h payloadBuf (mini room bytes)
    refresh
    let bytes' = bytes - fromIntegral len
    return $ nextForFile len h bytes' refresh
-- | Build the 'Next' of a file body: there is no continuation when
--   nothing was read or when no bytes remain to be sent.
nextForFile :: BytesFilled -> IO.Handle -> Integer -> IO () -> Next
nextForFile len h bytes refresh
  | len == 0 || bytes == 0 = Next len Nothing
  | otherwise              = Next len $ Just (fillBufFile h bytes refresh)
#else
-- | Read the next chunk of a file body from the descriptor at position
--   @start@.  The payload is written after the space reserved for a frame
--   header at the start of @buf@; at most @bytes@ bytes are read.
fillBufFile :: Fd -> Integer -> Integer -> IO () -> DynaNext
fillBufFile fd start bytes refresh buf siz lim = do
    nread <- positionRead fd payloadPtr (mini space bytes) start
    let consumed = fromIntegral nread
    refresh
    return $ nextForFile nread fd (start + consumed) (bytes - consumed) refresh
  where
    payloadPtr = buf `plusPtr` frameHeaderLength
    space      = min (siz - frameHeaderLength) lim
-- | Build the 'Next' of a file body: there is no continuation when
--   nothing was read or when no bytes remain to be sent.
nextForFile :: BytesFilled -> Fd -> Integer -> Integer -> IO () -> Next
nextForFile len fd start bytes refresh
  | len == 0 || bytes == 0 = Next len Nothing
  | otherwise              = Next len $ Just (fillBufFile fd start bytes refresh)
#endif
{-# INLINE mini #-}
-- | The smaller of an 'Int' and an 'Integer', as an 'Int'.  The comparison
--   is done on 'Integer', so a large second argument cannot overflow.
mini :: Int -> Integer -> Int
mini i n = if n > fromIntegral i then i else fromIntegral n
-- | Store a 'Word32' at the pointer as four bytes, most significant byte
--   first.
poke32 :: Ptr Word8 -> Word32 -> IO ()
poke32 ptr i = mapM_ store [(0, 24), (1, 16), (2, 8), (3, 0)]
  where
    -- Write the byte found at the given bit shift to the given index.
    store :: (Int, Int) -> IO ()
    store (idx, sh) =
        poke (ptr `plusPtr` idx) (fromIntegral ((i `shiftR` sh) .&. 0xff) :: Word8)
-- | Store one byte at the given byte offset from the pointer.
poke8 :: Ptr Word8 -> Int -> Word8 -> IO ()
poke8 base offset = poke (base `plusPtr` offset)