{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Sync (
LoopCheck (..),
newLoopCheck,
syncWithSender,
syncWithSender',
makeOutput,
makeOutputIO,
enqueueOutputSIO,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Network.Control
import Network.HTTP.Semantics.IO
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
syncWithSender
:: Context
-> Stream
-> OutputType
-> LoopCheck
-> IO ()
syncWithSender :: Context -> Stream -> OutputType -> LoopCheck -> IO ()
syncWithSender ctx :: Context
ctx@Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
..} Stream
strm OutputType
otyp LoopCheck
lc = do
(IO Sync
pop, Output
out) <- Stream -> OutputType -> IO (IO Sync, Output)
makeOutput Stream
strm OutputType
otyp
TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
out
Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context
ctx IO Sync
pop LoopCheck
lc
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput Stream
strm OutputType
otyp = do
MVar Sync
var <- IO (MVar Sync)
forall a. IO (MVar a)
newEmptyMVar
let push :: Maybe Output -> IO ()
push Maybe Output
mout = case Maybe Output
mout of
Maybe Output
Nothing -> MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var Sync
Done
Just Output
ot -> MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var (Sync -> IO ()) -> Sync -> IO ()
forall a b. (a -> b) -> a -> b
$ Output -> Sync
Cont Output
ot
pop :: IO Sync
pop = MVar Sync -> IO Sync
forall a. MVar a -> IO a
takeMVar MVar Sync
var
out :: Output
out =
Output
{ outputStream :: Stream
outputStream = Stream
strm
, outputType :: OutputType
outputType = OutputType
otyp
, outputSync :: Maybe Output -> IO ()
outputSync = Maybe Output -> IO ()
push
}
(IO Sync, Output) -> IO (IO Sync, Output)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO Sync
pop, Output
out)
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} Stream
strm OutputType
otyp = Output
out
where
push :: Maybe Output -> IO ()
push Maybe Output
mout = case Maybe Output
mout of
Maybe Output
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Output
ot -> TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
ot
out :: Output
out =
Output
{ outputStream :: Stream
outputStream = Stream
strm
, outputType :: OutputType
outputType = OutputType
otyp
, outputSync :: Maybe Output -> IO ()
outputSync = Maybe Output -> IO ()
push
}
enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO ctx :: Context
ctx@Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} Stream
strm OutputType
otyp = do
let out :: Output
out = Context -> Stream -> OutputType -> Output
makeOutputIO Context
ctx Stream
strm OutputType
otyp
TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
out
syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} IO Sync
pop LoopCheck
lc = IO ()
loop
where
loop :: IO ()
loop = do
Sync
s <- IO Sync
pop
case Sync
s of
Sync
Done -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Cont Output
newout -> do
Bool
cont <- LoopCheck -> IO Bool
checkLoop LoopCheck
lc
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cont (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
newout
IO ()
loop
newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck Stream
strm Maybe (TBQueue StreamingChunk)
mtbq = do
TVar Bool
tovar <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
LoopCheck -> IO LoopCheck
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LoopCheck -> IO LoopCheck) -> LoopCheck -> IO LoopCheck
forall a b. (a -> b) -> a -> b
$
LoopCheck
{ lcTBQ :: Maybe (TBQueue StreamingChunk)
lcTBQ = Maybe (TBQueue StreamingChunk)
mtbq
, lcTimeout :: TVar Bool
lcTimeout = TVar Bool
tovar
, lcWindow :: TVar TxFlow
lcWindow = Stream -> TVar TxFlow
streamTxFlow Stream
strm
}
data LoopCheck = LoopCheck
{ LoopCheck -> Maybe (TBQueue StreamingChunk)
lcTBQ :: Maybe (TBQueue StreamingChunk)
, LoopCheck -> TVar Bool
lcTimeout :: TVar Bool
, LoopCheck -> TVar TxFlow
lcWindow :: TVar TxFlow
}
checkLoop :: LoopCheck -> IO Bool
checkLoop :: LoopCheck -> IO Bool
checkLoop LoopCheck{Maybe (TBQueue StreamingChunk)
TVar Bool
TVar TxFlow
lcTBQ :: LoopCheck -> Maybe (TBQueue StreamingChunk)
lcTimeout :: LoopCheck -> TVar Bool
lcWindow :: LoopCheck -> TVar TxFlow
lcTBQ :: Maybe (TBQueue StreamingChunk)
lcTimeout :: TVar Bool
lcWindow :: TVar TxFlow
..} = STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
Bool
tout <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
lcTimeout
if Bool
tout
then Bool -> STM Bool
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
Maybe (TBQueue StreamingChunk) -> STM ()
forall a. Maybe (TBQueue a) -> STM ()
waitStreaming' Maybe (TBQueue StreamingChunk)
lcTBQ
TVar TxFlow -> STM ()
waitStreamWindowSizeSTM TVar TxFlow
lcWindow
Bool -> STM Bool
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
waitStreaming' :: Maybe (TBQueue a) -> STM ()
waitStreaming' :: forall a. Maybe (TBQueue a) -> STM ()
waitStreaming' Maybe (TBQueue a)
Nothing = () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
waitStreaming' (Just TBQueue a
tbq) = do
Bool
isEmpty <- TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
Bool -> STM ()
check (Bool -> Bool
not Bool
isEmpty)
waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM TVar TxFlow
txf = do
WindowSize
w <- TxFlow -> WindowSize
txWindowSize (TxFlow -> WindowSize) -> STM TxFlow -> STM WindowSize
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar TxFlow -> STM TxFlow
forall a. TVar a -> STM a
readTVar TVar TxFlow
txf
Bool -> STM ()
check (WindowSize
w WindowSize -> WindowSize -> Bool
forall a. Ord a => a -> a -> Bool
> WindowSize
0)