{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Reass (
    takeRecvStreamQwithSize
  , putRxStreamData
  , tryReassemble
  ) where

import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import qualified Data.ByteString as BS

import Network.QUIC.Imports
-- import Network.QUIC.Logger
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types

----------------------------------------------------------------

getEndOfStream :: Stream -> IO Bool
getEndOfStream :: Stream -> IO Bool
getEndOfStream Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
..} = forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ RecvStreamQ -> IORef Bool
endOfStream RecvStreamQ
streamRecvQ

setEndOfStream :: Stream -> IO ()
setEndOfStream :: Stream -> IO ()
setEndOfStream Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} = forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef Bool
endOfStream RecvStreamQ
streamRecvQ) Bool
True

readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData :: Stream -> IO (Maybe StreamData)
readPendingData Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} = forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ RecvStreamQ -> IORef (Maybe StreamData)
pendingData RecvStreamQ
streamRecvQ

writePendingData :: Stream -> ByteString -> IO ()
writePendingData :: Stream -> StreamData -> IO ()
writePendingData Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} StreamData
bs = forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef (Maybe StreamData)
pendingData RecvStreamQ
streamRecvQ) forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just StreamData
bs

clearPendingData :: Stream -> IO ()
clearPendingData :: Stream -> IO ()
clearPendingData Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} = forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef (Maybe StreamData)
pendingData RecvStreamQ
streamRecvQ) forall a. Maybe a
Nothing

----------------------------------------------------------------

takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize :: Stream -> Offset -> IO StreamData
takeRecvStreamQwithSize Stream
strm Offset
siz0 = do
    Bool
eos <- Stream -> IO Bool
getEndOfStream Stream
strm
    if Bool
eos then
        forall (m :: * -> *) a. Monad m => a -> m a
return StreamData
""
      else do
        Maybe StreamData
mb <- Stream -> IO (Maybe StreamData)
readPendingData Stream
strm
        case Maybe StreamData
mb of
          Maybe StreamData
Nothing -> do
              StreamData
b0 <- Stream -> IO StreamData
takeRecvStreamQ Stream
strm
              if StreamData
b0 forall a. Eq a => a -> a -> Bool
== StreamData
"" then do
                  Stream -> IO ()
setEndOfStream Stream
strm
                  forall (m :: * -> *) a. Monad m => a -> m a
return StreamData
""
                else do
                  let len :: Offset
len = StreamData -> Offset
BS.length StreamData
b0
                  case Offset
len forall a. Ord a => a -> a -> Ordering
`compare` Offset
siz0 of
                      Ordering
LT -> Offset -> ([StreamData] -> [StreamData]) -> IO StreamData
tryRead (Offset
siz0 forall a. Num a => a -> a -> a
- Offset
len) (StreamData
b0 forall a. a -> [a] -> [a]
:)
                      Ordering
EQ -> forall (m :: * -> *) a. Monad m => a -> m a
return StreamData
b0
                      Ordering
GT -> do
                          let (StreamData
b1,StreamData
b2) = Offset -> StreamData -> (StreamData, StreamData)
BS.splitAt Offset
siz0 StreamData
b0
                          Stream -> StreamData -> IO ()
writePendingData Stream
strm StreamData
b2
                          forall (m :: * -> *) a. Monad m => a -> m a
return StreamData
b1
          Just StreamData
b0 -> do
              Stream -> IO ()
clearPendingData Stream
strm
              let len :: Offset
len = StreamData -> Offset
BS.length StreamData
b0
              Offset -> ([StreamData] -> [StreamData]) -> IO StreamData
tryRead (Offset
siz0 forall a. Num a => a -> a -> a
- Offset
len) (StreamData
b0 forall a. a -> [a] -> [a]
:)
  where
    tryRead :: Offset -> ([StreamData] -> [StreamData]) -> IO StreamData
tryRead Offset
siz [StreamData] -> [StreamData]
build = do
        Maybe StreamData
mb <- Stream -> IO (Maybe StreamData)
tryTakeRecvStreamQ Stream
strm
        case Maybe StreamData
mb of
          Maybe StreamData
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [StreamData] -> StreamData
BS.concat forall a b. (a -> b) -> a -> b
$ [StreamData] -> [StreamData]
build []
          Just StreamData
b  -> do
              if StreamData
b forall a. Eq a => a -> a -> Bool
== StreamData
"" then do
                  Stream -> IO ()
setEndOfStream Stream
strm
                  forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [StreamData] -> StreamData
BS.concat forall a b. (a -> b) -> a -> b
$ [StreamData] -> [StreamData]
build []
                else do
                  let len :: Offset
len = StreamData -> Offset
BS.length StreamData
b
                  case Offset
len forall a. Ord a => a -> a -> Ordering
`compare` Offset
siz of
                    Ordering
LT -> Offset -> ([StreamData] -> [StreamData]) -> IO StreamData
tryRead (Offset
siz forall a. Num a => a -> a -> a
- Offset
len) ([StreamData] -> [StreamData]
build forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamData
b forall a. a -> [a] -> [a]
:))
                    Ordering
EQ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [StreamData] -> StreamData
BS.concat forall a b. (a -> b) -> a -> b
$ [StreamData] -> [StreamData]
build [StreamData
b]
                    Ordering
GT -> do
                        let (StreamData
b1,StreamData
b2) = Offset -> StreamData -> (StreamData, StreamData)
BS.splitAt Offset
siz StreamData
b
                        Stream -> StreamData -> IO ()
writePendingData Stream
strm StreamData
b2
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [StreamData] -> StreamData
BS.concat forall a b. (a -> b) -> a -> b
$ [StreamData] -> [StreamData]
build [StreamData
b1]

----------------------------------------------------------------
----------------------------------------------------------------

putRxStreamData :: Stream -> RxStreamData -> IO Bool
putRxStreamData :: Stream -> RxStreamData -> IO Bool
putRxStreamData Stream
s rx :: RxStreamData
rx@(RxStreamData StreamData
dat Offset
off Offset
_ Bool
_) = do
    Offset
lim <- Stream -> IO Offset
getRxMaxStreamData Stream
s
    if StreamData -> Offset
BS.length StreamData
dat forall a. Num a => a -> a -> a
+ Offset
off forall a. Ord a => a -> a -> Bool
> Offset
lim then
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
      else do
        Bool
_ <- Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
tryReassemble Stream
s RxStreamData
rx StreamData -> IO ()
put IO ()
putFin
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
  where
    put :: StreamData -> IO ()
put StreamData
"" = forall (m :: * -> *) a. Monad m => a -> m a
return ()
    put StreamData
d  = Stream -> StreamData -> IO ()
putRecvStreamQ Stream
s StreamData
d
    putFin :: IO ()
putFin = Stream -> StreamData -> IO ()
putRecvStreamQ Stream
s StreamData
""

-- fin of StreamState off fin means see-fin-already.
-- return value indicates duplication
tryReassemble :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
tryReassemble :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
tryReassemble Stream{}   (RxStreamData StreamData
"" Offset
_  Offset
_ Bool
False) StreamData -> IO ()
_ IO ()
_ = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
tryReassemble Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} x :: RxStreamData
x@(RxStreamData StreamData
"" Offset
off Offset
_ Bool
True) StreamData -> IO ()
_ IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState Offset
off0 Bool
fin0) <- forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    let si1 :: StreamState
si1 = StreamState
si0 { streamFin :: Bool
streamFin = Bool
True }
    if Bool
fin0 then do
        -- stdoutLogger "Illegal Fin" -- fixme
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      else case Offset
off forall a. Ord a => a -> a -> Ordering
`compare` Offset
off0 of
        Ordering
LT -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Ordering
EQ -> do
            forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            IO ()
putFin
            forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Ordering
GT -> do
            forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
            forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
tryReassemble Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} x :: RxStreamData
x@(RxStreamData StreamData
dat Offset
off Offset
len Bool
False) StreamData -> IO ()
put IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState Offset
off0 Bool
_) <- forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    case Offset
off forall a. Ord a => a -> a -> Ordering
`compare` Offset
off0 of
      Ordering
LT -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      Ordering
EQ -> do
          StreamData -> IO ()
put StreamData
dat
          StreamState -> Offset -> IO ()
loop StreamState
si0 (Offset
off0 forall a. Num a => a -> a -> a
+ Offset
len)
          forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
      Ordering
GT -> do
          forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
          forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
  where
    loop :: StreamState -> Offset -> IO ()
loop StreamState
si0 Offset
xff = do
        Maybe (Seq RxStreamData)
mrxs <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (Skew RxStreamData)
streamReass (forall a. Frag a => Offset -> Skew a -> (Skew a, Maybe (Seq a))
Skew.deleteMinIf Offset
xff)
        case Maybe (Seq RxStreamData)
mrxs of
           Maybe (Seq RxStreamData)
Nothing   -> forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si0 { streamOffset :: Offset
streamOffset = Offset
xff }
           Just Seq RxStreamData
rxs -> do
               forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (StreamData -> IO ()
put forall b c a. (b -> c) -> (a -> b) -> a -> c
. RxStreamData -> StreamData
rxstrmData) Seq RxStreamData
rxs
               let xff1 :: Offset
xff1 = forall a. Frag a => a -> Offset
nextOff Seq RxStreamData
rxs
               if Seq RxStreamData -> Bool
hasFin Seq RxStreamData
rxs then do
                   IO ()
putFin
                 else do
                   StreamState -> Offset -> IO ()
loop StreamState
si0 Offset
xff1

tryReassemble Stream{Offset
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
MVar ()
Connection
RecvStreamQ
streamSyncFinTx :: MVar ()
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: Offset
streamSyncFinTx :: Stream -> MVar ()
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> Offset
..} x :: RxStreamData
x@(RxStreamData StreamData
dat Offset
off Offset
len Bool
True) StreamData -> IO ()
put IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState Offset
off0 Bool
fin0) <- forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    let si1 :: StreamState
si1 = StreamState
si0 { streamFin :: Bool
streamFin = Bool
True }
    if Bool
fin0 then do
        -- stdoutLogger "Illegal Fin" -- fixme
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      else case Offset
off forall a. Ord a => a -> a -> Ordering
`compare` Offset
off0 of
        Ordering
LT -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Ordering
EQ -> do
            let off1 :: Offset
off1 = Offset
off0 forall a. Num a => a -> a -> a
+ Offset
len
            forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1 { streamOffset :: Offset
streamOffset = Offset
off1 }
            StreamData -> IO ()
put StreamData
dat
            IO ()
putFin
            forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Ordering
GT -> do
            forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
            forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

hasFin :: Seq RxStreamData -> Bool
hasFin :: Seq RxStreamData -> Bool
hasFin Seq RxStreamData
s = case forall a. Seq a -> ViewR a
Seq.viewr Seq RxStreamData
s of
  ViewR RxStreamData
Seq.EmptyR -> Bool
False
  Seq RxStreamData
_ Seq.:> RxStreamData
x -> RxStreamData -> Bool
rxstrmFin RxStreamData
x