{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Reass (
    takeRecvStreamQwithSize,
    putRxStreamData,
    FlowCntl (..),
    tryReassemble,
) where

import qualified Data.ByteString as BS
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq

import Network.QUIC.Imports

-- import Network.QUIC.Logger
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types

----------------------------------------------------------------

-- | Read the end-of-stream flag kept in the stream's receive queue.
getEndOfStream :: Stream -> IO Bool
getEndOfStream = readIORef . endOfStream . streamRecvQ

-- | Set the end-of-stream flag kept in the stream's receive queue to 'True'.
setEndOfStream :: Stream -> IO ()
setEndOfStream strm = writeIORef eosRef True
  where
    eosRef = endOfStream (streamRecvQ strm)

-- | Read the pending-data slot of the stream's receive queue.
-- 'Nothing' means no leftover bytes are stored.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData Stream{streamRecvQ = rq} = readIORef (pendingData rq)

-- | Store @bs@ in the pending-data slot of the stream's receive queue,
-- overwriting whatever was there.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData strm bs = writeIORef slot (Just bs)
  where
    slot = pendingData (streamRecvQ strm)

-- | Reset the pending-data slot of the stream's receive queue to 'Nothing'.
clearPendingData :: Stream -> IO ()
clearPendingData Stream{streamRecvQ = rq} = writeIORef (pendingData rq) Nothing

----------------------------------------------------------------

-- | Read at most @siz0@ bytes from the receive side of a stream.
--
-- Returns @""@ once the end-of-stream flag is set. Otherwise the first
-- chunk is the leftover stored by an earlier call (if any) or the result
-- of 'takeRecvStreamQ'; an empty chunk from the queue marks end of stream.
-- Further chunks are only taken with 'tryTakeRecvStreamQ', so the
-- function returns what it has as soon as that yields 'Nothing'.
--
-- Whenever a chunk is longer than the space still requested, it is split:
-- the prefix is returned and the remainder is saved with
-- 'writePendingData' for the next call. This applies to the leftover
-- chunk as well, so the result never exceeds @siz0@ bytes (for a
-- non-negative @siz0@).
takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mb <- readPendingData strm
            case mb of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if b0 == ""
                        then do
                            setEndOfStream strm
                            return ""
                        else consume siz0 id b0
                Just b0 -> do
                    clearPendingData strm
                    -- The leftover may itself be longer than this request,
                    -- so it goes through the same size check as fresh data.
                    consume siz0 id b0
  where
    -- Account for one non-empty chunk against the remaining size.
    consume
        :: Int -> ([ByteString] -> [ByteString]) -> ByteString -> IO ByteString
    consume siz build b =
        let len = BS.length b
         in case len `compare` siz of
                LT -> tryRead (siz - len) (build . (b :))
                EQ -> finish build [b]
                GT -> do
                    let (b1, b2) = BS.splitAt siz b
                    writePendingData strm b2
                    finish build [b1]

    -- Try to fetch one more chunk from the queue.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> finish build []
            Just b
                | b == "" -> do
                    setEndOfStream strm
                    finish build []
                | otherwise -> consume siz build b

    -- Concatenate the accumulated chunks followed by @rest@.
    finish :: ([ByteString] -> [ByteString]) -> [ByteString] -> IO ByteString
    finish build rest = return $ BS.concat $ build rest

----------------------------------------------------------------
----------------------------------------------------------------

-- | Result of 'putRxStreamData'.
--
-- 'OverLimit' is returned when offset plus length of the fragment exceeds
-- the value obtained from 'getRxMaxStreamData'; 'Duplicated' when
-- 'tryReassemble' reports the fragment as a duplicate; 'Reassembled'
-- otherwise.
data FlowCntl = OverLimit | Duplicated | Reassembled

-- | Accept a received stream fragment.
--
-- The fragment is rejected with 'OverLimit' when its end (offset plus
-- length) lies beyond the limit returned by 'getRxMaxStreamData'.
-- Otherwise it is passed to 'tryReassemble'; in-order data is accounted
-- with 'addRxStreamData' and pushed with 'putRecvStreamQ', and FIN is
-- signalled by pushing an empty chunk.
putRxStreamData :: Stream -> RxStreamData -> IO FlowCntl
putRxStreamData s rx@(RxStreamData _ off len _) = do
    lim <- getRxMaxStreamData s
    if len + off > lim
        then return OverLimit
        else classify <$> tryReassemble s rx deliver deliverFin
  where
    classify isDup
        | isDup = Duplicated
        | otherwise = Reassembled

    -- Empty chunks are never queued as data: an empty chunk is the FIN
    -- marker pushed by 'deliverFin'.
    deliver chunk
        | BS.null chunk = return ()
        | otherwise = do
            addRxStreamData s (BS.length chunk)
            putRecvStreamQ s chunk

    deliverFin = putRecvStreamQ s ""

-- | Feed one received fragment into the reassembly state of a stream.
--
-- The 'StreamState' stored in @streamStateRx@ holds the next expected
-- offset and a flag; a set flag means a fragment carrying FIN has already
-- been accepted.
--
-- * An empty fragment without FIN, a second FIN, and any fragment starting
--   below the expected offset are reported as duplicates.
-- * A fragment starting exactly at the expected offset is handed to @put@.
--   For a fragment without FIN, contiguous fragments buffered in
--   @streamReass@ are then drained through @put@ as well. @putFin@ runs
--   once the fragment carrying FIN has been delivered.
-- * A fragment starting above the expected offset is stored in
--   @streamReass@.
--
-- The return value is 'True' when the fragment was treated as a duplicate
-- and 'False' otherwise.
tryReassemble
    :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty fragment without FIN: nothing to deliver.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty fragment with FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then do
            -- stdoutLogger "Illegal Fin" -- fixme
            return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN is ahead of the data seen so far: remember it and
                -- keep the marker fragment for 'loop' to find later.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Non-empty fragment without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain buffered fragments that continue at offset @xff@, then record
    -- the new expected offset.
    loop si0 xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si0{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record the final offset before signalling FIN.
                        -- Otherwise the state keeps the offset from before
                        -- the drain, and a retransmission starting there
                        -- would be delivered a second time.
                        writeIORef streamStateRx si0{streamOffset = xff1}
                        putFin
                    else loop si0 xff1
-- Non-empty fragment with FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then do
            -- stdoutLogger "Illegal Fin" -- fixme
            return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False

-- | 'True' when the last fragment of the sequence has its FIN flag
-- ('rxstrmFin') set; 'False' for an empty sequence.
hasFin :: Seq RxStreamData -> Bool
hasFin frags
    | _ Seq.:> lastFrag <- Seq.viewr frags = rxstrmFin lastFrag
    | otherwise = False