{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker,
    WorkerConf (..),
    fromContext,
) where

import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Frame
import Network.HTTP2.H2
import Network.HTTP2.Server.Types

----------------------------------------------------------------

data WorkerConf a = WorkerConf
    { forall a. WorkerConf a -> IO (Input a)
readInputQ :: IO (Input a)
    , forall a. WorkerConf a -> Output a -> IO ()
writeOutputQ :: Output a -> IO ()
    , forall a. WorkerConf a -> a -> IO ()
workerCleanup :: a -> IO ()
    , forall a. WorkerConf a -> IO Bool
isPushable :: IO Bool
    , forall a. WorkerConf a -> a -> PushPromise -> IO (StreamId, a)
makePushStream :: a -> PushPromise -> IO (StreamId, a)
    , forall a. WorkerConf a -> SockAddr
mySockAddr :: SockAddr
    , forall a. WorkerConf a -> SockAddr
peerSockAddr :: SockAddr
    }

fromContext :: Context -> WorkerConf Stream
fromContext :: Context -> WorkerConf Stream
fromContext ctx :: Context
ctx@Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar StreamId
outputQ :: Context -> TQueue (Output Stream)
outputBufferLimit :: Context -> IORef StreamId
peerStreamId :: Context -> IORef StreamId
myStreamId :: Context -> TVar StreamId
continued :: Context -> IORef (Maybe StreamId)
evenStreamTable :: Context -> TVar EvenStreamTable
oddStreamTable :: Context -> TVar OddStreamTable
peerSettings :: Context -> IORef Settings
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
rstRate :: Rate
emptyFrameRate :: Rate
settingsRate :: Rate
pingRate :: Rate
rxFlow :: IORef RxFlow
txFlow :: TVar TxFlow
decodeDynamicTable :: DynamicTable
encodeDynamicTable :: DynamicTable
controlQ :: TQueue Control
outputQStreamID :: TVar StreamId
outputQ :: TQueue (Output Stream)
outputBufferLimit :: IORef StreamId
peerStreamId :: IORef StreamId
myStreamId :: TVar StreamId
continued :: IORef (Maybe StreamId)
evenStreamTable :: TVar EvenStreamTable
oddStreamTable :: TVar OddStreamTable
peerSettings :: IORef Settings
myFirstSettings :: IORef Bool
mySettings :: Settings
roleInfo :: RoleInfo
role :: Role
..} =
    WorkerConf
        { readInputQ :: IO (Input Stream)
readInputQ = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue forall a b. (a -> b) -> a -> b
$ ServerInfo -> TQueue (Input Stream)
inputQ forall a b. (a -> b) -> a -> b
$ RoleInfo -> ServerInfo
toServerInfo RoleInfo
roleInfo
        , writeOutputQ :: Output Stream -> IO ()
writeOutputQ = TQueue (Output Stream) -> Output Stream -> IO ()
enqueueOutput TQueue (Output Stream)
outputQ
        , workerCleanup :: Stream -> IO ()
workerCleanup = \Stream
strm -> do
            Context -> Stream -> ClosedCode -> IO ()
closed Context
ctx Stream
strm ClosedCode
Killed
            let frame :: ByteString
frame = ErrorCode -> StreamId -> ByteString
resetFrame ErrorCode
InternalError forall a b. (a -> b) -> a -> b
$ Stream -> StreamId
streamNumber Stream
strm
            TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [ByteString] -> Control
CFrames forall a. Maybe a
Nothing [ByteString
frame]
        , -- Peer SETTINGS_ENABLE_PUSH
          isPushable :: IO Bool
isPushable = Settings -> Bool
enablePush forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IORef a -> IO a
readIORef IORef Settings
peerSettings
        , -- Peer SETTINGS_INITIAL_WINDOW_SIZE
          makePushStream :: Stream -> PushPromise -> IO (StreamId, Stream)
makePushStream = \Stream
pstrm PushPromise
_ -> do
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (StreamId
_, Stream
newstrm) <- Context -> IO (StreamId, Stream)
openEvenStreamWait Context
ctx
            let pid :: StreamId
pid = Stream -> StreamId
streamNumber Stream
pstrm
            forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
pid, Stream
newstrm)
        , mySockAddr :: SockAddr
mySockAddr = SockAddr
mySockAddr
        , peerSockAddr :: SockAddr
peerSockAddr = SockAddr
peerSockAddr
        }

----------------------------------------------------------------

pushStream
    :: WorkerConf a
    -> a -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO OutputType
pushStream :: forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
_ a
_ ValueTable
_ [] = forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
pushStream WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, a)
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, a)
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a. WorkerConf a -> a -> PushPromise -> IO (StreamId, a)
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} a
pstrm ValueTable
reqvt [PushPromise]
pps0
    | StreamId
len forall a. Eq a => a -> a -> Bool
== StreamId
0 = forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
    | Bool
otherwise = do
        Bool
pushable <- IO Bool
isPushable
        if Bool
pushable
            then do
                TVar StreamId
tvar <- forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO StreamId
0
                StreamId
lim <- forall {a}.
Num a =>
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar StreamId
tvar [PushPromise]
pps0 StreamId
0
                if StreamId
lim forall a. Eq a => a -> a -> Bool
== StreamId
0
                    then forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
                    else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ IO () -> OutputType
OWait (forall {m :: * -> *} {a}. (MonadIO m, Ord a) => a -> TVar a -> m ()
waiter StreamId
lim TVar StreamId
tvar)
            else forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
  where
    len :: StreamId
len = forall (t :: * -> *) a. Foldable t => t a -> StreamId
length [PushPromise]
pps0
    increment :: TVar a -> m ()
increment TVar a
tvar = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar a
tvar (forall a. Num a => a -> a -> a
+ a
1)
    waiter :: a -> TVar a -> m ()
waiter a
lim TVar a
tvar = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
        a
n <- forall a. TVar a -> STM a
readTVar TVar a
tvar
        Bool -> STM ()
checkSTM (a
n forall a. Ord a => a -> a -> Bool
>= a
lim)
    push :: TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
_ [] StreamId
n = forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
n :: Int)
    push TVar a
tvar (PushPromise
pp : [PushPromise]
pps) StreamId
n = do
        (StreamId
pid, a
newstrm) <- a -> PushPromise -> IO (StreamId, a)
makePushStream a
pstrm PushPromise
pp
        let scheme :: ByteString
scheme = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenScheme ValueTable
reqvt
            -- fixme: this value can be Nothing
            auth :: ByteString
auth =
                forall a. HasCallStack => Maybe a -> a
fromJust
                    ( Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenAuthority ValueTable
reqvt
                        forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenHost ValueTable
reqvt
                    )
            path :: ByteString
path = PushPromise -> ByteString
promiseRequestPath PushPromise
pp
            promiseRequest :: [(Token, ByteString)]
promiseRequest =
                [ (Token
tokenMethod, ByteString
H.methodGet)
                , (Token
tokenScheme, ByteString
scheme)
                , (Token
tokenAuthority, ByteString
auth)
                , (Token
tokenPath, ByteString
path)
                ]
            ot :: OutputType
ot = [(Token, ByteString)] -> StreamId -> OutputType
OPush [(Token, ByteString)]
promiseRequest StreamId
pid
            Response OutObj
rsp = PushPromise -> Response
promiseResponse PushPromise
pp
            out :: Output a
out = forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
newstrm OutObj
rsp OutputType
ot forall a. Maybe a
Nothing forall a b. (a -> b) -> a -> b
$ forall {m :: * -> *} {a}. (MonadIO m, Num a) => TVar a -> m ()
increment TVar a
tvar
        Output a -> IO ()
writeOutputQ Output a
out
        TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
tvar [PushPromise]
pps (StreamId
n forall a. Num a => a -> a -> a
+ StreamId
1)

-- | This function is passed to workers.
--   They also pass 'Response's from a server to this function.
--   This function enqueues commands for the HTTP/2 sender.
response
    :: WorkerConf a
    -> Manager
    -> T.Handle
    -> ThreadContinue
    -> a
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
response :: forall a.
WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, a)
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, a)
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a. WorkerConf a -> a -> PushPromise -> IO (StreamId, a)
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Handle
th ThreadContinue
tconf a
strm (Request InpObj
req) (Response OutObj
rsp) [PushPromise]
pps = case OutObj -> OutBody
outObjBody OutObj
rsp of
    OutBody
OutBodyNone -> do
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
        Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
OObj forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
    OutBodyBuilder Builder
_ -> do
        OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
        Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
    OutBodyFile FileSpec
_ -> do
        OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
        Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp forall a. Maybe a
Nothing (forall (m :: * -> *) a. Monad m => a -> m a
return ())
    OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
        OutputType
otyp <- forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
        -- We must not exit this server application.
        -- If the application exits, streaming would be also closed.
        -- So, this work occupies this thread.
        --
        -- We need to increase the number of workers.
        Manager -> IO ()
spawnAction Manager
mgr
        -- After this work, this thread stops to decease
        -- the number of workers.
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
False
        -- Since streaming body is loop, we cannot control it.
        -- So, let's serialize 'Builder' with a designated queue.
        TBQueue StreamingChunk
tbq <- forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
10 -- fixme: hard coding: 10
        Output a -> IO ()
writeOutputQ forall a b. (a -> b) -> a -> b
$ forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp (forall a. a -> Maybe a
Just TBQueue StreamingChunk
tbq) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
        let push :: Builder -> IO ()
push Builder
b = do
                Handle -> IO ()
T.pause Handle
th
                forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> StreamingChunk
StreamingBuilder Builder
b)
                Handle -> IO ()
T.resume Handle
th
            flush :: IO ()
flush = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
            finished :: IO ()
finished = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq forall a b. (a -> b) -> a -> b
$ IO () -> StreamingChunk
StreamingFinished (Manager -> IO ()
decCounter Manager
mgr)
        Manager -> IO ()
incCounter Manager
mgr
        (Builder -> IO ()) -> IO () -> IO ()
strmbdy Builder -> IO ()
push IO ()
flush forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`E.finally` IO ()
finished
    OutBodyStreamingUnmask (forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ()
_ ->
        forall a. HasCallStack => [Char] -> a
error [Char]
"response: server does not support OutBodyStreamingUnmask"
  where
    ([(Token, ByteString)]
_, ValueTable
reqvt) = InpObj -> ([(Token, ByteString)], ValueTable)
inpObjHeaders InpObj
req

-- | Worker for server applications.
worker :: WorkerConf a -> Manager -> Server -> Action
worker :: forall a. WorkerConf a -> Manager -> Server -> IO ()
worker wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
SockAddr
a -> IO ()
a -> PushPromise -> IO (StreamId, a)
Output a -> IO ()
peerSockAddr :: SockAddr
mySockAddr :: SockAddr
makePushStream :: a -> PushPromise -> IO (StreamId, a)
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
peerSockAddr :: forall a. WorkerConf a -> SockAddr
mySockAddr :: forall a. WorkerConf a -> SockAddr
makePushStream :: forall a. WorkerConf a -> a -> PushPromise -> IO (StreamId, a)
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Server
server = do
    StreamInfo a
sinfo <- forall a. IO (StreamInfo a)
newStreamInfo
    ThreadContinue
tcont <- IO ThreadContinue
newThreadContinue
    forall a. Manager -> (Handle -> IO a) -> IO a
timeoutKillThread Manager
mgr forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont
  where
    go :: StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th = do
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tcont Bool
True
        Either SomeException ()
ex <- forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
E.trySyncOrAsync forall a b. (a -> b) -> a -> b
$ do
            Handle -> IO ()
T.pause Handle
th
            Input a
strm InpObj
req <- IO (Input a)
readInputQ
            let req' :: InpObj
req' = InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th
            forall a. StreamInfo a -> a -> IO ()
setStreamInfo StreamInfo a
sinfo a
strm
            Handle -> IO ()
T.resume Handle
th
            Handle -> IO ()
T.tickle Handle
th
            let aux :: Aux
aux = Handle -> SockAddr -> SockAddr -> Aux
Aux Handle
th SockAddr
mySockAddr SockAddr
peerSockAddr
            Server
server (InpObj -> Request
Request InpObj
req') Aux
aux forall a b. (a -> b) -> a -> b
$ forall a.
WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response WorkerConf a
wc Manager
mgr Handle
th ThreadContinue
tcont a
strm (InpObj -> Request
Request InpObj
req')
        Bool
cont1 <- case Either SomeException ()
ex of
            Right () -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            Left e :: SomeException
e@(SomeException e
_)
                -- killed by the local worker manager
                | Just KilledByHttp2ThreadManager{} <- forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
                -- killed by the local timeout manager
                | Just TimeoutThread
T.TimeoutThread <- forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> do
                    StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
                    forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                | Bool
otherwise -> do
                    StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
                    forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Bool
cont2 <- ThreadContinue -> IO Bool
getThreadContinue ThreadContinue
tcont
        forall a. StreamInfo a -> IO ()
clearStreamInfo StreamInfo a
sinfo
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
cont1 Bool -> Bool -> Bool
&& Bool
cont2) forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th
    pauseRequestBody :: InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th = InpObj
req{inpObjBody :: InpBody
inpObjBody = InpBody
readBody'}
      where
        readBody :: InpBody
readBody = InpObj -> InpBody
inpObjBody InpObj
req
        readBody' :: InpBody
readBody' = do
            Handle -> IO ()
T.pause Handle
th
            ByteString
bs <- InpBody
readBody
            Handle -> IO ()
T.resume Handle
th
            forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
    cleanup :: StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo = do
        Maybe a
minp <- forall a. StreamInfo a -> IO (Maybe a)
getStreamInfo StreamInfo a
sinfo
        case Maybe a
minp of
            Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just a
strm -> a -> IO ()
workerCleanup a
strm

----------------------------------------------------------------

--   A reference is shared by a responder and its worker.
--   The reference refers a value of this type as a return value.
--   If 'True', the worker continue to serve requests.
--   Otherwise, the worker get finished.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue :: IO ThreadContinue
newThreadContinue = IORef Bool -> ThreadContinue
ThreadContinue forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Bool
True

{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue IORef Bool
ref) Bool
x = forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
x

{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue IORef Bool
ref) = forall a. IORef a -> IO a
readIORef IORef Bool
ref

----------------------------------------------------------------

-- | The type for cleaning up.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo :: forall a. IO (StreamInfo a)
newStreamInfo = forall a. IORef (Maybe a) -> StreamInfo a
StreamInfo forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing

{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo :: forall a. StreamInfo a -> IO ()
clearStreamInfo (StreamInfo IORef (Maybe a)
ref) = forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing

{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo :: forall a. StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo IORef (Maybe a)
ref) a
inp = forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
inp

{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo :: forall a. StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo IORef (Maybe a)
ref) = forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref