{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Sync (
    LoopCheck (..),
    newLoopCheck,
    syncWithSender,
    syncWithSender',
    makeOutput,
    makeOutputIO,
    enqueueOutputSIO,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Network.Control
import Network.HTTP.Semantics.IO

import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types

syncWithSender
    :: Context
    -> Stream
    -> OutputType
    -> LoopCheck
    -> IO ()
syncWithSender :: Context -> Stream -> OutputType -> LoopCheck -> IO ()
syncWithSender ctx :: Context
ctx@Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
..} Stream
strm OutputType
otyp LoopCheck
lc = do
    (IO Sync
pop, Output
out) <- Stream -> OutputType -> IO (IO Sync, Output)
makeOutput Stream
strm OutputType
otyp
    TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
out
    Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context
ctx IO Sync
pop LoopCheck
lc

makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput Stream
strm OutputType
otyp = do
    MVar Sync
var <- IO (MVar Sync)
forall a. IO (MVar a)
newEmptyMVar
    let push :: Maybe Output -> IO ()
push Maybe Output
mout = case Maybe Output
mout of
            Maybe Output
Nothing -> MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var Sync
Done
            Just Output
ot -> MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var (Sync -> IO ()) -> Sync -> IO ()
forall a b. (a -> b) -> a -> b
$ Output -> Sync
Cont Output
ot
        pop :: IO Sync
pop = MVar Sync -> IO Sync
forall a. MVar a -> IO a
takeMVar MVar Sync
var
        out :: Output
out =
            Output
                { outputStream :: Stream
outputStream = Stream
strm
                , outputType :: OutputType
outputType = OutputType
otyp
                , outputSync :: Maybe Output -> IO ()
outputSync = Maybe Output -> IO ()
push
                }
    (IO Sync, Output) -> IO (IO Sync, Output)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO Sync
pop, Output
out)

makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} Stream
strm OutputType
otyp = Output
out
  where
    push :: Maybe Output -> IO ()
push Maybe Output
mout = case Maybe Output
mout of
        Maybe Output
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        -- Sender enqueues output again ignoring
        -- the stream TX window.
        Just Output
ot -> TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
ot
    out :: Output
out =
        Output
            { outputStream :: Stream
outputStream = Stream
strm
            , outputType :: OutputType
outputType = OutputType
otyp
            , outputSync :: Maybe Output -> IO ()
outputSync = Maybe Output -> IO ()
push
            }

enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO ctx :: Context
ctx@Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} Stream
strm OutputType
otyp = do
    let out :: Output
out = Context -> Stream -> OutputType -> Output
makeOutputIO Context
ctx Stream
strm OutputType
otyp
    TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
out

syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context{TVar Bool
TVar WindowSize
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef WindowSize
IORef (Maybe WindowSize)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
ThreadManager
DynamicTable
Settings
RoleInfo
Role
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe WindowSize)
myStreamId :: Context -> TVar WindowSize
peerStreamId :: Context -> IORef WindowSize
outputBufferLimit :: Context -> IORef WindowSize
outputQ :: Context -> TQueue Output
outputQStreamID :: Context -> TVar WindowSize
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> ThreadManager
senderDone :: Context -> TVar Bool
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe WindowSize)
myStreamId :: TVar WindowSize
peerStreamId :: IORef WindowSize
outputBufferLimit :: IORef WindowSize
outputQ :: TQueue Output
outputQStreamID :: TVar WindowSize
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
senderDone :: TVar Bool
..} IO Sync
pop LoopCheck
lc = IO ()
loop
  where
    loop :: IO ()
loop = do
        Sync
s <- IO Sync
pop
        case Sync
s of
            Sync
Done -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Cont Output
newout -> do
                Bool
cont <- LoopCheck -> IO Bool
checkLoop LoopCheck
lc
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cont (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    -- This is justified by the precondition above
                    TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ Output
newout
                    IO ()
loop

newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck Stream
strm Maybe (TBQueue StreamingChunk)
mtbq = do
    TVar Bool
tovar <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
False
    LoopCheck -> IO LoopCheck
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LoopCheck -> IO LoopCheck) -> LoopCheck -> IO LoopCheck
forall a b. (a -> b) -> a -> b
$
        LoopCheck
            { lcTBQ :: Maybe (TBQueue StreamingChunk)
lcTBQ = Maybe (TBQueue StreamingChunk)
mtbq
            , lcTimeout :: TVar Bool
lcTimeout = TVar Bool
tovar
            , lcWindow :: TVar TxFlow
lcWindow = Stream -> TVar TxFlow
streamTxFlow Stream
strm
            }

data LoopCheck = LoopCheck
    { LoopCheck -> Maybe (TBQueue StreamingChunk)
lcTBQ :: Maybe (TBQueue StreamingChunk)
    , LoopCheck -> TVar Bool
lcTimeout :: TVar Bool
    , LoopCheck -> TVar TxFlow
lcWindow :: TVar TxFlow
    }

checkLoop :: LoopCheck -> IO Bool
checkLoop :: LoopCheck -> IO Bool
checkLoop LoopCheck{Maybe (TBQueue StreamingChunk)
TVar Bool
TVar TxFlow
lcTBQ :: LoopCheck -> Maybe (TBQueue StreamingChunk)
lcTimeout :: LoopCheck -> TVar Bool
lcWindow :: LoopCheck -> TVar TxFlow
lcTBQ :: Maybe (TBQueue StreamingChunk)
lcTimeout :: TVar Bool
lcWindow :: TVar TxFlow
..} = STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
    Bool
tout <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
lcTimeout
    if Bool
tout
        then Bool -> STM Bool
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        else do
            Maybe (TBQueue StreamingChunk) -> STM ()
forall a. Maybe (TBQueue a) -> STM ()
waitStreaming' Maybe (TBQueue StreamingChunk)
lcTBQ
            TVar TxFlow -> STM ()
waitStreamWindowSizeSTM TVar TxFlow
lcWindow
            Bool -> STM Bool
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

waitStreaming' :: Maybe (TBQueue a) -> STM ()
waitStreaming' :: forall a. Maybe (TBQueue a) -> STM ()
waitStreaming' Maybe (TBQueue a)
Nothing = () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
waitStreaming' (Just TBQueue a
tbq) = do
    Bool
isEmpty <- TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
    Bool -> STM ()
check (Bool -> Bool
not Bool
isEmpty)

waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM TVar TxFlow
txf = do
    WindowSize
w <- TxFlow -> WindowSize
txWindowSize (TxFlow -> WindowSize) -> STM TxFlow -> STM WindowSize
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar TxFlow -> STM TxFlow
forall a. TVar a -> STM a
readTVar TVar TxFlow
txf
    Bool -> STM ()
check (WindowSize
w WindowSize -> WindowSize -> Bool
forall a. Ord a => a -> a -> Bool
> WindowSize
0)