{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Stream where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Maybe (fromMaybe)
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO

import Network.HTTP2.Frame
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types

----------------------------------------------------------------

isIdle :: StreamState -> Bool
isIdle :: StreamState -> Bool
isIdle StreamState
Idle = Bool
True
isIdle StreamState
_ = Bool
False

isOpen :: StreamState -> Bool
isOpen :: StreamState -> Bool
isOpen Open{} = Bool
True
isOpen StreamState
_ = Bool
False

isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote StreamState
HalfClosedRemote = Bool
True
isHalfClosedRemote (Closed ClosedCode
_) = Bool
True
isHalfClosedRemote StreamState
_ = Bool
False

isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal (Open (Just ClosedCode
_) OpenState
_) = Bool
True
isHalfClosedLocal (Closed ClosedCode
_) = Bool
True
isHalfClosedLocal StreamState
_ = Bool
False

isClosed :: StreamState -> Bool
isClosed :: StreamState -> Bool
isClosed Closed{} = Bool
True
isClosed StreamState
_ = Bool
False

isReserved :: StreamState -> Bool
isReserved :: StreamState -> Bool
isReserved StreamState
Reserved = Bool
True
isReserved StreamState
_ = Bool
False

----------------------------------------------------------------

newOddStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newOddStream :: StreamId -> StreamId -> StreamId -> IO Stream
newOddStream StreamId
sid StreamId
txwin StreamId
rxwin =
    StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
        (IORef StreamState
 -> MVar (Either SomeException InpObj)
 -> TVar TxFlow
 -> IORef RxFlow
 -> IORef (Maybe RxQ)
 -> Stream)
-> IO (IORef StreamState)
-> IO
     (MVar (Either SomeException InpObj)
      -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Idle
        IO
  (MVar (Either SomeException InpObj)
   -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall a. IO (MVar a)
newEmptyMVar
        IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall a. a -> IO (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
        IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
        IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing

newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream :: StreamId -> StreamId -> StreamId -> IO Stream
newEvenStream StreamId
sid StreamId
txwin StreamId
rxwin =
    StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
        (IORef StreamState
 -> MVar (Either SomeException InpObj)
 -> TVar TxFlow
 -> IORef RxFlow
 -> IORef (Maybe RxQ)
 -> Stream)
-> IO (IORef StreamState)
-> IO
     (MVar (Either SomeException InpObj)
      -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Reserved
        IO
  (MVar (Either SomeException InpObj)
   -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall a. IO (MVar a)
newEmptyMVar
        IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall a. a -> IO (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
        IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
        IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing

----------------------------------------------------------------

{-# INLINE readStreamState #-}
readStreamState :: Stream -> IO StreamState
readStreamState :: Stream -> IO StreamState
readStreamState Stream{IORef StreamState
streamState :: IORef StreamState
streamState :: Stream -> IORef StreamState
streamState} = IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamState

----------------------------------------------------------------

closeAllStreams
    :: TVar OddStreamTable -> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams :: TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams TVar OddStreamTable
ovar TVar EvenStreamTable
evar Maybe SomeException
mErr' = do
    IntMap Stream
ostrms <- TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable TVar OddStreamTable
ovar
    (Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
ostrms
    IntMap Stream
estrms <- TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable TVar EvenStreamTable
evar
    (Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
estrms
  where
    finalize :: Stream -> IO ()
finalize Stream
strm = do
        StreamState
st <- Stream -> IO StreamState
readStreamState Stream
strm
        IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar (Either SomeException InpObj)
-> Either SomeException InpObj -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm) Either SomeException InpObj
forall a. Either SomeException a
err
        case StreamState
st of
            Open Maybe ClosedCode
_ (Body RxQ
q Maybe StreamId
_ IORef StreamId
_ IORef (Maybe TokenHeaderTable)
_) ->
                STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ RxQ -> Either SomeException (ByteString, Bool) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue RxQ
q (Either SomeException (ByteString, Bool) -> STM ())
-> Either SomeException (ByteString, Bool) -> STM ()
forall a b. (a -> b) -> a -> b
$ Either SomeException (ByteString, Bool)
-> (SomeException -> Either SomeException (ByteString, Bool))
-> Maybe SomeException
-> Either SomeException (ByteString, Bool)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((ByteString, Bool) -> Either SomeException (ByteString, Bool)
forall a b. b -> Either a b
Right (ByteString
forall a. Monoid a => a
mempty, Bool
True)) SomeException -> Either SomeException (ByteString, Bool)
forall a b. a -> Either a b
Left Maybe SomeException
mErr
            StreamState
_otherwise ->
                () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    mErr :: Maybe SomeException
    mErr :: Maybe SomeException
mErr = case Maybe SomeException
mErr' of
        Just SomeException
e
            | Just HTTP2Error
ConnectionIsClosed <- SomeException -> Maybe HTTP2Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e ->
                Maybe SomeException
forall a. Maybe a
Nothing
        Maybe SomeException
_otherwise ->
            Maybe SomeException
mErr'

    err :: Either SomeException a
    err :: forall a. Either SomeException a
err =
        SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$
            SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe (HTTP2Error -> SomeException
forall e. Exception e => e -> SomeException
toException HTTP2Error
ConnectionIsClosed) (Maybe SomeException -> SomeException)
-> Maybe SomeException -> SomeException
forall a b. (a -> b) -> a -> b
$
                Maybe SomeException
mErr

----------------------------------------------------------------

data StreamTerminated
    = StreamPushedFinal
    | StreamCancelled
    | StreamOutOfScope
    deriving (StreamId -> StreamTerminated -> ShowS
[StreamTerminated] -> ShowS
StreamTerminated -> String
(StreamId -> StreamTerminated -> ShowS)
-> (StreamTerminated -> String)
-> ([StreamTerminated] -> ShowS)
-> Show StreamTerminated
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> StreamTerminated -> ShowS
showsPrec :: StreamId -> StreamTerminated -> ShowS
$cshow :: StreamTerminated -> String
show :: StreamTerminated -> String
$cshowList :: [StreamTerminated] -> ShowS
showList :: [StreamTerminated] -> ShowS
Show)
    deriving anyclass (Show StreamTerminated
Typeable StreamTerminated
(Typeable StreamTerminated, Show StreamTerminated) =>
(StreamTerminated -> SomeException)
-> (SomeException -> Maybe StreamTerminated)
-> (StreamTerminated -> String)
-> Exception StreamTerminated
SomeException -> Maybe StreamTerminated
StreamTerminated -> String
StreamTerminated -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: StreamTerminated -> SomeException
toException :: StreamTerminated -> SomeException
$cfromException :: SomeException -> Maybe StreamTerminated
fromException :: SomeException -> Maybe StreamTerminated
$cdisplayException :: StreamTerminated -> String
displayException :: StreamTerminated -> String
Exception)

withOutBodyIface
    :: TBQueue StreamingChunk
    -> (forall a. IO a -> IO a)
    -> (OutBodyIface -> IO r)
    -> IO r
withOutBodyIface :: forall r.
TBQueue StreamingChunk
-> (forall a. IO a -> IO a) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq forall a. IO a -> IO a
unmask OutBodyIface -> IO r
k = do
    TVar (Maybe StreamTerminated)
terminated <- Maybe StreamTerminated -> IO (TVar (Maybe StreamTerminated))
forall a. a -> IO (TVar a)
newTVarIO Maybe StreamTerminated
forall a. Maybe a
Nothing
    let whenNotTerminated :: STM b -> STM b
whenNotTerminated STM b
act = do
            Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
            case Maybe StreamTerminated
mTerminated of
                Just StreamTerminated
reason ->
                    StreamTerminated -> STM b
forall e a. Exception e => e -> STM a
throwSTM StreamTerminated
reason
                Maybe StreamTerminated
Nothing ->
                    STM b
act

        terminateWith :: StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
reason STM ()
act = do
            Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
            case Maybe StreamTerminated
mTerminated of
                Just StreamTerminated
_ ->
                    -- Already terminated
                    () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Maybe StreamTerminated
Nothing -> do
                    TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
reason)
                    STM ()
act

        iface :: OutBodyIface
iface =
            OutBodyIface
                { outBodyUnmask :: forall a. IO a -> IO a
outBodyUnmask = IO x -> IO x
forall a. IO a -> IO a
unmask
                , outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b ->
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
                                Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b IsEndOfStream
NotEndOfStream
                , outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b ->
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
                        TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
StreamPushedFinal)
                        TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b (Maybe (IO ()) -> IsEndOfStream
EndOfStream Maybe (IO ())
forall a. Maybe a
Nothing)
                        TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
                , outBodyFlush :: IO ()
outBodyFlush =
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
                , outBodyCancel :: Maybe SomeException -> IO ()
outBodyCancel = \Maybe SomeException
mErr ->
                    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamCancelled (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Maybe SomeException -> StreamingChunk
StreamingCancelled Maybe SomeException
mErr)
                }
        finished :: IO ()
finished = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamOutOfScope (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
                    Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
    OutBodyIface -> IO r
k OutBodyIface
iface IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` IO ()
finished

nextForStreaming
    :: TBQueue StreamingChunk
    -> DynaNext
nextForStreaming :: TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
tbq =
    let takeQ :: IO (Maybe StreamingChunk)
takeQ = STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a. STM a -> IO a
atomically (STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk))
-> STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> STM (Maybe StreamingChunk)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue StreamingChunk
tbq
        next :: DynaNext
next = IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext IO (Maybe StreamingChunk)
takeQ
     in DynaNext
next