-- Session: links frames to streams, and helps in ordering the header frames
-- so that they don't get mixed with header frames from other streams when
-- resources are being served concurrently.
{-# LANGUAGE FlexibleContexts, Rank2Types, TemplateHaskell, OverloadedStrings #-}
{-# OPTIONS_HADDOCK hide #-}
module SecondTransfer.Http2.Session(
    http2Session
    ,getFrameFromSession
    ,sendFirstFrameToSession
    ,sendMiddleFrameToSession
    ,sendCommandToSession

    ,CoherentSession
    ,SessionInput(..)
    ,SessionInputCommand(..)
    ,SessionOutput(..)
    ,SessionOutputCommand(..)
    ,SessionCoordinates(..)
    ,SessionComponent(..)
    ,SessionsCallbacks(..)
    ,SessionsConfig(..)
    ,ErrorCallback

    -- Internal stuff
    ,OutputFrame
    ,InputFrame
    ) where

#include "Logging.cpphs"

-- System grade utilities
import           Control.Concurrent                     (ThreadId, forkIO, threadDelay)
import           Control.Concurrent.Chan
import           Control.Exception                      (throwTo)
import qualified Control.Exception                      as E
import           Control.Monad                          (forever, unless, when, mapM_, forM, forM_)
import           Control.Monad.IO.Class                 (liftIO)
import           Control.DeepSeq                        (($!!), deepseq)
import           Control.Monad.Trans.Reader
-- import           Control.Monad.Catch                 (throwM)
import           Control.Concurrent.MVar

import qualified Data.ByteString                        as B
import           Data.ByteString.Char8                  (pack, unpack)
import qualified Data.ByteString.Builder                as Bu
import qualified Data.ByteString.Lazy                   as Bl
import           Data.Conduit
import qualified Data.HashTable.IO                      as H
import qualified Data.IntSet                            as NS
import           Data.Maybe                             (isJust)
#ifndef IMPLICIT_MONOID
import           Data.Monoid                            (mappend)
#endif

import           Control.Lens

-- No framing layer here... let's use Kazu Yamamoto's library
import qualified Network.HPACK                          as HP
import qualified Network.HTTP2                          as NH2

-- Logging utilities
import           System.Log.Logger
import           System.Clock                           (getTime, TimeSpec, Clock(..))

-- Imports from other parts of the program
import           SecondTransfer.MainLoop.CoherentWorker
import           SecondTransfer.MainLoop.Tokens
import           SecondTransfer.MainLoop.Protocol
import           SecondTransfer.Sessions.Config
import           SecondTransfer.Sessions.Internal       (sessionExceptionHandler, SessionsContext, sessionsConfig)
import           SecondTransfer.Utils                   (unfoldChannelAndSource)
import           SecondTransfer.Exception
import qualified SecondTransfer.Utils.HTTPHeaders       as He
import           SecondTransfer.MainLoop.Logging        (logWithExclusivity, logit)


-- Unfortunately the frame encoding API of Network.HTTP2 is a bit difficult to
-- use :-(
type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload, Effect)
type InputFrame  = NH2.Frame


--useChunkLength :: Int
--useChunkLength = 2048


-- Singleton instance used for concurrency
data HeadersSent = HeadersSent


-- All streams put their data bits here. A Nothing value in the middle slot
-- signals end of data.
type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString, Effect)


data HeaderOutputMessage =
    NormalResponse_HM (GlobalStreamId, MVar HeadersSent, Headers, Effect)
    |PushPromise_HM   (GlobalStreamId, GlobalStreamId, Headers, Effect)


-- Settings imposed by the peer
data SessionSettings = SessionSettings {
    _pushEnabled :: Bool
    }

makeLenses ''SessionSettings
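
-- A quick orientation note (added, not part of the original code): the
-- `makeLenses` splice above generates a lens `pushEnabled` for the
-- `_pushEnabled` field, which is how the rest of this module reads and updates
-- the peer settings, for example:
--
-- > can_push = session_settings ^. pushEnabled
-- > disabled = pushEnabled .~ False $ session_settings
--
-- Both usages appear further down, in `workerThread` and `handleSettingsFrame`.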

-- Whatever a worker thread is going to need comes here....
-- this is to make refactoring easier, but not strictly needed.
data WorkerThreadEnvironment = WorkerThreadEnvironment {
    -- What's the header stream id?
    _streamId :: GlobalStreamId

    -- A full block of headers can come here... the MVar in the middle should
    -- be populated to signal the end of the headers transmission. A thread will
    -- be suspended waiting for that.
    , _headersOutput :: Chan HeaderOutputMessage

    -- And regular contents can come this way and thus be properly mixed
    -- with everything else.... for now...
    ,_dataOutput :: MVar DataOutputToConveyor

    ,_streamsCancelled_WTE :: MVar NS.IntSet

    ,_sessionSettings_WTE :: MVar SessionSettings

    ,_nextPushStream_WTE :: MVar Int
    }

makeLenses ''WorkerThreadEnvironment


-- An HTTP/2 session. Basically a couple of channels ...
type Session = (SessionInput, SessionOutput)


-- From outside, one can only write to this one ... the newtype is to enforce
-- this.
newtype SessionInput = SessionInput ( Chan SessionInputCommand )

sendMiddleFrameToSession :: SessionInput -> InputFrame -> IO ()
sendMiddleFrameToSession (SessionInput chan) frame = writeChan chan $ MiddleFrame_SIC frame

sendFirstFrameToSession :: SessionInput -> InputFrame -> IO ()
sendFirstFrameToSession (SessionInput chan) frame = writeChan chan $ FirstFrame_SIC frame

sendCommandToSession :: SessionInput -> SessionInputCommand -> IO ()
sendCommandToSession (SessionInput chan) command = writeChan chan command


type SessionOutputPacket = Either SessionOutputCommand OutputFrame

type SessionOutputChannelAbstraction = Chan SessionOutputPacket

-- From outside, one can only read from this one
newtype SessionOutput = SessionOutput SessionOutputChannelAbstraction

getFrameFromSession :: SessionOutput -> IO SessionOutputPacket
getFrameFromSession (SessionOutput chan) = readChan chan


type HashTable k v = H.CuckooHashTable k v

type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder

type WorkerMonad = ReaderT WorkerThreadEnvironment IO


-- Have to figure out which are these... but I would expect to have things
-- like unexpected aborts here in this type.
data SessionInputCommand =
    FirstFrame_SIC InputFrame       -- This frame is special
    |MiddleFrame_SIC InputFrame     -- Ordinary frame
    |InternalAbort_SIC              -- Internal abort from the session itself
    |CancelSession_SIC              -- Cancel request from the framer
  deriving Show


-- temporary
data SessionOutputCommand =
    CancelSession_SOC
    |FinishStream_SOC GlobalStreamId
  deriving Show
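
-- The sketch below (a hypothetical driver, added for illustration and not part
-- of this module) shows how a framer typically uses the two ends of a
-- `Session`: frames decoded from the socket are pushed through the
-- `SessionInput`, and whatever the session wants to say comes back as
-- `SessionOutputPacket`s, where a `Left` carries a session-level command and a
-- `Right` carries a frame to serialize. `serializeAndSend` and
-- `finalizeStream` are assumed placeholders.
--
{-
driveSession :: Session -> InputFrame -> [InputFrame] -> IO ()
driveSession (session_input, session_output) first_frame more_frames = do
    -- The first frame of a connection (the client SETTINGS frame) goes
    -- through a dedicated entry point...
    sendFirstFrameToSession session_input first_frame
    -- ... and every other decoded frame uses the ordinary one.
    mapM_ (sendMiddleFrameToSession session_input) more_frames
    -- Meanwhile, a separate loop drains the output side.
    let loop = do
            packet <- getFrameFromSession session_output
            case packet of
                Left CancelSession_SOC            -> return ()   -- tear the connection down
                Left (FinishStream_SOC stream_id) -> finalizeStream stream_id >> loop
                Right (encode_info, payload, _effect) ->
                    serializeAndSend encode_info payload >> loop
    loop
-}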

-- Here is how we make a session
type SessionMaker = SessionsContext -> IO Session


-- Here is how we make a session wrapping a CoherentWorker
type CoherentSession = AwareWorker -> SessionMaker


data PostInputMechanism = PostInputMechanism (MVar (Maybe B.ByteString), InputDataStream)


-- NH2.Frame != Frame
data SessionData = SessionData {
    -- ATTENTION: Ignore the warning coming from here for now
    _sessionsContext :: SessionsContext

    ,_sessionInput :: Chan SessionInputCommand

    -- We need to lock this channel occasionally so that we can order multiple
    -- header frames properly.... that's the reason for the outer MVar
    ,_sessionOutput :: MVar SessionOutputChannelAbstraction

    -- Used to encode
    ,_toEncodeHeaders :: MVar HP.DynamicTable
    -- And used to decode
    ,_toDecodeHeaders :: MVar HP.DynamicTable

    -- While I'm receiving headers, anything which
    -- is not a header should end in the connection being
    -- closed
    ,_receivingHeaders :: MVar (Maybe Int)

    -- _lastGoodStream is used both to report the last good stream in the
    -- GoAwayFrame and to keep track of streams opened by the client. In
    -- other words, it contains the stream_id of the last valid client
    -- stream and is updated as soon as the first frame of that stream is
    -- received.
    ,_lastGoodStream :: MVar Int

    -- Used for decoding the headers
    ,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment

    -- Used for worker threads... this is actually a pre-filled template
    -- I make copies of it in different contexts, and as needed.
    ,_forWorkerThread :: WorkerThreadEnvironment

    ,_awareWorker :: AwareWorker

    -- Some streams may be cancelled
    ,_streamsCancelled :: MVar NS.IntSet

    -- Data input mechanism corresponding to some threads
    ,_stream2PostInputMechanism :: HashTable Int PostInputMechanism

    -- Worker thread register. This is a dictionary from stream id to the
    -- ThreadId of its worker thread. I use this to raise asynchronous
    -- exceptions in the worker thread if the stream is cancelled by the
    -- client. This way we get early finalization.
    ,_stream2WorkerThread :: HashTable Int ThreadId

    -- Used to retrieve/set the session id
    ,_sessionIdAtSession :: Int

    -- And used to keep peer session settings
    ,_sessionSettings :: MVar SessionSettings

    -- What is the next stream available for push?
    ,_nextPushStream :: MVar Int
    }

makeLenses ''SessionData


--                 v- {headers table size comes here!!}
http2Session :: AwareWorker -> Int -> SessionsContext -> IO Session
http2Session aware_worker session_id sessions_context =   do
    session_input             <- newChan
    session_output            <- newChan
    session_output_mvar       <- newMVar session_output


    -- For incremental construction of headers...
    stream_request_headers    <- H.new :: IO Stream2HeaderBlockFragment

    -- Warning: we should find a way of coping with different table sizes.
    decode_headers_table      <- HP.newDynamicTableForDecoding 4096
    decode_headers_table_mvar <- newMVar decode_headers_table

    encode_headers_table      <- HP.newDynamicTableForEncoding 4096
    encode_headers_table_mvar <- newMVar encode_headers_table

    -- These ones need independent threads taking care of sending stuff
    -- their way...
    headers_output            <- newChan :: IO (Chan HeaderOutputMessage)
    data_output               <- newEmptyMVar :: IO (MVar DataOutputToConveyor)

    stream2postinputmechanism <- H.new
    stream2workerthread       <- H.new
    last_good_stream_mvar     <- newMVar (-1)

    receiving_headers         <- newMVar Nothing
    session_settings          <- newMVar $ SessionSettings { _pushEnabled = True }
    next_push_stream          <- newMVar 8

    -- What about stream cancellation?
    cancelled_streams_mvar    <- newMVar $ NS.empty :: IO (MVar NS.IntSet)

    let for_worker_thread = WorkerThreadEnvironment {
          _streamId              = error "NotInitialized"
          ,_headersOutput        = headers_output
          ,_dataOutput           = data_output
          ,_streamsCancelled_WTE = cancelled_streams_mvar
          ,_sessionSettings_WTE  = session_settings
          ,_nextPushStream_WTE   = next_push_stream
          }

    let session_data  = SessionData {
          _sessionsContext             = sessions_context
          ,_sessionInput               = session_input
          ,_sessionOutput              = session_output_mvar
          ,_toDecodeHeaders            = decode_headers_table_mvar
          ,_toEncodeHeaders            = encode_headers_table_mvar
          ,_stream2HeaderBlockFragment = stream_request_headers
          ,_forWorkerThread            = for_worker_thread
          ,_awareWorker                = aware_worker
          ,_streamsCancelled           = cancelled_streams_mvar
          ,_stream2PostInputMechanism  = stream2postinputmechanism
          ,_stream2WorkerThread        = stream2workerthread
          ,_sessionIdAtSession         = session_id
          ,_receivingHeaders           = receiving_headers
          ,_sessionSettings            = session_settings
          ,_lastGoodStream             = last_good_stream_mvar
          ,_nextPushStream             = next_push_stream
          }

    let
        exc_handler :: SessionComponent -> HTTP2SessionException -> IO ()
        exc_handler component e = sessionExceptionHandler component session_id sessions_context e

        exc_guard :: SessionComponent -> IO () -> IO ()
        exc_guard component action = E.catch
            action
            (\ e -> do
                -- INSTRUMENTATION( errorM "HTTP2.Session" "Exception processed" )
                exc_handler component e
            )
        use_chunk_length = sessions_context ^. sessionsConfig . dataFrameSize

    -- Create an input thread that decodes frames...
    forkIO $ exc_guard SessionInputThread_HTTP2SessionComponent
           $ runReaderT sessionInputThread session_data

    -- Create a thread that captures headers and sends them down the tube
    forkIO $ exc_guard SessionHeadersOutputThread_HTTP2SessionComponent
           $ runReaderT (headersOutputThread headers_output session_output_mvar) session_data

    -- Create a thread that captures data and sends it down the tube. This is waiting for
    -- stuff coming from all the workers. This function is also in charge of closing streams
    forkIO $ exc_guard SessionDataOutputThread_HTTP2SessionComponent
           $ dataOutputThread use_chunk_length data_output session_output_mvar

    -- The two previous threads fill the session_output argument below (they write to it);
    -- the session machinery at the other end is in charge of sending that data through the
    -- socket.

    return ( (SessionInput session_input),
             (SessionOutput session_output) )


-- TODO: Some ill clients can break this thread with exceptions. Make these paths a bit
--       more robust.
sessionInputThread :: ReaderT SessionData IO ()
sessionInputThread  = do
    -- This is an introductory and declarative block... all of this is tail-executed
    -- every time that a packet needs to be processed. It may be a good idea to abstract
    -- these values in a closure...
    session_input             <- view sessionInput

    decode_headers_table_mvar <- view toDecodeHeaders
    stream_request_headers    <- view stream2HeaderBlockFragment
    cancelled_streams_mvar    <- view streamsCancelled
    coherent_worker           <- view awareWorker

    for_worker_thread_uns     <- view forWorkerThread
    stream2workerthread       <- view stream2WorkerThread
    receiving_headers_mvar    <- view receivingHeaders
    last_good_stream_mvar     <- view lastGoodStream
    current_session_id        <- view sessionIdAtSession

    input                     <- liftIO $ readChan session_input

    case input of

        FirstFrame_SIC (NH2.Frame (NH2.FrameHeader _ 1 null_stream_id ) _ )
          | 0 == null_stream_id -> do
            -- This is a SETTINGS ACK frame, which is okay to have;
            -- do nothing here
            continue

        FirstFrame_SIC (NH2.Frame (NH2.FrameHeader _ 0 null_stream_id ) (NH2.SettingsFrame settings_list) )
          | 0 == null_stream_id -> do
            -- Good, handle
            handleSettingsFrame settings_list
            continue

        FirstFrame_SIC _ -> do
            -- Bad: incorrect stream id, or god knows what....
            closeConnectionBecauseIsInvalid NH2.ProtocolError
            return ()

        CancelSession_SIC -> do
            -- Good place to tear down worker threads... leave the rest of the finalization
            -- to the framer.
            --
            -- This message normally comes from the Framer
            liftIO $ do
                H.mapM_
                    (\ (_, thread_id) -> do
                        throwTo thread_id StreamCancelledException
                        return ()
                    )
                    stream2workerthread
            -- We do not continue here, but instead let it finish
            return ()

        InternalAbort_SIC -> do
            -- Message triggered because the worker failed to behave.
            -- When this is sent, the connection is closed
            closeConnectionBecauseIsInvalid NH2.InternalError
            return ()

        -- The block below will process both HEADERS and CONTINUATION frames.
        -- TODO: As it stands now, the server will happily start a new stream with
        --       a CONTINUATION frame instead of a HEADERS frame. That's against the
        --       protocol.
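        --
        -- (Note added for orientation:) a request's header block arrives as one
        -- HEADERS frame optionally followed by CONTINUATION frames for the same
        -- stream, and the block is finished by whichever frame carries the
        -- END_HEADERS flag. While such a block is open (receiving_headers_mvar
        -- holds `Just stream_id`), frames belonging to any other stream are a
        -- protocol error, which is what `unlessReceivingHeaders` below enforces.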
        MiddleFrame_SIC frame | Just (stream_id, bytes) <- isAboutHeaders frame -> do
            -- Just append the frames to streamRequestHeaders
            opens_stream <- appendHeaderFragmentBlock stream_id bytes
            if opens_stream
              then do
                maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar
                case maybe_rcv_headers_of of
                    Just _ -> do
                        -- Bad client, it is already sending headers
                        -- and trying to open another one
                        closeConnectionBecauseIsInvalid NH2.ProtocolError
                        -- An exception will be thrown above, so as not to complicate
                        -- the control flow here too much.

                    Nothing -> do
                        -- Signal that we are receiving headers now, for this stream
                        liftIO $ putMVar receiving_headers_mvar (Just stream_id)
                        -- And go check if the stream id is valid
                        last_good_stream <- liftIO $ takeMVar last_good_stream_mvar
                        if (odd stream_id ) && (stream_id > last_good_stream)
                          then do
                            -- We are golden, set the new good stream
                            liftIO $ putMVar last_good_stream_mvar (stream_id)
                          else do
                            -- We are not golden
                            -- INSTRUMENTATION( errorM "HTTP2.Session" "Protocol error: bad stream id")
                            closeConnectionBecauseIsInvalid NH2.ProtocolError

                -- So, now we know; we are ignoring priority frames at the moment
                -- liftIO . logit $ "Priority: " `mappend` (pack . show $ getHeadersPriority frame)
              else do
                maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar
                case maybe_rcv_headers_of of
                    Just a_stream_id | a_stream_id == stream_id -> do
                        -- Nothing to complain about
                        liftIO $ putMVar receiving_headers_mvar maybe_rcv_headers_of

                    Nothing -> error "InternalError, this should be set"

            if frameEndsHeaders frame
              then do
                -- Ok, let it be known that we are not receiving more headers
                liftIO $ modifyMVar_ receiving_headers_mvar (\ _ -> return Nothing )

                -- Let's get a time
                headers_arrived_time <- liftIO $ getTime Monotonic

                -- Let's decode the headers
                let for_worker_thread = set streamId stream_id for_worker_thread_uns
                headers_bytes <- getHeaderBytes stream_id
                dyn_table <- liftIO $ takeMVar decode_headers_table_mvar
                (new_table, header_list ) <- liftIO $ HP.decodeHeader dyn_table headers_bytes
#if LOGIT_SWITCH_TIMINGS
                let (Just path) = He.fetchHeader header_list ":path"
                liftIO $ logit $ (pack . show $ stream_id ) `mappend` " -> " `mappend` path
#endif
                -- /DEBUG

                -- Good moment to remove the headers from the table.... we don't want a space
                -- leak here
                liftIO $ do
                    H.delete stream_request_headers stream_id
                    putMVar decode_headers_table_mvar new_table

                -- TODO: Validate headers, abort session if the headers are invalid.
                --       Otherwise other invariants will break!!
                --       THIS IS PROBABLY THE BEST PLACE FOR DOING IT.
                let headers_editor = He.fromList header_list

                maybe_good_headers_editor <- validateIncomingHeaders headers_editor

                good_headers <- case maybe_good_headers_editor of
                    Just yes_they_are_good -> return yes_they_are_good
                    Nothing -> closeConnectionBecauseIsInvalid NH2.ProtocolError

                -- Add any extra headers, on demand
                headers_extra_good <- addExtraHeaders good_headers
                let header_list_after = He.toList headers_extra_good
                -- liftIO $ putStrLn $ "header list after " ++ (show header_list_after)

                -- If the headers end the request....
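                -- (Note added for clarity:) when END_STREAM is not set on this
                -- HEADERS frame, the client intends to send a request body as
                -- DATA frames later, so a PostInputMechanism is created now and
                -- exposed to the worker as a conduit source; otherwise the
                -- request carries no body and the worker gets Nothing.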
                post_data_source <- if not (frameEndsStream frame)
                    then do
                        mechanism <- createMechanismForStream stream_id
                        let source = postDataSourceFromMechanism mechanism
                        return $ Just source
                    else do
                        return Nothing

                let
                    perception = Perception {
                        _startedTime_Pr       = headers_arrived_time,
                        _streamId_Pr          = stream_id,
                        _sessionId_Pr         = current_session_id,
                        _protocol_Pr          = Http2_HPV,
                        _anouncedProtocols_Pr = Nothing
                        }
                    request = Request {
                        _headers_RQ    = header_list_after,
                        _inputData_RQ  = post_data_source,
                        _perception_RQ = perception
                        }

                -- TODO: Handle the cases where a request tries to send data
                --       even if the method doesn't allow for data.

                -- I'm clear to start the worker, in its own thread
                --
                -- NOTE: Some late internal errors from the worker thread are
                --       handled here by closing the session.
                --
                -- TODO: Log exceptions handled here.
                liftIO $ do
                    thread_id <- forkIO $ E.catch
                        (runReaderT (workerThread request coherent_worker) for_worker_thread )
                        ( ( \ _ -> do
                                -- Actions to take when the thread breaks....
                                writeChan session_input InternalAbort_SIC
                            ) :: HTTP500PrecursorException -> IO ()
                        )
                    H.insert stream2workerthread stream_id thread_id
                    return ()
              else
                -- Frame doesn't end the headers... it was added before... so
                -- probably do nothing
                return ()

            continue

        MiddleFrame_SIC frame@(NH2.Frame _ (NH2.RSTStreamFrame _error_code_id)) -> do
            let stream_id = streamIdFromFrame frame
            liftIO $ do
                -- INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream reset: " ++ (show _error_code_id) )
                cancelled_streams <- takeMVar cancelled_streams_mvar
                -- INSTRUMENTATION( infoM "HTTP2.Session" $ "Cancelled stream was: " ++ (show stream_id) )
                putMVar cancelled_streams_mvar $ NS.insert stream_id cancelled_streams

                maybe_thread_id <- H.lookup stream2workerthread stream_id
                case maybe_thread_id of
                    Nothing ->
                        -- This can actually happen in some implementations: we are asked to
                        -- cancel a stream we know nothing about. Log which stream it is
                        -- and don't get too crazy about it.
                        logit $ "InterruptingUnexistentStream " `mappend` (pack . show $ stream_id)

                    Just thread_id -> do
                        -- INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream successfully interrupted" )
                        throwTo thread_id StreamCancelledException

            continue

        MiddleFrame_SIC frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes)) -> unlessReceivingHeaders $ do
            -- So I got data to process
            -- TODO: Handle end of stream
            let stream_id = nh2_stream_id
            -- TODO: Handle the cases where the stream_id doesn't match an already existent
            --       stream. In such cases it is justified to reset the connection with a
            --       protocol_error.

            streamWorkerSendData stream_id somebytes
            -- After that data has been received and forwarded downstream, we can issue a
            -- window update
            --
            -- TODO: We can use wider frames to avoid congestion...
            --       .... and we can also be more compositional with these short bursts of data....
            --
            -- TODO: Consider that the best place to output these frames can be somewhere else...
            --
            -- TODO: Use a special, with-quota queue here to do flow control. Don't send
            --       meaningless WindowUpdateFrame's
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    nh2_stream_id
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    0
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )
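
            -- (Note added for clarity:) the two WINDOW_UPDATE frames above
            -- replenish, respectively, the per-stream window (nh2_stream_id)
            -- and the connection-wide window (stream 0) by the number of DATA
            -- payload bytes just received, so the client can keep sending.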
            if frameEndsStream frame
              then do
                -- Good place to close the source ...
                closePostDataSource stream_id
              else
                return ()

            continue

        MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags -> do
            -- Deal with pings: this is an Ack, so do nothing
            continue

        MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes)) -> do
            -- Deal with pings: NOT an Ack, so answer
            -- INSTRUMENTATION( debugM "HTTP2.Session" "Ping processed" )
            sendOutFrame
                (NH2.EncodeInfo
                    (NH2.setAck NH2.defaultFlags)
                    0
                    Nothing
                )
                (NH2.PingFrame somebytes)

            continue

        MiddleFrame_SIC (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do
            -- Frame was received by the peer, do nothing here...
            continue

        -- TODO: Do something with these settings!!
        MiddleFrame_SIC (NH2.Frame _ (NH2.SettingsFrame settings_list)) -> do
            -- INSTRUMENTATION( debugM "HTTP2.Session" $ "Received settings: " ++ (show settings_list) )
            -- Just acknowledge the frame.... for now
            handleSettingsFrame settings_list
            continue

        MiddleFrame_SIC somethingelse ->  unlessReceivingHeaders $ do
            -- An unhandled case here....
            -- liftIO . logit $ "Strange frame:" `mappend` (pack . show $ somethingelse)
            continue

  where
    continue = sessionInputThread

    -- TODO: Do use the settings!!!
    handleSettingsFrame :: NH2.SettingsList -> ReaderT SessionData IO ()
    handleSettingsFrame _settings_list = do
        let enable_push = lookup NH2.SettingsEnablePush _settings_list
        session_settings_mvar <- view sessionSettings
        case enable_push of
            Just 1 ->
                liftIO $ modifyMVar_ session_settings_mvar $
                    \ old_settings -> return ( pushEnabled .~ True $ old_settings)
            Just 0 ->
                liftIO $ modifyMVar_ session_settings_mvar $
                    \ old_settings -> return ( pushEnabled .~ False $ old_settings)
            Just _ ->
                closeConnectionBecauseIsInvalid NH2.ProtocolError
            Nothing ->
                return ()
        -- liftIO $ logit $ "Settings " `mappend` (pack . show $ _settings_list)
        sendOutFrame
            (NH2.EncodeInfo
                (NH2.setAck NH2.defaultFlags)
                0
                Nothing
            )
            (NH2.SettingsFrame [])


sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO ()
sendOutFrame encode_info payload = do
    session_output_mvar <- view sessionOutput
    session_output <- liftIO $ takeMVar session_output_mvar
    liftIO $ writeChan session_output $ Right
        ( encode_info,
          payload,
          -- Not sending effects in this frame, since it is not related...
          error "sendOutFrameNotFor"
        )
    liftIO $ putMVar session_output_mvar session_output


-- TODO: This function, but using the headers editor, triggers
--       some renormalization of the header order. A good thing, if
--       I get that order well enough....
addExtraHeaders :: He.HeaderEditor -> ReaderT SessionData IO He.HeaderEditor
addExtraHeaders headers_editor = do
    let
        enriched_lens = (sessionsContext . sessionsConfig . sessionsEnrichedHeaders )
        -- TODO: Figure out which is the best way to put this constant in the
        --       source code
        protocol_lens = He.headerLens "second-transfer-eh--used-protocol"

    add_used_protocol <- view (enriched_lens . addUsedProtocol )

    -- liftIO $ putStrLn $ "AAA" ++ (show add_used_protocol)

    let
        he1 = if add_used_protocol
            then set protocol_lens (Just "HTTP/2") headers_editor
            else headers_editor

    if add_used_protocol
        -- Nothing will be computed here if the headers are not modified.
        then return he1
        else return headers_editor
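
-- A small illustration (added; not used by the library) of the `He.HeaderEditor`
-- operations this module relies on: `He.fromList`/`He.toList` convert between a
-- plain header list and the editor, and `He.headerLens name` focuses the value
-- of one header as a `Maybe`, so writing a `Just` through it adds or replaces
-- that header. The names below are only an example:
--
{-
exampleEditedHeaders :: Headers
exampleEditedHeaders =
    let
        editor0 = He.fromList [ (":status", "200") ]
        -- Add a header by writing through its lens...
        editor1 = set (He.headerLens "content-type") (Just "text/html") editor0
        -- ... and read one back the same way.
        _maybe_status = editor1 ^. He.headerLens ":status"
    in He.toList editor1
-}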

validateIncomingHeaders :: He.HeaderEditor -> ReaderT SessionData IO (Maybe He.HeaderEditor)
validateIncomingHeaders headers_editor = do
    -- Check that the headers block comes with all mandatory headers.
    -- Right now I'm not checking that they come in the mandatory order though...
    --
    -- Notice that this function will transform a "host" header to an ":authority"
    -- one.
    let
        h1 = He.replaceHostByAuthority headers_editor

        -- Check that headers are lowercase
        headers_are_lowercase = He.headersAreLowercaseAtHeaderEditor headers_editor

        -- Check that we have mandatory headers
        maybe_authority = h1 ^. (He.headerLens ":authority")
        maybe_method    = h1 ^. (He.headerLens ":method")
        maybe_scheme    = h1 ^. (He.headerLens ":scheme")
        maybe_path      = h1 ^. (He.headerLens ":path")

    if (isJust maybe_authority) && (isJust maybe_method) && (isJust maybe_scheme) && (isJust maybe_path )
        then return (Just h1)
        else return Nothing


-- Sends a GO_AWAY frame and raises an exception, effectively terminating the input
-- thread of the session.
closeConnectionBecauseIsInvalid :: NH2.ErrorCodeId -> ReaderT SessionData IO a
closeConnectionBecauseIsInvalid error_code = do
    -- liftIO $ errorM "HTTP2.Session" "closeConnectionBecauseIsInvalid called!"
    last_good_stream_mvar <- view lastGoodStream
    last_good_stream <- liftIO $ takeMVar last_good_stream_mvar
    session_output_mvar <- view sessionOutput
    stream2workerthread <- view stream2WorkerThread

    sendOutFrame
        (NH2.EncodeInfo
            NH2.defaultFlags
            0
            Nothing
        )
        (NH2.GoAwayFrame
            last_good_stream
            error_code
            ""
        )

    liftIO $ do
        -- Close all active threads for this session
        H.mapM_
            ( \(_stream_id, thread_id) ->
                throwTo thread_id StreamCancelledException
            )
            stream2workerthread

        -- Notify the framer that the session is closing, so
        -- that it stops accepting frames from connected sources
        -- (Streams?)
        session_output <- takeMVar session_output_mvar
        writeChan session_output $ Left CancelSession_SOC
        putMVar session_output_mvar session_output

        -- And unwind the input thread in the session, so that the
        -- exception handler runs....
        E.throw HTTP2ProtocolException


frameEndsStream :: InputFrame -> Bool
frameEndsStream (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndStream flags


-- Executes its argument, unless receiving
-- headers, in which case the connection is closed.
unlessReceivingHeaders :: ReaderT SessionData IO a -> ReaderT SessionData IO a
unlessReceivingHeaders comp = do
    receiving_headers_mvar <- view receivingHeaders
    -- First check if we are receiving headers
    maybe_recv_headers <- liftIO $ readMVar receiving_headers_mvar
    if isJust maybe_recv_headers
      then
        -- So, this frame is highly illegal
        closeConnectionBecauseIsInvalid NH2.ProtocolError
      else
        comp


createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism
createMechanismForStream stream_id = do
    (chan, source) <- liftIO $ unfoldChannelAndSource
    stream2postinputmechanism <- view stream2PostInputMechanism
    let pim = PostInputMechanism (chan, source)
    liftIO $ H.insert stream2postinputmechanism stream_id pim
    return pim
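
-- The sketch below (added; not part of the library) shows the shape of what a
-- worker does with the source created above: `_inputData_RQ` in the `Request`
-- is the conduit-source side of the `PostInputMechanism`, and it yields the
-- DATA chunks that `streamWorkerSendData` pushes in, ending when
-- `closePostDataSource` writes the terminating `Nothing`. `drainBody` is a
-- hypothetical helper, assuming `InputDataStream` is a conduit source of
-- strict ByteString chunks:
--
{-
drainBody :: InputDataStream -> IO B.ByteString
drainBody source = do
    chunks <- source $$ collect
    return (B.concat chunks)
  where
    collect = do
        maybe_chunk <- await
        case maybe_chunk of
            Nothing    -> return []
            Just chunk -> fmap (chunk :) collect
-}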

-- TODO: Can be optimized by factoring out the mechanism lookup
-- TODO IMPORTANT: This is a good place to drop the postinputmechanism
--                 for a stream, so that unprocessed data can be garbage-collected.
closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO ()
closePostDataSource stream_id = do
    stream2postinputmechanism <- view stream2PostInputMechanism

    pim_maybe <- liftIO $ H.lookup stream2postinputmechanism stream_id

    case pim_maybe of

        Just (PostInputMechanism (chan, _)) ->
            liftIO $ putMVar chan Nothing

        Nothing ->
            -- TODO: This is a protocol error, handle it properly
            error "Internal error/closePostDataSource"


streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO ()
streamWorkerSendData stream_id bytes = do
    s2pim <- view stream2PostInputMechanism
    pim_maybe <- liftIO $ H.lookup s2pim stream_id

    case pim_maybe of

        Just pim ->
            sendBytesToPim pim bytes

        Nothing ->
            -- This is an internal error, the mechanism should be
            -- created when the headers end (and if the headers
            -- do not finish the stream)
            error "Internal error"


sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO ()
sendBytesToPim (PostInputMechanism (chan, _)) bytes =
    liftIO $ putMVar chan (Just bytes)


postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream
postDataSourceFromMechanism (PostInputMechanism (_, source)) = source


isSettingsAck :: NH2.FrameHeader -> Bool
isSettingsAck (NH2.FrameHeader _ flags _) =
    NH2.testAck flags


isStreamCancelled :: GlobalStreamId -> WorkerMonad Bool
isStreamCancelled stream_id = do
    cancelled_streams_mvar <- view streamsCancelled_WTE
    cancelled_streams <- liftIO $ readMVar cancelled_streams_mvar
    return $ NS.member stream_id cancelled_streams


sendPrimitive500Error :: IO TupledPrincipalStream
sendPrimitive500Error =
    return (
        [
            (":status", "500")
        ],
        [],
        do
            yield "Internal server error\n"
            -- No footers
            return []
        )
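
-- For contrast with the error response above, here is a sketch (added, not
-- used anywhere) of a normal-looking `TupledPrincipalStream`: response headers
-- first, the middle list left empty exactly as in the 500 helper above, and a
-- conduit that yields the body and then returns the (empty) footer list:
--
{-
helloWorldStream :: IO TupledPrincipalStream
helloWorldStream =
    return (
        [
            (":status", "200"),
            ("content-type", "text/plain")
        ],
        [],
        do
            yield "Hello, world!\n"
            return []
        )
-}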

-- Invokes the Coherent worker. Data is sent through pipes to
-- the output thread here in this session.
workerThread :: Request -> AwareWorker -> WorkerMonad ()
workerThread req aware_worker = do
    headers_output <- view headersOutput
    stream_id <- view streamId
    session_settings_mvar <- view sessionSettings_WTE
    session_settings <- liftIO $ readMVar session_settings_mvar
    next_push_stream_mvar <- view nextPushStream_WTE

    -- TODO: Handle exceptions here: what happens if the coherent worker
    --       throws an exception signaling that the request is ill-formed
    --       and should be dropped? That could happen on a couple of occasions,
    --       but really most cases should be handled here in this file...
#if LOGIT_SWITCH_TIMINGS
    liftIO . logit $ "worker-thread " `mappend` (pack . show $ stream_id)
#endif

    -- (headers, _, data_and_conclussion)
    principal_stream <- liftIO $ E.catch
        ( aware_worker req )
        ( const $ tupledPrincipalStreamToPrincipalStream <$> sendPrimitive500Error
              :: HTTP500PrecursorException -> IO PrincipalStream
        )

    -- Pieces of the header
    let
        headers             = principal_stream ^. headers_PS
        data_and_conclusion = principal_stream ^. dataAndConclusion_PS
        effects             = principal_stream ^. effect_PS
        pushed_streams      = principal_stream ^. pushedStreams_PS
        can_push            = session_settings ^. pushEnabled

    -- There are several possible moments where the PUSH_PROMISEs can be sent,
    -- but a default safe one is before sending the response HEADERS, so that
    -- LINK headers in the response come after any potential promises.
    data_promises <- if can_push
      then do
        forM pushed_streams $ \ pushed_stream_comp -> do
            -- Initialize pushed stream
            pushed_stream <- liftIO pushed_stream_comp
            -- Send the request headers properly wrapped in a "Push Promise"; do
            -- it before sending the actual response headers of this stream
            let
                request_headers            = pushed_stream ^. requestHeaders_Psh
                response_headers           = pushed_stream ^. responseHeaders_Psh
                pushed_data_and_conclusion = pushed_stream ^. dataAndConclusion_Psh
            child_stream_id <- liftIO $ modifyMVar next_push_stream_mvar $
                (\ x -> return (x+2, x) )
            liftIO . writeChan headers_output . PushPromise_HM $
                (stream_id, child_stream_id, request_headers, effects)
            return (child_stream_id, response_headers, pushed_data_and_conclusion, effects)
      else
        return []

    -- Now I send the headers, if that's possible at all
    headers_sent <- liftIO newEmptyMVar
    liftIO $ writeChan headers_output $ NormalResponse_HM (stream_id, headers_sent, headers, effects)

    -- At this moment I should ask if the stream hasn't been cancelled by the browser before
    -- committing to the work of sending additional data... this is important for pushed
    -- streams
    is_stream_cancelled <- isStreamCancelled stream_id
    unless is_stream_cancelled $ do
        -- I have a beautiful source that I can de-construct...
        -- TODO: Optionally pulling data out from a Conduit ....
        -- liftIO ( data_and_conclussion $$ (_sendDataOfStream stream_id) )
        --
        -- This threadlet should block here waiting for the headers to finish going
        -- NOTE: Exceptions generated here inheriting from HTTP500PrecursorException
        --       are let to bubble up and are managed at this thread's fork point...
        (_maybe_footers, _) <- runConduit $
            transPipe liftIO data_and_conclusion
            `fuseBothMaybe`
            sendDataOfStream stream_id headers_sent effects
        -- BIG TODO: Send the footers ... likely stream conclusion semantics
        --           will need to be changed.

        -- Now, time to fork threads for the pusher streams ....
        forM_ data_promises $ \ (child_stream_id, response_headers, pushed_data_and_conclusion, effects) -> do
            environment <- ask
            let
                action = pusherThread
                    child_stream_id
                    response_headers
                    pushed_data_and_conclusion
                    effects
            -- And let the action run in its own thread
            liftIO . forkIO $ runReaderT action environment
            return ()


-- Handles a pushed stream. Data is sent through pipes to
-- the output thread here in this session.
pusherThread :: GlobalStreamId -> Headers -> DataAndConclusion -> Effect -> WorkerMonad ()
pusherThread child_stream_id response_headers pushed_data_and_conclusion effects = do
    headers_output <- view headersOutput
    session_settings_mvar <- view sessionSettings_WTE
    session_settings <- liftIO $ readMVar session_settings_mvar

    -- TODO: Handle exceptions here: what happens if the coherent worker
    --       throws an exception signaling that the request is ill-formed
    --       and should be dropped? That could happen on a couple of occasions,
    --       but really most cases should be handled here in this file...

    -- (headers, _, data_and_conclussion)

    -- Now I send the headers, if that's possible at all
    headers_sent <- liftIO newEmptyMVar
    liftIO . writeChan headers_output $ NormalResponse_HM (child_stream_id, headers_sent, response_headers, effects)

    -- At this moment I should ask if the stream hasn't been cancelled by the browser before
    -- committing to the work of sending additional data... this is important for pushed
    -- streams
    is_stream_cancelled <- isStreamCancelled child_stream_id
    unless is_stream_cancelled $ do
        -- I have a beautiful source that I can de-construct...
        -- TODO: Optionally pulling data out from a Conduit ....
        -- liftIO ( data_and_conclussion $$ (_sendDataOfStream stream_id) )
        --
        -- This threadlet should block here waiting for the headers to finish going
        -- NOTE: Exceptions generated here inheriting from HTTP500PrecursorException
        --       are let to bubble up and are managed at this thread's fork point...
        (_maybe_footers, _) <- runConduit $
            transPipe liftIO pushed_data_and_conclusion
            `fuseBothMaybe`
            sendDataOfStream child_stream_id headers_sent effects
        -- BIG TODO: Send the footers ... likely stream conclusion semantics
        --           will need to be changed.
        return ()


--                                                          v-- comp. monad.
sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Effect -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) ()
sendDataOfStream stream_id headers_sent effect = do
    data_output <- view dataOutput
    -- Wait for all headers sent
    liftIO $ takeMVar headers_sent
    consumer data_output
  where
    -- delay = effect ^. middlePauseForDelivery_Ef
    consumer data_output = do
        maybe_bytes <- await
        case maybe_bytes of
            Nothing ->
                -- This is how we finish sending data
                liftIO $ putMVar data_output (stream_id, Nothing, effect)
            Just bytes -> do
                liftIO $ do
                    putMVar data_output (stream_id, Just bytes, effect)
                consumer data_output


-- Returns whether the frame is the first in the stream
appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO Bool
appendHeaderFragmentBlock global_stream_id bytes = do
    ht <- view stream2HeaderBlockFragment
    maybe_old_block <- liftIO $ H.lookup ht global_stream_id

    (new_block, new_stream) <- case maybe_old_block of

        Nothing -> do
            -- TODO: Make the commented message below more informative
            return $ (Bu.byteString bytes, True)

        Just something ->
            return $ (something `mappend` (Bu.byteString bytes), False)

    liftIO $ H.insert ht global_stream_id new_block
    return new_stream


getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString
getHeaderBytes global_stream_id = do
    ht <- view stream2HeaderBlockFragment
    Just bytes <- liftIO $ H.lookup ht global_stream_id
    return $ Bl.toStrict $ Bu.toLazyByteString bytes


isAboutHeaders :: InputFrame -> Maybe (GlobalStreamId, B.ByteString)
isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.HeadersFrame _ block_fragment   ) )
    = Just (stream_id, block_fragment)
isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.ContinuationFrame block_fragment) )
    = Just (stream_id, block_fragment)
isAboutHeaders _
    = Nothing


getHeadersPriority :: InputFrame -> Maybe NH2.Priority
getHeadersPriority (NH2.Frame _ ( NH2.HeadersFrame prio _ ) ) = prio
getHeadersPriority _                                          = Nothing


frameEndsHeaders :: InputFrame -> Bool
frameEndsHeaders (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndHeader flags


streamIdFromFrame :: InputFrame -> GlobalStreamId
streamIdFromFrame (NH2.Frame (NH2.FrameHeader _ _ stream_id) _) = stream_id


-- TODO: Have different size for the headers..... just now going with a default size of 16 k...
-- TODO: Find a way to kill this thread....
headersOutputThread :: Chan HeaderOutputMessage   -- (GlobalStreamId, MVar HeadersSent, Headers, Effect)
                    -> MVar SessionOutputChannelAbstraction
                    -> ReaderT SessionData IO ()
headersOutputThread input_chan session_output_mvar = forever $ do
    use_chunk_length <- view $ sessionsContext . sessionsConfig . dataFrameSize
    header_output_request <- liftIO $ readChan input_chan
    case header_output_request of

        NormalResponse_HM (stream_id, headers_ready_mvar, headers, effect) -> do
            -- First encode the headers using the table
            encode_dyn_table_mvar <- view toEncodeHeaders

            encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar
            (new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table headers
            liftIO $ putMVar encode_dyn_table_mvar new_dyn_table

            -- Now split the bytestring in chunks of the needed size....
            -- Note that the only way we can ...
            let bs_chunks = bytestringChunk use_chunk_length data_to_send

            -- And send the chunks through while locking the output place....
            liftIO $ bs_chunks `deepseq` E.bracket
                (takeMVar session_output_mvar)
                (putMVar session_output_mvar )
                (\ session_output -> do
                    writeIndividualHeaderFrames session_output stream_id bs_chunks True effect
                    -- And say that the headers for this thread are out
                    -- INSTRUMENTATION( debugM "HTTP2.Session" $ "Headers were output for stream " ++ (show stream_id) )
                    putMVar headers_ready_mvar HeadersSent
                )

        PushPromise_HM (parent_stream_id, child_stream_id, promise_headers, effect) -> do
            encode_dyn_table_mvar <- view toEncodeHeaders

            encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar
            (new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table promise_headers
            liftIO $ putMVar encode_dyn_table_mvar new_dyn_table

            -- Now split the bytestring in chunks of the needed size....
            bs_chunks <- return $! bytestringChunk use_chunk_length data_to_send

            -- And send the chunks through while locking the output place....
            liftIO $ E.bracket
                (takeMVar session_output_mvar)
                (putMVar session_output_mvar )
                (\ session_output -> do
                    writePushPromiseFrames session_output parent_stream_id child_stream_id bs_chunks True effect
                    -- No notification here... but since everything headers-related is serialized here and
                    -- the next piece of information corresponding to the child stream is a headers
                    -- sequence, we shouldn't need any other tricks here
                )
  where
    writeIndividualHeaderFrames :: SessionOutputChannelAbstraction -> GlobalStreamId -> [B.ByteString] -> Bool -> Effect -> IO ()
    writeIndividualHeaderFrames session_output stream_id (last_fragment:[]) is_first effect =
        writeChan session_output $ Right
            ( NH2.EncodeInfo {
                 NH2.encodeFlags     = NH2.setEndHeader NH2.defaultFlags
                ,NH2.encodeStreamId  = stream_id
                ,NH2.encodePadding   = Nothing },
              (if is_first
                  then NH2.HeadersFrame Nothing last_fragment
                  else NH2.ContinuationFrame last_fragment),
              effect
            )
    writeIndividualHeaderFrames session_output stream_id (fragment:xs) is_first effect = do
        writeChan session_output $ Right
            ( NH2.EncodeInfo {
                 NH2.encodeFlags     = NH2.defaultFlags
                ,NH2.encodeStreamId  = stream_id
                ,NH2.encodePadding   = Nothing },
              (if is_first
                  then NH2.HeadersFrame Nothing fragment
                  else NH2.ContinuationFrame fragment),
              effect
            )
        writeIndividualHeaderFrames session_output stream_id xs False effect

    writePushPromiseFrames :: SessionOutputChannelAbstraction -> GlobalStreamId -> GlobalStreamId -> [B.ByteString] -> Bool -> Effect -> IO ()
    writePushPromiseFrames session_output parent_stream_id child_stream_id (last_fragment:[]) is_first effect =
        writeChan session_output $ Right
            ( NH2.EncodeInfo {
                 NH2.encodeFlags     = NH2.setEndHeader NH2.defaultFlags
                ,NH2.encodeStreamId  = parent_stream_id
                ,NH2.encodePadding   = Nothing },
              (if is_first
                  then NH2.PushPromiseFrame child_stream_id last_fragment
                  else NH2.ContinuationFrame last_fragment ),
              effect
            )
    writePushPromiseFrames session_output parent_stream_id child_stream_id (fragment:xs) is_first effect = do
        writeChan session_output $ Right
            ( NH2.EncodeInfo {
                 NH2.encodeFlags     = NH2.defaultFlags
                ,NH2.encodeStreamId  = parent_stream_id
                ,NH2.encodePadding   = Nothing },
              (if is_first
                  then NH2.PushPromiseFrame child_stream_id fragment
                  else NH2.ContinuationFrame fragment ),
              effect
            )
        writePushPromiseFrames session_output parent_stream_id child_stream_id xs False effect
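
-- The helper below splits a serialized header block (or a response body) into
-- frame-sized pieces. For illustration (added note), with a chunk length of 4:
--
-- > bytestringChunk 4 "abcdefghij"  ==  ["abcd", "efgh", "ij"]
--
-- Note that when the input length is an exact multiple of the chunk length an
-- empty trailing chunk is produced (e.g. bytestringChunk 4 "abcd" == ["abcd", ""]),
-- which downstream is simply emitted as an empty frame.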
bytestringChunk :: Int -> B.ByteString -> [B.ByteString]
bytestringChunk len s | (B.length s) < len = [ s ]
bytestringChunk len s = h:(bytestringChunk len xs)
  where
    (h, xs) = B.splitAt len s


-- This thread is for the entire session, not for each stream. The thread should
-- die while waiting for input in a garbage-collected MVar.
--
-- TODO: This function does non-optimal chunking for the case where responses are
--       actually streamed.... in those cases we need to keep state for frames in
--       some other format....
--
-- TODO: Right now, we are transmitting an empty last frame with the end-of-stream
--       flag set. I'm afraid that the only way to avoid that is by holding a frame
--       back or by augmenting the end-user interface so that the user can signal
--       which one is the last frame. The first approach restricts responsiveness,
--       the second one clutters things.
dataOutputThread :: Int
                 -> MVar DataOutputToConveyor
                 -> MVar SessionOutputChannelAbstraction
                 -> IO ()
dataOutputThread use_chunk_length input_chan session_output_mvar = forever $ do
    (stream_id, maybe_contents, effect) <- takeMVar input_chan
    case maybe_contents of

        Nothing -> do
            liftIO $ do
                withLockedSessionOutput
                    (\ session_output -> do
                        -- Write an empty data-frame with the right flags
                        writeChan session_output $ Right
                            ( NH2.EncodeInfo {
                                 NH2.encodeFlags     = NH2.setEndStream NH2.defaultFlags
                                ,NH2.encodeStreamId  = stream_id
                                ,NH2.encodePadding   = Nothing },
                              NH2.DataFrame "",
                              effect
                            )
                        -- And then write an end-of-stream command
                        writeChan session_output $ Left . FinishStream_SOC $ stream_id
                    )

        Just contents -> do
            -- And now just simply output it...
            let bs_chunks = bytestringChunk use_chunk_length contents
            -- And send the chunks through while locking the output place....
            bs_chunks `deepseq` writeContinuations bs_chunks stream_id effect
            return ()
  where
    withLockedSessionOutput = E.bracket
        (takeMVar session_output_mvar)
        (putMVar session_output_mvar)   -- <-- There is an implicit argument there!!

    writeContinuations :: [B.ByteString] -> GlobalStreamId -> Effect -> IO ()
    writeContinuations fragments stream_id effect = mapM_
        (\ fragment -> withLockedSessionOutput
            (\ session_output -> writeChan session_output $ Right
                ( NH2.EncodeInfo {
                     NH2.encodeFlags     = NH2.defaultFlags
                    ,NH2.encodeStreamId  = stream_id
                    ,NH2.encodePadding   = Nothing },
                  NH2.DataFrame fragment,
                  effect
                )
            )
        )
        fragments
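
-- End-of-module note (added): the contract between a worker and the data
-- output thread above is simply the `DataOutputToConveyor` triple. A sketch of
-- a producer honouring it (hypothetical, mirroring what `sendDataOfStream`
-- does through its conduit Sink):
--
{-
sendTwoChunksAndClose :: MVar DataOutputToConveyor -> GlobalStreamId -> Effect -> IO ()
sendTwoChunksAndClose data_output stream_id effect = do
    putMVar data_output (stream_id, Just "part one\n", effect)
    putMVar data_output (stream_id, Just "part two\n", effect)
    -- A Nothing in the middle slot makes dataOutputThread emit an empty DATA
    -- frame with END_STREAM and then a FinishStream_SOC command.
    putMVar data_output (stream_id, Nothing, effect)
-}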