{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Misc (
    getTxStreamOffset,
    isTxStreamClosed,
    setTxStreamClosed,
    getRxStreamOffset,
    isRxStreamClosed,
    setRxStreamClosed,
    --
    readStreamFlowTx,
    addTxStreamData,
    setTxMaxStreamData,
    --
    getRxMaxStreamData,
    addRxStreamData,
    updateStreamFlowRx,
) where

import Network.Control
import UnliftIO.STM

import Network.QUIC.Imports
import Network.QUIC.Stream.Queue
import Network.QUIC.Stream.Types

----------------------------------------------------------------

-- | Reserve @len@ bytes of the sending side of a stream.
--
-- Atomically advances the offset held in 'streamStateTx' by @len@ and
-- returns the offset as it was /before/ the advance. The FIN flag of
-- the state is left untouched.
getTxStreamOffset :: Stream -> Int -> IO Offset
getTxStreamOffset Stream{..} len = atomicModifyIORef' streamStateTx get
  where
    -- New state carries the advanced offset; the result is the old one.
    get (StreamState off fin) = (StreamState (off + len) fin, off)

-- | Read the FIN flag of the sending-side stream state.
--
-- Returns the flag stored in 'streamStateTx'; the offset is ignored.
isTxStreamClosed :: Stream -> IO Bool
isTxStreamClosed Stream{..} = do
    StreamState _ fin <- readIORef streamStateTx
    return fin

-- | Mark the sending side of a stream as closed.
--
-- Sets the FIN flag in 'streamStateTx' to 'True' while keeping the
-- current offset.
setTxStreamClosed :: Stream -> IO ()
setTxStreamClosed Stream{..} = atomicModifyIORef'' streamStateTx set
  where
    set (StreamState off _) = StreamState off True

----------------------------------------------------------------

-- | Reserve @len@ bytes of the receiving side of a stream.
--
-- Atomically advances the offset held in 'streamStateRx' by @len@ and
-- returns the offset as it was /before/ the advance. The FIN flag of
-- the state is left untouched.
getRxStreamOffset :: Stream -> Int -> IO Offset
getRxStreamOffset Stream{..} len = atomicModifyIORef' streamStateRx get
  where
    -- New state carries the advanced offset; the result is the old one.
    get (StreamState off fin) = (StreamState (off + len) fin, off)

-- | Read the FIN flag of the receiving-side stream state.
--
-- Returns the flag stored in 'streamStateRx'; the offset is ignored.
isRxStreamClosed :: Stream -> IO Bool
isRxStreamClosed Stream{..} = do
    StreamState _ fin <- readIORef streamStateRx
    return fin

-- | Mark the receiving side of a stream as closed.
--
-- Sets the FIN flag in 'streamStateRx' to 'True' (keeping the current
-- offset) and then enqueues an empty chunk on the stream's receive
-- queue via 'putRecvStreamQ'.
setRxStreamClosed :: Stream -> IO ()
setRxStreamClosed strm@Stream{..} = do
    atomicModifyIORef'' streamStateRx set
    -- Sending a pseudo FIN so that recvStream doesn't block.
    -- See https://github.com/kazu-yamamoto/quic/pull/54
    putRecvStreamQ strm ""
  where
    set (StreamState off _) = StreamState off True

----------------------------------------------------------------

-- | Read the sending-side flow-control record of a stream.
--
-- Runs in 'STM' so that callers can combine it with other
-- transactional reads in one transaction.
readStreamFlowTx :: Stream -> STM TxFlow
readStreamFlowTx Stream{..} = readTVar streamFlowTx

----------------------------------------------------------------

-- | Account for @n@ more bytes sent on a stream.
--
-- Adds @n@ to the 'txfSent' counter of the stream's 'TxFlow', using
-- the strict 'modifyTVar''.
addTxStreamData :: Stream -> Int -> STM ()
addTxStreamData Stream{..} n = modifyTVar' streamFlowTx add
  where
    add flow = flow{txfSent = txfSent flow + n}

-- | Raise the sending-side limit of a stream to @n@.
--
-- The 'txfLimit' field is replaced only when @n@ is strictly larger
-- than the current limit; a smaller or equal value leaves the flow
-- record unchanged, so the limit never decreases.
setTxMaxStreamData :: Stream -> Int -> IO ()
setTxMaxStreamData Stream{..} n = atomically $ modifyTVar' streamFlowTx set
  where
    set flow
        | txfLimit flow < n = flow{txfLimit = n}
        | otherwise = flow

----------------------------------------------------------------

-- | Read the current receiving-side limit ('rxfLimit') of a stream.
getRxMaxStreamData :: Stream -> IO Int
getRxMaxStreamData Stream{..} = rxfLimit <$> readIORef streamFlowRx

-- | Account for @n@ more bytes received on a stream.
--
-- Adds @n@ to the 'rxfReceived' counter of the stream's 'RxFlow'.
addRxStreamData :: Stream -> Int -> IO ()
addRxStreamData Stream{..} n = atomicModifyIORef'' streamFlowRx add
  where
    add flow = flow{rxfReceived = rxfReceived flow + n}

-- | Report @consumed@ bytes to the receiving-side flow controller.
--
-- Atomically applies 'maybeOpenRxWindow' (with 'FCTMaxData') to the
-- stream's 'RxFlow' and returns its 'Maybe' result.
--
-- NOTE(review): presumably 'Just' carries the new limit to announce
-- to the peer when the window was opened — confirm against
-- "Network.Control".
updateStreamFlowRx :: Stream -> Int -> IO (Maybe Int)
updateStreamFlowRx Stream{..} consumed =
    atomicModifyIORef' streamFlowRx $ maybeOpenRxWindow consumed FCTMaxData

{- Cannot be used because of reassembly.
checkRxMaxStreamData :: Stream -> Int -> IO Bool
checkRxMaxStreamData Stream{..} len =
    atomicModifyIORef' streamFlowRx $ checkRxLimit len
-}