{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.QUIC.Stream.Reass (
takeRecvStreamQwithSize,
putRxStreamData,
FlowCntl (..),
tryReassemble,
) where
import qualified Data.ByteString as BS
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Network.QUIC.Imports
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types
-- | Read the end-of-stream flag kept in the stream's receive queue.
--
-- The flag lives in the 'endOfStream' 'IORef' of the 'RecvStreamQ'
-- reachable through 'streamRecvQ'.
getEndOfStream :: Stream -> IO Bool
getEndOfStream = readIORef . endOfStream . streamRecvQ
-- | Set the end-of-stream flag of the stream's receive queue to 'True'.
--
-- Nothing in this module ever resets the flag.
setEndOfStream :: Stream -> IO ()
setEndOfStream strm = writeIORef (endOfStream (streamRecvQ strm)) True
-- | Read the pending-data slot of the stream's receive queue.
--
-- The slot holds the tail of a chunk that was split by an earlier sized
-- read, or 'Nothing' when there is no such leftover.  The slot itself is
-- left unchanged.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData = readIORef . pendingData . streamRecvQ
-- | Store @bs@ in the pending-data slot of the stream's receive queue,
-- replacing whatever the slot held before.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData strm bs = writeIORef (pendingData (streamRecvQ strm)) (Just bs)
-- | Empty the pending-data slot of the stream's receive queue by
-- writing 'Nothing' into it.
clearPendingData :: Stream -> IO ()
clearPendingData strm = writeIORef (pendingData (streamRecvQ strm)) Nothing
-- | Take up to @siz0@ bytes of received data from the stream.
--
-- * If the end-of-stream flag is already set, @""@ is returned at once.
-- * Otherwise the first chunk is either the leftover in the pending-data
--   slot or, when the slot is empty, the result of 'takeRecvStreamQ'.
--   An empty first chunk from the queue is the end-of-stream marker: the
--   flag is set and @""@ is returned.
-- * While fewer than @siz0@ bytes have been gathered, further chunks are
--   fetched with 'tryTakeRecvStreamQ'; gathering stops as soon as that
--   yields 'Nothing' or the end-of-stream marker.
-- * A chunk that would exceed the remaining budget is split with
--   'BS.splitAt'; the surplus is stored in the pending-data slot for the
--   next call.
--
-- The same size check is applied to the pending leftover as to chunks
-- taken from the queue, so the result is never longer than @siz0@ (for
-- positive @siz0@).
takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mpending <- readPendingData strm
            case mpending of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if b0 == ""
                        then do
                            setEndOfStream strm
                            return ""
                        else accept siz0 id b0
                Just b0 -> do
                    -- The leftover is consumed here; 'accept' stores a new
                    -- leftover if this chunk is still larger than 'siz0'.
                    clearPendingData strm
                    accept siz0 id b0
  where
    -- Add chunk 'b' to the chunks gathered so far ('build' is a
    -- difference list) given that 'siz' more bytes are wanted.
    accept :: Int -> ([ByteString] -> [ByteString]) -> ByteString -> IO ByteString
    accept siz build b =
        case len `compare` siz of
            LT -> tryRead (siz - len) (build . (b :))
            EQ -> return $ BS.concat $ build [b]
            GT -> do
                let (b1, b2) = BS.splitAt siz b
                writePendingData strm b2
                return $ BS.concat $ build [b1]
      where
        len = BS.length b

    -- Try to fetch one more chunk without waiting; finish with what has
    -- been gathered when nothing is available or the stream has ended.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> return $ BS.concat $ build []
            Just "" -> do
                setEndOfStream strm
                return $ BS.concat $ build []
            Just b -> accept siz build b
-- | Outcome reported by 'putRxStreamData' for a received stream fragment.
data FlowCntl
    = -- | The fragment's offset plus length exceeds the limit obtained
      --   from 'getRxMaxStreamData'; the fragment was not processed.
      OverLimit
    | -- | 'tryReassemble' reported the fragment as a duplicate.
      Duplicated
    | -- | The fragment was handed to 'tryReassemble' and was not a
      --   duplicate.
      Reassembled
-- | Process a received stream fragment.
--
-- The fragment is rejected with 'OverLimit' when its offset plus length
-- is greater than the value returned by 'getRxMaxStreamData'.  Otherwise
-- it is passed to 'tryReassemble'; the result is 'Duplicated' when
-- 'tryReassemble' returns 'True' and 'Reassembled' when it returns
-- 'False'.
putRxStreamData :: Stream -> RxStreamData -> IO FlowCntl
putRxStreamData s rx@(RxStreamData _ off len _) = do
    lim <- getRxMaxStreamData s
    if len + off > lim
        then return OverLimit
        else do
            dup <- tryReassemble s rx put putFin
            return $ if dup then Duplicated else Reassembled
  where
    -- Deliver in-order payload to the receive queue.  Empty payloads are
    -- dropped because the empty string is what 'putFin' enqueues to mark
    -- the end of the stream.
    put :: StreamData -> IO ()
    put "" = return ()
    put d = do
        -- NOTE(review): presumably receive-side flow-control accounting;
        -- 'addRxStreamData' is defined elsewhere.
        addRxStreamData s $ BS.length d
        putRecvStreamQ s d

    -- Enqueue the end-of-stream marker.
    putFin :: IO ()
    putFin = putRecvStreamQ s ""
-- | Try to place a received fragment into the stream's byte sequence.
--
-- The third argument (@put@) is called with each payload that becomes
-- deliverable in order; the fourth (@putFin@) is called once the
-- fragment carrying FIN has been delivered.
--
-- Returns 'True' when the fragment is treated as a duplicate and nothing
-- is delivered or stored: an empty fragment without FIN, a FIN fragment
-- when FIN is already recorded in the receive state, or a fragment whose
-- offset is below the current receive offset.  Returns 'False' when the
-- fragment was delivered (offset equal to the receive offset) or stored
-- in the reassembly 'Skew' heap (offset above the receive offset).
tryReassemble
    :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty fragment without FIN: carries nothing.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty fragment with FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN is recorded now; the fragment itself waits in the
                -- heap until the preceding data has arrived.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Data fragment without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain buffered fragments that continue at offset 'xff', delivering
    -- their payloads, until the heap has nothing for the next offset or
    -- a fragment with FIN has been delivered.
    loop :: StreamState -> Offset -> IO ()
    loop si0 xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si0{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record the advanced offset before signalling
                        -- FIN, as the direct FIN path does; otherwise a
                        -- retransmission at the old offset would be
                        -- delivered again instead of being reported as a
                        -- duplicate.
                        writeIORef
                            streamStateRx
                            si0{streamOffset = xff1, streamFin = True}
                        putFin
                    else loop si0 xff1
-- Data fragment with FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- | Whether the last fragment of the sequence has its FIN flag
-- ('rxstrmFin') set.  An empty sequence yields 'False'.
hasFin :: Seq RxStreamData -> Bool
hasFin s = case Seq.viewr s of
    Seq.EmptyR -> False
    _ Seq.:> x -> rxstrmFin x