{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE RankNTypes #-}

-- | This is an internal module; its interface is unstable.
module Data.ByteString.FastBuilder.Internal
  -- * Builder and related types
  , BuilderArg(..)
  , DataSink(..)
  , DynamicSink(..)
  , Queue(..)
  , Request(..)
  , Response(..)

  -- * Internally used exceptions
  , SuspendBuilderException(..)
  , ChunkOverflowException(..)

  -- * Builder building blocks
  , Builder_
  , BuildM(..)
  , toBuilder_
  , fromBuilder_
  , mkBuilder
  , useBuilder
  , getSink
  , getCur
  , getEnd
  , setCur
  , setEnd

  -- * Running builders
  , runBuilder
  , toLazyByteString
  , toLazyByteStringWith
  , toStrictByteString
  , hPutBuilder

  -- * Basic builders
  , primBounded
  , primFixed
  , primMapListBounded
  , primMapListFixed
  , byteString
  , byteStringThreshold
  , byteStringCopy
  , byteStringCopyNoCheck
  , byteStringInsert
  , unsafeCString
  , ensureBytes
  , getBytes

  -- * Performance tuning
  , rebuild
  ) where

import Control.Concurrent (forkIOWithUnmask, myThreadId)
import Control.Concurrent.MVar
import qualified Control.Exception as E
import Control.Monad
import qualified Data.ByteString as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe as S
import qualified Data.ByteString.Lazy as L
import Data.IORef
import Data.Monoid
import Data.String
import Data.Word
import Foreign.C.String
import Foreign.C.Types
import Foreign.ForeignPtr
import Foreign.ForeignPtr.Unsafe
import Foreign.Marshal.Utils
import Foreign.Ptr
import qualified System.IO as IO
import System.IO.Unsafe

import GHC.Exts (Addr#, State#, RealWorld, Ptr(..), Int(..), Int#)
import GHC.Magic (oneShot)
import GHC.IO (IO(..))
import GHC.CString (unpackCString#)

import qualified Data.ByteString.Builder.Prim as P
import qualified Data.ByteString.Builder.Prim.Internal as PI
import qualified Data.ByteString.Builder.Extra as X

-- | 'Builder' is an auxiliary type for efficiently generating a long
-- 'L.ByteString'. It is isomorphic to lazy 'L.ByteString', but offers
-- constant-time concatanation via '<>'.
-- Use 'toLazyByteString' to turn a 'Builder' into a 'L.ByteString'
newtype Builder = Builder
  { unBuilder
      :: BuilderArg -> State# RealWorld -> (# Addr#, Addr#, State# RealWorld #)
  -- It takes and returns two pointers, "cur" and "end". "cur" points to
  -- the next location to put bytes to, and "end" points to the end of the
  -- buffer.

-- | This datatype exists only to work around the limitation that 'oneShot'
-- cannot work with unboxed argument types.
data BuilderArg = BuilderArg
  {-# UNPACK #-} !(Ptr Word8) -- "cur" pointer
  {-# UNPACK #-} !(Ptr Word8) -- "end" pointer

instance Monoid Builder where
  mempty = Builder $ \(BuilderArg _ (Ptr cur) (Ptr end)) s -> (# cur, end, s #)
  {-# INLINE mempty #-}
  mappend (Builder a) (Builder b) = Builder $ \(BuilderArg dex (Ptr cur) (Ptr end)) s ->
    case a (BuilderArg dex (Ptr cur) (Ptr end)) s of
      (# cur', end', s' #) -> b (BuilderArg dex (Ptr cur') (Ptr end')) s'
  {-# INLINE mappend #-}
  mconcat xs = foldr mappend mempty xs
  {-# INLINE mconcat #-}

-- | 'fromString' = 'stringUtf8'
instance IsString Builder where
  fromString = builderFromString
  {-# INLINE fromString #-}

-- | Specifies where bytes generated by a builder go.
data DataSink
  = DynamicSink !(IORef DynamicSink)
    -- ^ The destination of data changes while the builder is running.
  | GrowingBuffer !(IORef (ForeignPtr Word8))
    -- ^ Bytes are accumulated in a contiguous buffer.
  | HandleSink !IO.Handle !(IORef Queue)
    -- ^ Bytes are first accumulated in the 'Queue', then flushed to the
    -- 'IO.Handle'.

-- | Variable-destination cases.
data DynamicSink
  = ThreadedSink !(MVar Request) !(MVar Response)
      -- ^ Bytes are sent to another thread.
  | BoundedGrowingBuffer {-# UNPACK #-} !(ForeignPtr Word8) !Int{-bound-}
      -- ^ Bytes are accumulated in a contiguous buffer until the
      -- size limit is reached. After that, the destination switches
      -- to a 'ThreadedSink'.

-- | A mutable buffer. The 'Int' specifies where the data start.
data Queue = Queue !(ForeignPtr Word8) !Int{-start-}

-- | A request from the driver thread to the builder thread.
data Request
  = Request {-# UNPACK #-} !(Ptr Word8) {-# UNPACK #-} !(Ptr Word8)

-- | A response from the builder thread to the driver thread.
data Response
  = Error E.SomeException
      -- ^ A synchronous exception was thrown by the builder
  | Done !(Ptr Word8)
      -- ^ The builder thread has completed.
  | MoreBuffer !(Ptr Word8) !Int
      -- ^ The builder thread has finished generating one chunk,
      -- and waits for another request with the specified minimum size.
  | InsertByteString !(Ptr Word8) !S.ByteString
      -- ^ The builder thread has partially filled the current chunk,
      -- and wants to emit the bytestring to be included in the final
      -- output.
  deriving (Show)

-- Internally used exceptions

-- | Used in the implementation of 'toLazyByteString'. This is an exception
-- thrown by the consumer thread to itself when it has finished filling the
-- first chunk of the output. After this, a thread will be forked, and the
-- execution of the builder will be resumed in the new thread, using
-- 'ThreadedSink'.
data ChunkOverflowException
  = ChunkOverflowException
      !S.ByteString !(MVar Request) !(MVar Response) !Int

instance Show ChunkOverflowException where
  show (ChunkOverflowException buf _ _ req) =
    "ChunkOverflowException " ++ show buf ++ " _ _ " ++ show req

instance E.Exception ChunkOverflowException

-- | Used in the implementation of 'toLazyByteString'. This is a message sent
-- from the consumer thread to the builder thread, requesting the builder
-- thread to temporarily pause execution. Later, the consumer thread may
-- request resumption by filling the 'MVar'.
data SuspendBuilderException = SuspendBuilderException !(MVar ())

instance Show SuspendBuilderException where
  show _ = "SuspendBuilderException"

instance E.Exception SuspendBuilderException

-- Builder building blocks

-- | An internal type that is isomorphic to 'Builder'. This is a
-- maximaly efficient representation for NOINLINE functions.
type Builder_
  = DataSink -> Addr# -> Addr# -> State# RealWorld
  -> (# Addr#, Addr#, State# RealWorld #)

-- | Convert a 'Builder' into a 'Builder_'.
toBuilder_ :: Builder -> Builder_
toBuilder_ (Builder f) dex cur end s = f (BuilderArg dex (Ptr cur) (Ptr end)) s

-- | Convert a 'Builder_' into a 'Builder'.
fromBuilder_ :: Builder_ -> Builder
fromBuilder_ f = Builder $ \(BuilderArg dex (Ptr cur) (Ptr end)) s -> f dex cur end s

-- | An internal type for making it easier to define builders. A value of
-- @'BuildM' a@ can do everything a 'Builder' can do, and in addition,
-- returns a value of type @a@ upon completion.
newtype BuildM a = BuildM { runBuildM :: (a -> Builder) -> Builder }
  deriving (Functor)

instance Applicative BuildM where
  pure = return
  (<*>) = ap

instance Monad BuildM where
  return x = BuildM $ \k -> k x
  {-# INLINE return #-}
  BuildM b >>= f = BuildM $ \k -> b $ \r -> runBuildM (f r) k
  {-# INLINE (>>=) #-}

-- | Create a builder from a BuildM.
mkBuilder :: BuildM () -> Builder
mkBuilder (BuildM bb) = bb $ \_ -> mempty
{-# INLINE mkBuilder #-}

-- | Embed a builder in the BuildM context.
useBuilder :: Builder -> BuildM ()
useBuilder b = BuildM $ \k -> b <> k ()
{-# INLINE useBuilder #-}

-- | Get the 'DataSink'.
getSink :: BuildM DataSink
getSink = BuildM $ \k -> Builder $ \(BuilderArg dex cur end) s ->
  unBuilder (k dex) (BuilderArg dex cur end) s

-- | Get the current pointer.
getCur :: BuildM (Ptr Word8)
getCur = BuildM $ \k -> Builder $ \(BuilderArg dex cur end) s ->
  unBuilder (k cur) (BuilderArg dex cur end) s

-- | Get the end-of-buffer pointer.
getEnd :: BuildM (Ptr Word8)
getEnd = BuildM $ \k -> Builder $ \(BuilderArg dex cur end) s ->
  unBuilder (k end) (BuilderArg dex cur end) s

-- | Set the current pointer.
setCur :: Ptr Word8 -> BuildM ()
setCur p = BuildM $ \k -> Builder $ \(BuilderArg dex _ end) s ->
  unBuilder (k ()) (BuilderArg dex p end) s

-- | Set the end-of-buffer pointer.
setEnd :: Ptr Word8 -> BuildM ()
setEnd p = BuildM $ \k -> Builder $ \(BuilderArg dex cur _) s ->
  unBuilder (k ()) (BuilderArg dex cur p) s

-- | Perform IO.
io :: IO a -> BuildM a
io (IO x) = BuildM $ \k -> Builder $ \ba s -> case x s of
  (# s', val #) -> unBuilder (k val) ba s'

-- Running builders.

-- | Run a builder.
runBuilder :: Builder -> DataSink -> Ptr Word8 -> Ptr Word8 -> IO (Ptr Word8)
runBuilder (Builder f) sink !cur !end = IO $ \s ->
  case f (BuilderArg sink cur end) s of
    (# cur', _, s' #) -> (# s', Ptr cur' #)

-- | Turn a 'Builder' into a lazy 'L.ByteString'.
-- __Performance hint__: when the resulting 'L.ByteString' does not fit
-- in one chunk, this function forks a thread. Due to this, the performance
-- degrades sharply if you use this function from a bound thread. Note in
-- particular that the main thread is a bound thread when you use @ghc
-- -threaded@.
-- To avoid this problem, do one of these:
-- * Make sure the resulting 'L.ByteString' is consumed in an unbound
--    thread. Consider using 'runInUnboundThread' for this.
-- * Use other function to run the 'Builder' instead. Functions that don't
--    return a lazy 'L.ByteString' do not have this issue.
-- * Link your program without @-threaded@.
toLazyByteString :: Builder -> L.ByteString
toLazyByteString = toLazyByteStringWith 100 32768

-- | Like 'toLazyByteString', but allows the user to specify the initial
-- and the subsequent desired buffer sizes.
toLazyByteStringWith :: Int -> Int -> Builder -> L.ByteString

-- The implementation employs a two-phase strategy to minimize the overhead:
-- 0. Fill the first chunk in a single-threaded way. Start from 'initialSize'-
--    sized buffer and double the size whenever the buffer is full. This uses a
--    'BoundedGrowingBuffer' sink.
-- 1. If the first chunk is big enough and the builder still hasn't finished,
--    suspend the execution of the builder, fork a new thread and resume
--    execution of the builder in the new thread, using a 'ThreadedSink'.
toLazyByteStringWith !initialSize !maxSize builder = unsafePerformIO $ do
  fptr <- mallocForeignPtrBytes initialSize
  sink <- newIORef $ BoundedGrowingBuffer fptr maxSize
  let !base = unsafeForeignPtrToPtr fptr
    finalPtr = unsafeDupablePerformIO $
      -- The use of unsafeDupablePerformIO is safe here, because at any given
      -- time, at most one thread can be attempting to evaluate this finalPtr
      -- thunk.
      runBuilder builder (DynamicSink sink) base (base `plusPtr` initialSize)
    {-# NOINLINE finalPtr #-}

    loop thunk = do
      -- Pass around `thunk` as an argument, otherwise GHC 7.10.1 inlines it
      -- despite the NOINLINE pragma.
      r <- E.try $ E.evaluate thunk
      case r of
        Right p -> do
          BoundedGrowingBuffer finalFptr _ <- readIORef sink
          let !finalBase = unsafeForeignPtrToPtr finalFptr
          return $! L.fromStrict $
            S.fromForeignPtr finalFptr 0 (p `minusPtr` finalBase)
        Left ex
          | Just (ChunkOverflowException chunk reqV respV minSize)
              <- E.fromException ex
            -> do
              let rest = continueBuilderThreaded reqV respV minSize maxSize thunk
              return $ L.fromChunks $
                if S.null chunk then rest else chunk : rest
          | otherwise -> do
              -- Here, there is no way to tell whether 'ex' is an asynchronous
              -- exception or not. We re-throw is as if it were async. This is
              -- a safe assumption, because if it is actually a synchronous
              -- exception, it will be re-thrown when we try to resume
              -- the evaluation of 'thunk'.
              myTid <- myThreadId
              E.throwTo myTid ex

              loop thunk

  loop finalPtr

-- | Continue a suspended builder using threads.
  :: MVar Request -> MVar Response -> Int -> Int -> Ptr Word8
  -> [S.ByteString]
continueBuilderThreaded !reqV !respV !initialSize !maxSize thunk =
  makeChunks (max maxSize initialSize) maxSize $ toBufferWriter reqV respV thunk

-- | Run the given suspended builder using a new thread.
toBufferWriter :: MVar Request -> MVar Response -> Ptr Word8 -> X.BufferWriter
toBufferWriter !reqV !respV thunk buf0 sz0 = E.mask_ $ do
  writer Nothing buf0 sz0
    writer !maybeBuilderTid !buf !sz = do
      putMVar reqV $ Request buf (buf `plusPtr` sz)
      -- Fork after putMVar, in order to minimize the chance that
      -- the new thread is scheduled on a different CPU.
      builderTid <- case maybeBuilderTid of
        Just t -> return t
        Nothing -> forkIOWithUnmask $ \u ->
          builderThreadWithUnmask u respV thunk
      resp <- wait builderTid
      let go cur next = return(written, next)
            where !written = cur `minusPtr` buf
      case resp of
        Error ex -> E.throwIO ex
        Done cur -> go cur X.Done
        MoreBuffer cur k -> go cur $ X.More k $ writer (Just builderTid)
        InsertByteString cur str -> go cur $ X.Chunk str $ writer (Just builderTid)

    wait !builderTid = do
      r <- E.try $ takeMVar respV
      case r of
        Right resp -> return resp
        Left exn -> do
          -- exn must be an async exception, because takeMVar throws no
          -- synchronous exceptions.
          resumeVar <- newEmptyMVar
          E.throwTo builderTid $ SuspendBuilderException resumeVar
          thisTid <- myThreadId
          E.throwTo thisTid (exn :: E.SomeException)

          -- A thunk containing this computation has been resumed.
          -- Resume the builder thread, and retry.
          putMVar resumeVar ()
          wait builderTid

-- | The body of the builder thread.
  :: (forall a. IO a -> IO a) -> MVar Response -> Ptr Word8
  -> IO ()
builderThreadWithUnmask unmask !respV thunk = loop
    loop = do
      r <- E.try $ unmask $ E.evaluate thunk
      case r of
        Right p -> putMVar respV $ Done p
        Left ex
          | Just (SuspendBuilderException lock) <- E.fromException ex
            -> do takeMVar lock; loop
          | otherwise -> putMVar respV $ Error ex

-- | Run a 'X.BufferWriter'.
makeChunks :: Int -> Int -> X.BufferWriter -> [S.ByteString]
makeChunks !initialBufSize maxBufSize = go initialBufSize
    go !bufSize w = unsafePerformIO $ do
      fptr <- S.mallocByteString bufSize
      (written, next) <- withForeignPtr fptr $ \buf -> w buf bufSize
      let rest = case next of
            X.Done -> []
            X.More reqSize w' -> go (max reqSize maxBufSize) w'
            X.Chunk chunk w' -> chunk : go maxBufSize w'
              -- TODO: don't throw away the remaining part of the buffer
      return $ if written == 0
        then rest
        else S.fromForeignPtr fptr 0 written : rest

-- | Turn a 'Builder' into a strict 'S.ByteString'.
toStrictByteString :: Builder -> S.ByteString
toStrictByteString builder = unsafePerformIO $ do
  let cap = 100
  fptr <- mallocForeignPtrBytes cap
  bufferRef <- newIORef fptr
  let !base = unsafeForeignPtrToPtr fptr
  cur <- runBuilder builder (GrowingBuffer bufferRef) base (base `plusPtr` cap)
  endFptr <- readIORef bufferRef
  let !written = cur `minusPtr` unsafeForeignPtrToPtr endFptr
  return $ S.fromForeignPtr endFptr 0 written

-- | Output a 'Builder' to a 'IO.Handle'.
hPutBuilder :: IO.Handle -> Builder -> IO ()
hPutBuilder !h builder = do
  let cap = 100
  fptr <- mallocForeignPtrBytes cap
  qRef <- newIORef $ Queue fptr 0
  let !base = unsafeForeignPtrToPtr fptr
  cur <- runBuilder builder (HandleSink h qRef) base (base `plusPtr` cap)
  flushQueue h qRef cur

-- builders

-- | Turn a 'String' into a 'Builder', using UTF-8,
builderFromString :: String -> Builder
builderFromString = primMapListBounded P.charUtf8
{-# NOINLINE[0] builderFromString #-}

{-# RULES "FastBuilder: builderFromString/unpackCString#"
  forall addr.
    builderFromString (unpackCString# addr) = unsafeCString (Ptr addr)

-- | Turn a value of type @a@ into a 'Builder', using the given 'PI.BoundedPrim'.
primBounded :: PI.BoundedPrim a -> a -> Builder
primBounded prim !x = mappend (ensureBytes $ PI.sizeBound prim) $ mkBuilder $ do
  cur <- getCur
  cur' <- io $ PI.runB prim x cur
  setCur cur'
{-# INLINE primBounded #-}

-- | Turn a value of type @a@ into a 'Builder', using the given 'PI.FixedPrim'.
primFixed :: PI.FixedPrim a -> a -> Builder
primFixed prim x = primBounded (PI.toB prim) x
{-# INLINE primFixed #-}

-- | Turn a list of values of type @a@ into a 'Builder', using the given
-- 'PI.BoundedPrim'.
primMapListBounded :: PI.BoundedPrim a -> [a] -> Builder
primMapListBounded prim = \xs -> mconcat $ map (primBounded prim) xs
{-# INLINE primMapListBounded #-}

-- | Turn a list of values of type @a@ into a 'Builder', using the given
-- 'PI.FixedPrim'.
primMapListFixed :: PI.FixedPrim a -> [a] -> Builder
primMapListFixed prim = \xs -> primMapListBounded (PI.toB prim) xs
{-# INLINE primMapListFixed #-}

-- | Turn a 'S.ByteString' to a 'Builder'.
byteString :: S.ByteString -> Builder
byteString = byteStringThreshold maximalCopySize
{-# INLINE byteString #-}

maximalCopySize :: Int
maximalCopySize = 2 * X.smallChunkSize

-- | Turn a 'S.ByteString' to a 'Builder'. If the size of the 'S.ByteString'
-- is larger than the given threshold, avoid copying it as much
-- as possible.
byteStringThreshold :: Int -> S.ByteString -> Builder
byteStringThreshold th bstr = rebuild $
  if S.length bstr >= th
    then byteStringInsert bstr
    else byteStringCopy bstr

-- | Turn a 'S.ByteString' to a 'Builder'. The 'S.ByteString' will be copied
-- to the buffer, regardless of the size.
byteStringCopy :: S.ByteString -> Builder
byteStringCopy !bstr =
  -- TODO: this is suboptimal; should keep using the same buffer size.
  ensureBytes (S.length bstr) <> byteStringCopyNoCheck bstr

-- | Like 'byteStringCopy', but assumes that the current buffer is large enough.
byteStringCopyNoCheck :: S.ByteString -> Builder
byteStringCopyNoCheck !bstr = mkBuilder $ do
  cur <- getCur
  io $ S.unsafeUseAsCString bstr $ \ptr ->
    copyBytes cur (castPtr ptr) len
  setCur $ cur `plusPtr` len
    !len = S.length bstr

-- | Turn a 'S.ByteString' to a 'Builder'. When possible, the given
-- 'S.ByteString' will not be copied, and inserted directly into the output
-- instead.
byteStringInsert :: S.ByteString -> Builder
byteStringInsert !bstr = fromBuilder_ $ byteStringInsert_ bstr

-- | The body of the 'byteStringInsert', worker-wrappered manually.
byteStringInsert_ :: S.ByteString -> Builder_
byteStringInsert_ bstr = toBuilder_ $ mkBuilder $ do
  sink <- getSink
  case sink of
    DynamicSink dRef -> do
      dyn <- io $ readIORef dRef
      case dyn of
        ThreadedSink reqV respV -> do
          cur <- getCur
          io $ putMVar respV $ InsertByteString cur bstr
          handleRequest reqV
        BoundedGrowingBuffer fptr bound -> do
          r <- remainingBytes
          when (r < S.length bstr) $
            growBufferBounded dRef fptr bound (S.length bstr)
          -- TODO: insert rather than copy if the first chunk
          -- is full.
          useBuilder $ byteStringCopyNoCheck bstr
    GrowingBuffer bufRef -> do
      r <- remainingBytes
      when (r < S.length bstr) $
        growBuffer bufRef (S.length bstr)
      useBuilder $ byteStringCopyNoCheck bstr
    HandleSink h queueRef -> do
      cur <- getCur
      io $ flushQueue h queueRef cur
      io $ S.hPut h bstr
{-# NOINLINE byteStringInsert_ #-}

-- | Turn a C String into a 'Builder'. The behavior is undefined if the given
-- 'CString' does not point to a constant null-terminated string.
unsafeCString :: CString -> Builder
unsafeCString cstr = rebuild $ let
    !len = fromIntegral $ c_pure_strlen cstr
  mappend (ensureBytes len) $ mkBuilder $ do
  cur <- getCur
  io $ copyBytes cur (castPtr cstr) len
  setCur $ cur `plusPtr` len

foreign import ccall unsafe "strlen" c_pure_strlen :: CString -> CSize

-- | @'ensureBytes' n@ ensures that at least @n@ bytes of free space is
-- available in the current buffer, by allocating a new buffer when
-- necessary.
ensureBytes :: Int -> Builder
ensureBytes !n = mkBuilder $ do
  r <- remainingBytes
  when (r < n) $ useBuilder $ getBytes n
{-# INLINE ensureBytes #-}

-- | @'getBytes' n@ allocates a new buffer, containing at least @n@ bytes.
getBytes :: Int -> Builder
getBytes (I# n) = fromBuilder_ (getBytes_ n)

-- | The body of the 'getBytes' function, worker-wrappered manually.
getBytes_ :: Int# -> Builder_
getBytes_ n = toBuilder_ $ mkBuilder $ do
  sink <- getSink
  case sink of
    DynamicSink dRef -> do
      dyn <- io $ readIORef dRef
      case dyn of
        ThreadedSink reqV respV -> do
          cur <- getCur
          io $ putMVar respV $ MoreBuffer cur $ I# n
          handleRequest reqV
        BoundedGrowingBuffer fptr bound ->
          growBufferBounded dRef fptr bound (I# n)
    GrowingBuffer bufRef -> growBuffer bufRef (I# n)
    HandleSink h queueRef -> do
      cur <- getCur
      io $ flushQueue h queueRef cur
      switchQueue queueRef $ max 4096 (I# n)
{-# NOINLINE getBytes_ #-}

-- | Return the remaining size of the current buffer, in bytes.
remainingBytes :: BuildM Int
remainingBytes = minusPtr <$> getEnd <*> getCur
{-# INLINE remainingBytes #-}

-- Performance tuning

-- | @'rebuild' b@ is equivalent to @b@, but it allows GHC to assume
-- that @b@ will be run at most once. This can enable various
-- optimizations that greately improves performance.
-- There are two types of typical situations where a use of 'rebuild'
-- is often a win:
-- * When constructing a builder using a recursive function. e.g.
--  @rebuild $ foldr ...@.
-- * When constructing a builder using a conditional expression. e.g.
--  @rebuild $ case x of ... @
rebuild :: Builder -> Builder
rebuild (Builder f) = Builder $ oneShot (\ !arg s -> f arg s)

-- ThreadedSink

-- | Wait for a request, and switch to a new buffer.
handleRequest :: MVar Request -> BuildM ()
handleRequest reqV = do
  Request newCur newEnd <- io $ takeMVar reqV
  setCur newCur
  setEnd newEnd

-- GrowingBuffer

-- | @growBuffer bufRef req@ reallocates the buffer, growing it
-- by at least @req@.
growBuffer :: IORef (ForeignPtr Word8) -> Int -> BuildM ()
growBuffer !bufRef !req = do
  cur <- getCur
  end <- getCur
  fptr <- io $ readIORef bufRef
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  newFptr <- io $ mallocForeignPtrBytes newCap
  let !newBase = unsafeForeignPtrToPtr newFptr
  setCur $ newBase `plusPtr` size
  setEnd $ newBase `plusPtr` newCap
  io $ do
    copyBytes newBase base size
    touchForeignPtr fptr
    touchForeignPtr newFptr
    writeIORef bufRef newFptr
{-# INLINE growBuffer #-}

-- HandleSink

-- | Put the content of the 'Queue' to the 'IO.Handle', and empty
-- the 'Queue'.
flushQueue :: IO.Handle -> IORef Queue -> Ptr Word8 -> IO ()
flushQueue !h !qRef !cur = do
  Queue fptr start <- readIORef qRef
  let !end = cur `minusPtr` unsafeForeignPtrToPtr fptr
  when (end > start) $ do
    S.hPut h $ S.fromForeignPtr fptr start (end - start)
    writeIORef qRef $ Queue fptr end

-- | @switchQueue qRef minSize@ discards the old 'Queue' and sets up
-- a new empty 'Queue' of at least @minSize@ large. If the old 'Queue'
-- is large enough, it is re-used.
switchQueue :: IORef Queue -> Int -> BuildM ()
switchQueue !qRef !minSize = do
  end <- getCur
  Queue fptr _ <- io $ readIORef qRef
  let !base = unsafeForeignPtrToPtr fptr
  let !cap = end `minusPtr` base
  newFptr <- if minSize <= cap
    then return fptr
    else io $ mallocForeignPtrBytes minSize
  let !newBase = unsafeForeignPtrToPtr newFptr
  io $ writeIORef qRef $ Queue newFptr 0
  setCur newBase
  setEnd $ newBase `plusPtr` max minSize cap

-- BoundedGrowingBuffer

-- | @growBufferBounded dRef fptr bound req@ reallocates the buffer, growing it
-- by at least @req@. If the buffer size would exceed @bound@, it instead
-- interrupts execution by throwing a 'ChunkOverflowException', and switches
-- to a 'ThreadedSink'.
  :: IORef DynamicSink -> ForeignPtr Word8 -> Int -> Int -> BuildM ()
growBufferBounded !dRef !fptr !bound !req = do
  cur <- getCur
  end <- getCur
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  if bound < newCap
    then chunkOverflow dRef req $ S.fromForeignPtr fptr 0 size
    else do
      newFptr <- io $ mallocForeignPtrBytes newCap
      let !newBase = unsafeForeignPtrToPtr newFptr
      setCur $ newBase `plusPtr` size
      setEnd $ newBase `plusPtr` newCap
      io $ do
        copyBytes newBase base size
        touchForeignPtr fptr
        touchForeignPtr newFptr
        writeIORef dRef $ BoundedGrowingBuffer newFptr bound
{-# INLINE growBufferBounded #-}

-- | Throw a 'ChunkOverflowException' and switches to a 'ThreadedSink'.
chunkOverflow :: IORef DynamicSink -> Int -> S.ByteString -> BuildM ()
chunkOverflow !dRef !minSize !chunk = do
  myTid <- io $ myThreadId
  reqV <- io $ newEmptyMVar
  respV <- io $ newEmptyMVar
  io $ E.throwTo myTid $ ChunkOverflowException chunk reqV respV minSize
  io $ writeIORef dRef $ ThreadedSink reqV respV
  handleRequest reqV