{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE BangPatterns #-}
module Network.HTTP2.Client.Dispatch where
import Control.Exception (throwIO)
import Control.Monad.Base (MonadBase, liftBase)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Internal as ByteString
import Foreign.Marshal.Alloc (mallocBytes, finalizerFree)
import Foreign.ForeignPtr (newForeignPtr)
import Data.IORef.Lifted (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import GHC.Exception (Exception)
import Network.HPACK as HPACK
import qualified Network.HPACK.Token as HPACK
import Network.HTTP2 as HTTP2
import Network.HTTP2.Client.Channels
import Network.HTTP2.Client.Exceptions
type DispatchChan = FramesChan HTTP2Error
type FallBackFrameHandler = (FrameHeader, FramePayload) -> ClientIO ()
ignoreFallbackHandler :: FallBackFrameHandler
ignoreFallbackHandler = const $ pure ()
data RemoteSentGoAwayFrame = RemoteSentGoAwayFrame !StreamId !ErrorCodeId !ByteString
deriving Show
instance Exception RemoteSentGoAwayFrame
type GoAwayHandler = RemoteSentGoAwayFrame -> ClientIO ()
defaultGoAwayHandler :: GoAwayHandler
defaultGoAwayHandler = lift . throwIO
data StreamFSMState =
Idle
| ReservedRemote
| Open
| HalfClosedRemote
| HalfClosedLocal
| Closed
data StreamEvent =
StreamHeadersEvent !FrameHeader !HeaderList
| StreamPushPromiseEvent !FrameHeader !StreamId !HeaderList
| StreamDataEvent !FrameHeader ByteString
| StreamErrorEvent !FrameHeader ErrorCode
deriving Show
data StreamState = StreamState {
_streamStateWindowUpdatesChan :: !(Chan (FrameHeader, FramePayload))
, _streamStateEvents :: !(Chan StreamEvent)
, _streamStateFSMState :: !StreamFSMState
}
data Dispatch = Dispatch {
_dispatchMaxStreamId :: !(IORef StreamId)
, _dispatchCurrentStreams :: !(IORef (IntMap StreamState))
}
newDispatchIO :: MonadBase IO m => m Dispatch
newDispatchIO = Dispatch <$> newIORef 0 <*> newIORef (IntMap.empty)
readMaxReceivedStreamIdIO :: MonadBase IO m => Dispatch -> m StreamId
readMaxReceivedStreamIdIO = readIORef . _dispatchMaxStreamId
registerStream :: MonadBase IO m => Dispatch -> StreamId -> StreamState -> m ()
registerStream d sid st =
atomicModifyIORef' (_dispatchCurrentStreams d) $ \xs ->
let v = (IntMap.insert sid st xs) in (v, ())
lookupStreamState :: MonadBase IO m => Dispatch -> StreamId -> m (Maybe StreamState)
lookupStreamState d sid =
IntMap.lookup sid <$> readIORef (_dispatchCurrentStreams d)
closeLocalStream :: MonadBase IO m => Dispatch -> StreamId -> m ()
closeLocalStream d sid =
atomicModifyIORef' (_dispatchCurrentStreams d) $ \xs ->
let (_,v) = IntMap.updateLookupWithKey f sid xs in (v, ())
where
f :: StreamId -> StreamState -> Maybe StreamState
f _ st = case _streamStateFSMState st of
HalfClosedRemote -> Nothing
Closed -> Nothing
_ -> Just $ st { _streamStateFSMState = HalfClosedLocal }
closeRemoteStream :: MonadBase IO m => Dispatch -> StreamId -> m ()
closeRemoteStream d sid =
atomicModifyIORef' (_dispatchCurrentStreams d) $ \xs ->
let (_,v) = IntMap.updateLookupWithKey f sid xs in (v, ())
where
f :: StreamId -> StreamState -> Maybe StreamState
f _ st = case _streamStateFSMState st of
HalfClosedLocal -> Nothing
Closed -> Nothing
_ -> Just $ st { _streamStateFSMState = HalfClosedRemote }
closeReleaseStream :: MonadBase IO m => Dispatch -> StreamId -> m ()
closeReleaseStream d sid =
atomicModifyIORef' (_dispatchCurrentStreams d) $ \xs ->
let v = (IntMap.delete sid xs) in (v, ())
data ConnectionSettings = ConnectionSettings {
_clientSettings :: !Settings
, _serverSettings :: !Settings
}
defaultConnectionSettings :: ConnectionSettings
defaultConnectionSettings =
ConnectionSettings defaultSettings defaultSettings
data PingHandler = PingHandler !(Chan (FrameHeader, FramePayload))
newPingHandler :: MonadBase IO m => m PingHandler
newPingHandler = PingHandler <$> newChan
notifyPingHandler :: MonadBase IO m => (FrameHeader, FramePayload) -> PingHandler -> m ()
notifyPingHandler dat (PingHandler c) = writeChan c dat
waitPingReply :: MonadBase IO m => PingHandler -> m (FrameHeader, FramePayload)
waitPingReply (PingHandler c) = readChan c
data SetSettingsHandler = SetSettingsHandler !(Chan (FrameHeader, FramePayload))
newSetSettingsHandler :: MonadBase IO m => m SetSettingsHandler
newSetSettingsHandler = SetSettingsHandler <$> newChan
notifySetSettingsHandler :: MonadBase IO m => (FrameHeader, FramePayload) -> SetSettingsHandler -> m ()
notifySetSettingsHandler dat (SetSettingsHandler c) = writeChan c dat
waitSetSettingsReply :: MonadBase IO m => SetSettingsHandler -> m (FrameHeader, FramePayload)
waitSetSettingsReply (SetSettingsHandler c) = readChan c
registerPingHandler :: DispatchControl -> ByteString -> IO PingHandler
registerPingHandler dc dat = do
handler <- newPingHandler
atomicModifyIORef' (_dispatchControlPingHandlers dc) (\xs ->
((dat,handler):xs, ()))
return handler
lookupAndReleasePingHandler :: MonadBase IO m => DispatchControl -> ByteString -> m (Maybe PingHandler)
lookupAndReleasePingHandler dc dat =
atomicModifyIORef' (_dispatchControlPingHandlers dc) f
where
f xs = (filter (\x -> dat /= fst x) xs, lookup dat xs)
registerSetSettingsHandler :: MonadBase IO m => DispatchControl -> m SetSettingsHandler
registerSetSettingsHandler dc = do
handler <- newSetSettingsHandler
atomicModifyIORef' (_dispatchControlSetSettingsHandlers dc) (\xs ->
(handler:xs, ()))
return handler
lookupAndReleaseSetSettingsHandler :: MonadBase IO m => DispatchControl -> m (Maybe SetSettingsHandler)
lookupAndReleaseSetSettingsHandler dc =
atomicModifyIORef' (_dispatchControlSetSettingsHandlers dc) f
where
f [] = ([], Nothing)
f (x:xs) = (xs, Just x)
data DispatchControl = DispatchControl {
_dispatchControlConnectionSettings :: !(IORef ConnectionSettings)
, _dispatchControlHpackEncoder :: !HpackEncoderContext
, _dispatchControlAckPing :: !(ByteString -> ClientIO ())
, _dispatchControlAckSettings :: !(ClientIO ())
, _dispatchControlOnGoAway :: !GoAwayHandler
, _dispatchControlOnFallback :: !FallBackFrameHandler
, _dispatchControlPingHandlers :: !(IORef [(ByteString, PingHandler)])
, _dispatchControlSetSettingsHandlers :: !(IORef [SetSettingsHandler])
}
newDispatchControlIO
:: MonadBase IO m
=> Size
-> (ByteString -> ClientIO ())
-> (ClientIO ())
-> GoAwayHandler
-> FallBackFrameHandler
-> m DispatchControl
newDispatchControlIO encoderBufSize ackPing ackSetts onGoAway onFallback =
DispatchControl <$> newIORef defaultConnectionSettings
<*> newHpackEncoderContext encoderBufSize
<*> pure ackPing
<*> pure ackSetts
<*> pure onGoAway
<*> pure onFallback
<*> newIORef []
<*> newIORef []
newHpackEncoderContext :: MonadBase IO m => Size -> m HpackEncoderContext
newHpackEncoderContext encoderBufSize = liftBase $ do
let strategy = (HPACK.defaultEncodeStrategy { HPACK.useHuffman = True })
dt <- HPACK.newDynamicTableForEncoding HPACK.defaultDynamicTableSize
buf <- mallocBytes encoderBufSize
ptr <- newForeignPtr finalizerFree buf
return $ HpackEncoderContext
(\hdrs -> encoder strategy dt buf ptr hdrs)
(\n -> HPACK.setLimitForEncoding n dt)
where
encoder strategy dt buf ptr hdrs = do
let hdrs' = fmap (\(k,v) -> let !t = HPACK.toToken k in (t,v)) hdrs
remainder <- HPACK.encodeTokenHeader buf encoderBufSize strategy True dt hdrs'
case remainder of
([],len) -> pure $ ByteString.fromForeignPtr ptr 0 len
(_,_) -> throwIO HPACK.BufferOverrun
readSettings :: MonadBase IO m => DispatchControl -> m ConnectionSettings
readSettings = readIORef . _dispatchControlConnectionSettings
modifySettings :: MonadBase IO m => DispatchControl -> (ConnectionSettings -> (ConnectionSettings, a)) -> m a
modifySettings d = atomicModifyIORef' (_dispatchControlConnectionSettings d)
data HpackEncoderContext = HpackEncoderContext {
_encodeHeaders :: HeaderList -> IO HeaderBlockFragment
, _applySettings :: Size -> IO ()
}
data DispatchHPACK = DispatchHPACK {
_dispatchHPACKDynamicTable :: !DynamicTable
}
newDispatchHPACKIO :: MonadBase IO m => Size -> m DispatchHPACK
newDispatchHPACKIO decoderBufSize = liftBase $
DispatchHPACK <$> newDecoder
where
newDecoder = newDynamicTableForDecoding
HPACK.defaultDynamicTableSize
decoderBufSize
data DispatchStream = DispatchStream {
_dispatchStreamId :: !StreamId
, _dispatchStreamReadEvents :: !(Chan StreamEvent)
}
newDispatchStreamIO :: MonadBase IO m => StreamId -> m DispatchStream
newDispatchStreamIO sid = liftBase $
DispatchStream <$> pure sid
<*> newChan