{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Stream where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Maybe (fromMaybe)
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP2.Frame
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types
isIdle :: StreamState -> Bool
isIdle :: StreamState -> Bool
isIdle StreamState
Idle = Bool
True
isIdle StreamState
_ = Bool
False
isOpen :: StreamState -> Bool
isOpen :: StreamState -> Bool
isOpen Open{} = Bool
True
isOpen StreamState
_ = Bool
False
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote StreamState
HalfClosedRemote = Bool
True
isHalfClosedRemote (Closed ClosedCode
_) = Bool
True
isHalfClosedRemote StreamState
_ = Bool
False
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal (Open (Just ClosedCode
_) OpenState
_) = Bool
True
isHalfClosedLocal (Closed ClosedCode
_) = Bool
True
isHalfClosedLocal StreamState
_ = Bool
False
isClosed :: StreamState -> Bool
isClosed :: StreamState -> Bool
isClosed Closed{} = Bool
True
isClosed StreamState
_ = Bool
False
isReserved :: StreamState -> Bool
isReserved :: StreamState -> Bool
isReserved StreamState
Reserved = Bool
True
isReserved StreamState
_ = Bool
False
newOddStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newOddStream :: StreamId -> StreamId -> StreamId -> IO Stream
newOddStream StreamId
sid StreamId
txwin StreamId
rxwin =
StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
(IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream)
-> IO (IORef StreamState)
-> IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Idle
IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall a. IO (MVar a)
newEmptyMVar
IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall a. a -> IO (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing
newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream :: StreamId -> StreamId -> StreamId -> IO Stream
newEvenStream StreamId
sid StreamId
txwin StreamId
rxwin =
StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
(IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream)
-> IO (IORef StreamState)
-> IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Reserved
IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall a. IO (MVar a)
newEmptyMVar
IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall a. a -> IO (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing
{-# INLINE readStreamState #-}
readStreamState :: Stream -> IO StreamState
readStreamState :: Stream -> IO StreamState
readStreamState Stream{IORef StreamState
streamState :: IORef StreamState
streamState :: Stream -> IORef StreamState
streamState} = IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamState
closeAllStreams
:: TVar OddStreamTable -> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams :: TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams TVar OddStreamTable
ovar TVar EvenStreamTable
evar Maybe SomeException
mErr' = do
IntMap Stream
ostrms <- TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable TVar OddStreamTable
ovar
(Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
ostrms
IntMap Stream
estrms <- TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable TVar EvenStreamTable
evar
(Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
estrms
where
finalize :: Stream -> IO ()
finalize Stream
strm = do
StreamState
st <- Stream -> IO StreamState
readStreamState Stream
strm
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar (Either SomeException InpObj)
-> Either SomeException InpObj -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm) Either SomeException InpObj
forall a. Either SomeException a
err
case StreamState
st of
Open Maybe ClosedCode
_ (Body RxQ
q Maybe StreamId
_ IORef StreamId
_ IORef (Maybe TokenHeaderTable)
_) ->
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ RxQ -> Either SomeException (ByteString, Bool) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue RxQ
q (Either SomeException (ByteString, Bool) -> STM ())
-> Either SomeException (ByteString, Bool) -> STM ()
forall a b. (a -> b) -> a -> b
$ Either SomeException (ByteString, Bool)
-> (SomeException -> Either SomeException (ByteString, Bool))
-> Maybe SomeException
-> Either SomeException (ByteString, Bool)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((ByteString, Bool) -> Either SomeException (ByteString, Bool)
forall a b. b -> Either a b
Right (ByteString
forall a. Monoid a => a
mempty, Bool
True)) SomeException -> Either SomeException (ByteString, Bool)
forall a b. a -> Either a b
Left Maybe SomeException
mErr
StreamState
_otherwise ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
mErr :: Maybe SomeException
mErr :: Maybe SomeException
mErr = case Maybe SomeException
mErr' of
Just SomeException
e
| Just HTTP2Error
ConnectionIsClosed <- SomeException -> Maybe HTTP2Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e ->
Maybe SomeException
forall a. Maybe a
Nothing
Maybe SomeException
_otherwise ->
Maybe SomeException
mErr'
err :: Either SomeException a
err :: forall a. Either SomeException a
err =
SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$
SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe (HTTP2Error -> SomeException
forall e. Exception e => e -> SomeException
toException HTTP2Error
ConnectionIsClosed) (Maybe SomeException -> SomeException)
-> Maybe SomeException -> SomeException
forall a b. (a -> b) -> a -> b
$
Maybe SomeException
mErr
data StreamTerminated
= StreamPushedFinal
| StreamCancelled
| StreamOutOfScope
deriving (StreamId -> StreamTerminated -> ShowS
[StreamTerminated] -> ShowS
StreamTerminated -> String
(StreamId -> StreamTerminated -> ShowS)
-> (StreamTerminated -> String)
-> ([StreamTerminated] -> ShowS)
-> Show StreamTerminated
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> StreamTerminated -> ShowS
showsPrec :: StreamId -> StreamTerminated -> ShowS
$cshow :: StreamTerminated -> String
show :: StreamTerminated -> String
$cshowList :: [StreamTerminated] -> ShowS
showList :: [StreamTerminated] -> ShowS
Show)
deriving anyclass (Show StreamTerminated
Typeable StreamTerminated
(Typeable StreamTerminated, Show StreamTerminated) =>
(StreamTerminated -> SomeException)
-> (SomeException -> Maybe StreamTerminated)
-> (StreamTerminated -> String)
-> Exception StreamTerminated
SomeException -> Maybe StreamTerminated
StreamTerminated -> String
StreamTerminated -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: StreamTerminated -> SomeException
toException :: StreamTerminated -> SomeException
$cfromException :: SomeException -> Maybe StreamTerminated
fromException :: SomeException -> Maybe StreamTerminated
$cdisplayException :: StreamTerminated -> String
displayException :: StreamTerminated -> String
Exception)
withOutBodyIface
:: TBQueue StreamingChunk
-> (forall a. IO a -> IO a)
-> (OutBodyIface -> IO r)
-> IO r
withOutBodyIface :: forall r.
TBQueue StreamingChunk
-> (forall a. IO a -> IO a) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq forall a. IO a -> IO a
unmask OutBodyIface -> IO r
k = do
TVar (Maybe StreamTerminated)
terminated <- Maybe StreamTerminated -> IO (TVar (Maybe StreamTerminated))
forall a. a -> IO (TVar a)
newTVarIO Maybe StreamTerminated
forall a. Maybe a
Nothing
let whenNotTerminated :: STM b -> STM b
whenNotTerminated STM b
act = do
Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
case Maybe StreamTerminated
mTerminated of
Just StreamTerminated
reason ->
StreamTerminated -> STM b
forall e a. Exception e => e -> STM a
throwSTM StreamTerminated
reason
Maybe StreamTerminated
Nothing ->
STM b
act
terminateWith :: StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
reason STM ()
act = do
Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
case Maybe StreamTerminated
mTerminated of
Just StreamTerminated
_ ->
() -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe StreamTerminated
Nothing -> do
TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
reason)
STM ()
act
iface :: OutBodyIface
iface =
OutBodyIface
{ outBodyUnmask :: forall a. IO a -> IO a
outBodyUnmask = IO x -> IO x
forall a. IO a -> IO a
unmask
, outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b ->
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b IsEndOfStream
NotEndOfStream
, outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b ->
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
StreamPushedFinal)
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b (Maybe (IO ()) -> IsEndOfStream
EndOfStream Maybe (IO ())
forall a. Maybe a
Nothing)
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
, outBodyFlush :: IO ()
outBodyFlush =
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
, outBodyCancel :: Maybe SomeException -> IO ()
outBodyCancel = \Maybe SomeException
mErr ->
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamCancelled (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Maybe SomeException -> StreamingChunk
StreamingCancelled Maybe SomeException
mErr)
}
finished :: IO ()
finished = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamOutOfScope (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
OutBodyIface -> IO r
k OutBodyIface
iface IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` IO ()
finished
nextForStreaming
:: TBQueue StreamingChunk
-> DynaNext
nextForStreaming :: TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
tbq =
let takeQ :: IO (Maybe StreamingChunk)
takeQ = STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a. STM a -> IO a
atomically (STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk))
-> STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> STM (Maybe StreamingChunk)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue StreamingChunk
tbq
next :: DynaNext
next = IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext IO (Maybe StreamingChunk)
takeQ
in DynaNext
next