{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE RankNTypes #-}
module Data.ByteString.FastBuilder.Internal
(
Builder(..)
, BuilderState
, DataSink(..)
, DynamicSink(..)
, Queue(..)
, Request(..)
, Response(..)
, SuspendBuilderException(..)
, ChunkOverflowException(..)
, BuildM(..)
, mkBuilder
, useBuilder
, getSink
, getCur
, getEnd
, setCur
, setEnd
, runBuilder
, toLazyByteString
, toLazyByteStringWith
, toStrictByteString
, hPutBuilder
, hPutBuilderWith
, primBounded
, primFixed
, primMapListBounded
, primMapListFixed
, byteString
, byteStringThreshold
, byteStringCopy
, byteStringCopyNoCheck
, byteStringInsert
, unsafeCString
, unsafeCStringLen
, ensureBytes
, getBytes
, rebuild
) where
import Control.Concurrent (forkIOWithUnmask, myThreadId)
import Control.Concurrent.MVar
import qualified Control.Exception as E
import Control.Monad
import qualified Data.ByteString as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe as S
import qualified Data.ByteString.Lazy as L
import Data.IORef
import Data.Semigroup as Sem
import Data.String
import Data.Word
import Foreign.C.String
import Foreign.C.Types
import Foreign.ForeignPtr
import Foreign.ForeignPtr.Unsafe
import Foreign.Marshal.Utils
import Foreign.Ptr
import qualified System.IO as IO
import System.IO.Unsafe
import GHC.Exts (Addr#, State#, RealWorld, Ptr(..), Int(..), Int#)
import GHC.Magic (oneShot)
import GHC.IO (IO(..), unIO)
import GHC.CString (unpackCString#)
import qualified Data.ByteString.Builder.Prim as P
import qualified Data.ByteString.Builder.Prim.Internal as PI
import qualified Data.ByteString.Builder.Extra as X
-- | A producer of bytes.  Internally a 'Builder' is a function that, given
-- the 'DataSink' it is writing to, turns one 'BuilderState' into the next.
newtype Builder
  = Builder { unBuilder :: DataSink -> BuilderState -> BuilderState }
-- | The state threaded through a 'Builder', as an unboxed triple: the
-- current write position, the end of the current buffer, and the
-- 'RealWorld' state token (see 'runBuilder' for how it is constructed).
type BuilderState =
  (# Addr#, Addr#, State# RealWorld #)
-- | '<>' is 'appendBuilder': the left builder runs first, then the right.
instance Sem.Semigroup Builder where
  {-# INLINE (<>) #-}
  (<>) = appendBuilder
-- | Sequential composition: feed the state produced by the first builder
-- into the second one, using the same sink for both.  The result is passed
-- through 'rebuild'.
appendBuilder :: Builder -> Builder -> Builder
appendBuilder (Builder lhs) (Builder rhs) =
  rebuild (Builder (\sink st -> rhs sink (lhs sink st)))
{-# INLINE[1] appendBuilder #-}

-- Re-associate nested appends to the right.
{-# RULES "appendBuilder/assoc"
    forall x y z.
      appendBuilder (appendBuilder x y) z = appendBuilder x (appendBuilder y z)
  #-}
-- | The empty builder leaves the state untouched; 'mconcat' is a right
-- fold of 'mappend'.
instance Monoid Builder where
  {-# INLINE mempty #-}
  mempty = Builder (\_ st -> st)
  {-# INLINE mappend #-}
  mappend = (<>)
  {-# INLINE mconcat #-}
  mconcat = foldr mappend mempty
-- | String literals are converted with 'builderFromString'.
instance IsString Builder where
  {-# INLINE fromString #-}
  fromString = builderFromString
-- | The destination a 'Builder' writes to.
data DataSink
  = -- | A sink whose concrete strategy is stored in a mutable reference
    -- (this is what 'toLazyByteStringWith' passes in).
    DynamicSink !(IORef DynamicSink)
  | -- | A single buffer, kept in the reference, that is replaced by a
    -- larger one on demand (this is what 'toStrictByteString' passes in).
    GrowingBuffer !(IORef (ForeignPtr Word8))
  | -- | Output to a handle: the handle itself, the size hint used when a
    -- replacement buffer is needed, and the current 'Queue' (this is what
    -- 'hPutBuilderWith' passes in).
    HandleSink !IO.Handle !Int !(IORef Queue)
-- | The strategies a 'DynamicSink' reference can hold.
data DynamicSink
  = -- | Buffers are obtained through the request variable and progress is
    -- reported through the response variable.
    ThreadedSink !(MVar Request) !(MVar Response)
  | -- | A growable buffer together with an upper bound on its size; see
    -- 'growBufferBounded' for how the bound is enforced.
    BoundedGrowingBuffer {-# UNPACK #-} !(ForeignPtr Word8) !Int
-- | Buffer used by a 'HandleSink': the buffer itself and the offset from
-- which 'flushQueue' starts writing to the handle.
data Queue
  = Queue !(ForeignPtr Word8) !Int
-- | A fresh buffer handed to the builder: its first byte and its end.
-- 'handleRequest' installs these as the new current and end pointers.
data Request =
  Request
    {-# UNPACK #-} !(Ptr Word8)
    {-# UNPACK #-} !(Ptr Word8)
-- | What the builder reports back through the response variable of a
-- 'ThreadedSink'.  Every pointer is the write position reached in the
-- buffer that was last handed over.
data Response
  = -- | Evaluating the builder raised this exception.
    Error E.SomeException
  | -- | The builder has finished.
    Done !(Ptr Word8)
  | -- | The builder needs a buffer of at least the given size.
    MoreBuffer !(Ptr Word8) !Int
  | -- | The given byte string should be emitted as a chunk of its own.
    InsertByteString !(Ptr Word8) !S.ByteString
  deriving (Show)
-- | Raised by 'chunkOverflow'.  It carries the chunk produced so far, the
-- request and response variables of the threaded sink that takes over, and
-- the minimum size wanted for the next buffer.
data ChunkOverflowException =
  ChunkOverflowException
    !S.ByteString
    !(MVar Request)
    !(MVar Response)
    !Int

-- | The two 'MVar's cannot be shown and are rendered as underscores.
instance Show ChunkOverflowException where
  show (ChunkOverflowException chunk _ _ minSize) =
    concat ["ChunkOverflowException ", show chunk, " _ _ ", show minSize]

instance E.Exception ChunkOverflowException
-- | Thrown to the builder thread to pause it.  The thread blocks on the
-- enclosed variable until it is filled (see 'builderThreadWithUnmask').
data SuspendBuilderException
  = SuspendBuilderException !(MVar ())

instance Show SuspendBuilderException where
  show = const "SuspendBuilderException"

instance E.Exception SuspendBuilderException
-- | Continuation-passing monad for assembling a 'Builder': an action takes
-- the continuation that consumes its result and yields the whole builder.
newtype BuildM a
  = BuildM { runBuildM :: (a -> Builder) -> Builder }
  deriving (Functor)
-- | Defined in terms of the 'Monad' instance.
instance Applicative BuildM where
  (<*>) = ap
  pure = return
-- | The usual continuation monad: 'return' hands the value straight to the
-- continuation, '>>=' runs the first action and feeds its result to the
-- second.
instance Monad BuildM where
  {-# INLINE return #-}
  return val = BuildM (\cont -> cont val)
  {-# INLINE (>>=) #-}
  BuildM run >>= next =
    BuildM (\cont -> run (\val -> runBuildM (next val) cont))
-- | Turn a 'BuildM' action into a 'Builder' by running it with a final
-- continuation that does nothing ('mempty').
mkBuilder :: BuildM () -> Builder
mkBuilder action = runBuildM action (\_ -> mempty)
{-# INLINE mkBuilder #-}
-- | Embed a 'Builder' in 'BuildM': it runs first, followed by whatever the
-- continuation builds.
useBuilder :: Builder -> BuildM ()
useBuilder builder = BuildM (\cont -> builder <> cont ())
{-# INLINE useBuilder #-}
-- | Obtain the 'DataSink' the builder is being run against.  The builder
-- state is passed on unchanged.
getSink :: BuildM DataSink
getSink = BuildM $ \cont ->
  Builder $ \sink st -> unBuilder (cont sink) sink st
-- | Read the current write position from the builder state.
getCur :: BuildM (Ptr Word8)
getCur = BuildM $ \cont ->
  Builder $ \sink (# curAddr, endAddr, world #) ->
    unBuilder (cont (Ptr curAddr)) sink (# curAddr, endAddr, world #)
-- | Read the end-of-buffer pointer from the builder state.
getEnd :: BuildM (Ptr Word8)
getEnd = BuildM $ \cont ->
  Builder $ \sink (# curAddr, endAddr, world #) ->
    unBuilder (cont (Ptr endAddr)) sink (# curAddr, endAddr, world #)
-- | Overwrite the current write position in the builder state.
setCur :: Ptr Word8 -> BuildM ()
setCur (Ptr newCur) = BuildM $ \cont ->
  Builder $ \sink (# _, endAddr, world #) ->
    unBuilder (cont ()) sink (# newCur, endAddr, world #)
-- | Overwrite the end-of-buffer pointer in the builder state.
setEnd :: Ptr Word8 -> BuildM ()
setEnd (Ptr newEnd) = BuildM $ \cont ->
  Builder $ \sink (# curAddr, _, world #) ->
    unBuilder (cont ()) sink (# curAddr, newEnd, world #)
-- | Run an 'IO' action inside 'BuildM' by threading the state token of the
-- builder state through it.  The two pointers are left as they are.
io :: IO a -> BuildM a
io action = BuildM $ \cont ->
  Builder $ \sink (# curAddr, endAddr, world #) ->
    case unIO action world of
      (# world', result #) ->
        unBuilder (cont result) sink (# curAddr, endAddr, world' #)
-- | Apply a raw state transformation before continuing.
updateState :: (BuilderState -> BuilderState) -> BuildM ()
updateState step =
  BuildM (\cont -> Builder (\sink st -> unBuilder (cont ()) sink (step st)))
-- | A size paired with a state transformation.  'write' reserves that many
-- bytes before running the transformation.
data Write
  = Write !Int (BuilderState -> BuilderState)

-- | Sizes add up; the left transformation runs before the right one.
instance Sem.Semigroup Write where
  Write sizeA runA <> Write sizeB runB =
    Write (sizeA + sizeB) (\st -> runB (runA st))

-- | The empty write has size zero and leaves the state alone.
instance Monoid Write where
  mempty = Write 0 (\st -> st)
  {-# INLINE mappend #-}
  mappend = (<>)
-- | Wrap a bounded primitive as a 'Write': the size is the primitive's
-- size bound, and the transformation runs the primitive at the current
-- position and stores the pointer it returns as the new position.
writeBoundedPrim :: PI.BoundedPrim a -> a -> Write
writeBoundedPrim prim val =
  Write (PI.sizeBound prim) (\(# curAddr, endAddr, world #) ->
    case unIO (PI.runB prim val (Ptr curAddr)) world of
      (# world', Ptr curAddr' #) -> (# curAddr', endAddr, world' #))
-- | Run a builder against a sink, starting with the buffer delimited by
-- the two pointers, and return the final write position.  The final
-- end-of-buffer pointer is discarded.
runBuilder :: Builder -> DataSink -> Ptr Word8 -> Ptr Word8 -> IO (Ptr Word8)
runBuilder builder sink (Ptr curAddr) (Ptr endAddr) = IO $ \world ->
  case unBuilder builder sink (# curAddr, endAddr, world #) of
    (# curAddr', _, world' #) -> (# world', Ptr curAddr' #)
-- | 'toLazyByteStringWith' with an initial buffer of 100 bytes and a
-- maximum buffer size of 32768 bytes.
toLazyByteString :: Builder -> L.ByteString
toLazyByteString = toLazyByteStringWith initialSize maxSize
  where
    initialSize = 100
    maxSize = 32768
-- | Run a builder and return its output as a lazy byte string.
--
-- The first argument is the size of the initial buffer and the second the
-- bound given to the 'BoundedGrowingBuffer' sink.  If the builder finishes
-- without exceeding the bound, the result is a single chunk.  Otherwise a
-- 'ChunkOverflowException' is caught here and the remaining chunks are
-- produced by 'continueBuilderThreaded'.
toLazyByteStringWith :: Int -> Int -> Builder -> L.ByteString
toLazyByteStringWith !initialSize !maxSize builder = unsafePerformIO $ do
  firstFptr <- mallocForeignPtrBytes initialSize
  sinkRef <- newIORef (BoundedGrowingBuffer firstFptr maxSize)
  let !firstBase = unsafeForeignPtrToPtr firstFptr
  let
    -- The whole builder run, as a thunk yielding the final write position.
    endOfOutput = unsafeDupablePerformIO $
      runBuilder builder (DynamicSink sinkRef)
        firstBase (firstBase `plusPtr` initialSize)
    {-# NOINLINE endOfOutput #-}

    -- NOTE(review): the thunk is threaded through as an argument instead of
    -- being referenced directly; presumably this keeps it shared and not
    -- inlined -- confirm before changing.
    drive pending = do
      outcome <- E.try (E.evaluate pending)
      case outcome of
        Left ex
          | Just (ChunkOverflowException chunk reqV respV minSize)
              <- E.fromException ex
            -> do
              -- The bound was hit: emit what was built so far and produce
              -- the rest lazily from the same thunk.
              let rest =
                    continueBuilderThreaded reqV respV minSize maxSize pending
              return $ L.fromChunks $
                if S.null chunk then rest else chunk : rest
          | otherwise -> do
              -- Any other exception is re-thrown to this thread; if
              -- execution continues past that point, evaluation is retried.
              self <- myThreadId
              E.throwTo self ex
              drive pending
        Right endPtr -> do
          -- Finished within a single (possibly regrown) buffer.
          BoundedGrowingBuffer lastFptr _ <- readIORef sinkRef
          let !lastBase = unsafeForeignPtrToPtr lastFptr
          return $! L.fromStrict $
            S.fromForeignPtr lastFptr 0 (endPtr `minusPtr` lastBase)
  drive endOfOutput
-- | Produce the remaining chunks of a builder run that has switched to a
-- threaded sink.  The first buffer has size @max maxSize initialSize@;
-- 'makeChunks' is given @maxSize@ for the buffers after it.
continueBuilderThreaded
  :: MVar Request -> MVar Response -> Int -> Int -> Ptr Word8
  -> [S.ByteString]
continueBuilderThreaded !reqV !respV !initialSize !maxSize thunk =
  makeChunks firstBufSize maxSize writer
  where
    firstBufSize = max maxSize initialSize
    writer = toBufferWriter reqV respV thunk
-- | Present a builder running on another thread as an 'X.BufferWriter'.
--
-- Each call hands the given buffer to the builder through the request
-- variable and then waits for its 'Response'.  The builder thread is
-- forked on the first call and reused by the continuations returned in
-- 'X.More' and 'X.Chunk'.  The writer runs under 'E.mask_'.
toBufferWriter :: MVar Request -> MVar Response -> Ptr Word8 -> X.BufferWriter
toBufferWriter !reqV !respV thunk buf0 sz0 =
  E.mask_ (serve Nothing buf0 sz0)
  where
    serve !knownTid !buf !sz = do
      -- Publish the buffer before (possibly) forking the builder thread.
      putMVar reqV (Request buf (buf `plusPtr` sz))
      tid <- maybe spawn return knownTid
      resp <- awaitResponse tid
      let finish cur next = return (written, next)
            where
              !written = cur `minusPtr` buf
      case resp of
        Error ex -> E.throwIO ex
        Done cur -> finish cur X.Done
        MoreBuffer cur k -> finish cur (X.More k (serve (Just tid)))
        InsertByteString cur str ->
          finish cur (X.Chunk str (serve (Just tid)))

    spawn =
      forkIOWithUnmask (\unmask -> builderThreadWithUnmask unmask respV thunk)

    awaitResponse !tid = do
      outcome <- E.try (takeMVar respV)
      case outcome of
        Right resp -> return resp
        Left exn -> do
          -- The wait was interrupted.  Suspend the builder thread, then
          -- re-throw the exception to this thread.
          resumeVar <- newEmptyMVar
          E.throwTo tid (SuspendBuilderException resumeVar)
          self <- myThreadId
          E.throwTo self (exn :: E.SomeException)
          -- Reached only if execution continues after the re-throw:
          -- release the builder thread and wait again.
          putMVar resumeVar ()
          awaitResponse tid
-- | Body of the builder thread.  It evaluates the thunk with the supplied
-- unmasking function and reports 'Done' or 'Error' through the response
-- variable.  A 'SuspendBuilderException' makes it block on the enclosed
-- variable and then evaluate the thunk again.
builderThreadWithUnmask
  :: (forall a. IO a -> IO a) -> MVar Response -> Ptr Word8
  -> IO ()
builderThreadWithUnmask unmask !respV thunk = attempt
  where
    attempt = do
      outcome <- E.try (unmask (E.evaluate thunk))
      case outcome of
        Left ex
          | Just (SuspendBuilderException lock) <- E.fromException ex ->
              takeMVar lock >> attempt
          | otherwise -> putMVar respV (Error ex)
        Right endPtr -> putMVar respV (Done endPtr)
-- | Drive a 'X.BufferWriter' and collect its output as a lazy list of
-- strict chunks.  The first buffer has the given initial size.  After a
-- 'X.More' signal the next buffer has size @max reqSize maxBufSize@; after
-- a 'X.Chunk' signal it has size @maxBufSize@.  Buffers into which nothing
-- was written are dropped from the result.
makeChunks :: Int -> Int -> X.BufferWriter -> [S.ByteString]
makeChunks !initialBufSize maxBufSize = produce initialBufSize
  where
    produce !bufSize writer = unsafePerformIO $ do
      fptr <- S.mallocByteString bufSize
      (written, next) <- withForeignPtr fptr (\buf -> writer buf bufSize)
      -- Kept lazy so that later chunks are only computed on demand.
      let rest = case next of
            X.Chunk chunk writer' -> chunk : produce maxBufSize writer'
            X.More reqSize writer' -> produce (max reqSize maxBufSize) writer'
            X.Done -> []
      return $ case written of
        0 -> rest
        _ -> S.fromForeignPtr fptr 0 written : rest
-- | Run a builder into a single growing buffer, starting at 100 bytes, and
-- return the written prefix of the final buffer as a strict byte string.
toStrictByteString :: Builder -> S.ByteString
toStrictByteString builder = unsafePerformIO $ do
  let cap = 100
  firstFptr <- mallocForeignPtrBytes cap
  bufRef <- newIORef firstFptr
  let !start = unsafeForeignPtrToPtr firstFptr
  finish <-
    runBuilder builder (GrowingBuffer bufRef) start (start `plusPtr` cap)
  -- The buffer may have been replaced while the builder ran.
  lastFptr <- readIORef bufRef
  let !written = finish `minusPtr` unsafeForeignPtrToPtr lastFptr
  return $ S.fromForeignPtr lastFptr 0 written
-- | 'hPutBuilderWith' with an initial buffer of 100 bytes and 4096 bytes
-- for subsequent buffers.
hPutBuilder :: IO.Handle -> Builder -> IO ()
hPutBuilder !h builder = hPutBuilderWith h initialCap nextCap builder
  where
    initialCap = 100
    nextCap = 4096
-- | Run a builder against a handle.  Output is staged in a buffer of
-- @initialCap@ bytes at first; @nextCap@ is handed to the 'HandleSink' for
-- later buffers.  Whatever is still queued when the builder finishes is
-- flushed to the handle.
hPutBuilderWith :: IO.Handle -> Int -> Int -> Builder -> IO ()
hPutBuilderWith !h !initialCap !nextCap builder = do
  firstFptr <- mallocForeignPtrBytes initialCap
  queueRef <- newIORef (Queue firstFptr 0)
  let !start = unsafeForeignPtrToPtr firstFptr
  finish <-
    runBuilder builder (HandleSink h nextCap queueRef)
      start (start `plusPtr` initialCap)
  flushQueue h queueRef finish
-- | Encode a 'String' as UTF-8, character by character.  Inlining is held
-- back until phase 0 so that the rewrite rule below has a chance to fire.
{-# NOINLINE[0] builderFromString #-}
builderFromString :: String -> Builder
builderFromString = primMapListBounded P.charUtf8

-- A string built by 'unpackCString#' from a literal address is emitted
-- with 'unsafeCString' instead.
{-# RULES "FastBuilder: builderFromString/unpackCString#"
    forall addr.
      builderFromString (unpackCString# addr) = unsafeCString (Ptr addr)
  #-}
-- | Builder that encodes one value with a bounded primitive.
primBounded :: PI.BoundedPrim a -> a -> Builder
primBounded prim = \val -> write (writeBoundedPrim prim val)
{-# INLINE primBounded #-}
-- | Turn a 'Write' into a 'Builder': make sure the announced number of
-- bytes is available, then apply the state transformation.
write :: Write -> Builder
write (Write size step) =
  rebuild (mkBuilder (useBuilder (ensureBytes size) >> updateState step))
{-# INLINE[1] write #-}

-- Merge adjacent writes into one.
{-# RULES "fast-builder: write/write"
    forall w0 w1.
      appendBuilder (write w0) (write w1) = write (w0 <> w1)
  #-}

-- The same merge when further builders follow the second write.
{-# RULES "fast-builder: write/write/x"
    forall w0 w1 x.
      appendBuilder (write w0) (appendBuilder (write w1) x)
        = appendBuilder (write (w0 <> w1)) x
  #-}
-- | Builder that encodes one value with a fixed-size primitive, by way of
-- its bounded counterpart.
primFixed :: PI.FixedPrim a -> a -> Builder
primFixed fixed = primBounded bounded
  where
    bounded = PI.toB fixed
{-# INLINE primFixed #-}
-- | Encode every element of a list with a bounded primitive and
-- concatenate the results in list order.
primMapListBounded :: PI.BoundedPrim a -> [a] -> Builder
primMapListBounded prim = \vals -> mconcat (map (primBounded prim) vals)
{-# INLINE primMapListBounded #-}
-- | Encode every element of a list with a fixed-size primitive, by way of
-- its bounded counterpart.
primMapListFixed :: PI.FixedPrim a -> [a] -> Builder
primMapListFixed fixed = primMapListBounded bounded
  where
    bounded = PI.toB fixed
{-# INLINE primMapListFixed #-}
-- | Emit a strict byte string: it is copied when shorter than
-- 'maximalCopySize' and inserted otherwise (see 'byteStringThreshold').
byteString :: S.ByteString -> Builder
byteString = \bstr -> byteStringThreshold maximalCopySize bstr
{-# INLINE byteString #-}
-- | Threshold used by 'byteString': twice 'X.smallChunkSize'.
maximalCopySize :: Int
maximalCopySize = X.smallChunkSize * 2
-- | Emit a strict byte string, choosing 'byteStringInsert' when its length
-- reaches the threshold and 'byteStringCopy' when it is shorter.
byteStringThreshold :: Int -> S.ByteString -> Builder
byteStringThreshold th bstr = rebuild chosen
  where
    chosen
      | S.length bstr >= th = byteStringInsert bstr
      | otherwise = byteStringCopy bstr
-- | Copy a strict byte string into the buffer, after making sure there is
-- room for all of it.
byteStringCopy :: S.ByteString -> Builder
byteStringCopy !bstr = reserve <> copy
  where
    reserve = ensureBytes (S.length bstr)
    copy = byteStringCopyNoCheck bstr
-- | Copy a strict byte string to the current position and advance the
-- position past it.  No space check is performed here, so the caller must
-- already have made sure the buffer is large enough.
byteStringCopyNoCheck :: S.ByteString -> Builder
byteStringCopyNoCheck !bstr = mkBuilder $ do
  dst <- getCur
  io (S.unsafeUseAsCString bstr (\src -> copyBytes dst (castPtr src) len))
  setCur (dst `plusPtr` len)
  where
    !len = S.length bstr
-- | Strict-argument wrapper around 'byteStringInsert_'.
byteStringInsert :: S.ByteString -> Builder
byteStringInsert bstr = bstr `seq` byteStringInsert_ bstr
-- | Emit a strict byte string in the way that suits the current sink:
--
-- * handle sink: flush what is queued, then write the string to the handle;
--
-- * growing buffers: grow if there is not enough room, then copy;
--
-- * threaded sink: report the string as an 'InsertByteString' response and
--   wait for the next buffer.
byteStringInsert_ :: S.ByteString -> Builder
byteStringInsert_ bstr = mkBuilder $ do
  sink <- getSink
  case sink of
    HandleSink h _nextCap queueRef -> do
      cur <- getCur
      io (flushQueue h queueRef cur)
      io (S.hPut h bstr)
    GrowingBuffer bufRef -> do
      free <- remainingBytes
      when (free < S.length bstr) $
        growBuffer bufRef (S.length bstr)
      useBuilder (byteStringCopyNoCheck bstr)
    DynamicSink dRef -> do
      dyn <- io (readIORef dRef)
      case dyn of
        BoundedGrowingBuffer fptr bound -> do
          free <- remainingBytes
          when (free < S.length bstr) $
            growBufferBounded dRef fptr bound (S.length bstr)
          useBuilder (byteStringCopyNoCheck bstr)
        ThreadedSink reqV respV -> do
          cur <- getCur
          io (putMVar respV (InsertByteString cur bstr))
          handleRequest reqV
{-# NOINLINE byteStringInsert_ #-}
-- | Emit the bytes of a NUL-terminated C string.  The length is obtained
-- with @strlen@ and the bytes are then emitted with 'unsafeCStringLen'.
unsafeCString :: CString -> Builder
unsafeCString cstr =
  rebuild
    (let !len = fromIntegral (c_pure_strlen cstr)
      in unsafeCStringLen (cstr, len))

-- | C @strlen@, imported as a pure function.
foreign import ccall unsafe "strlen"
  c_pure_strlen :: CString -> CSize
-- | Emit @len@ bytes starting at the given pointer: reserve the space,
-- copy the bytes to the current position and advance past them.
unsafeCStringLen :: CStringLen -> Builder
unsafeCStringLen (ptr, len) = mappend (ensureBytes len) copy
  where
    copy = mkBuilder $ do
      dst <- getCur
      io (copyBytes dst (castPtr ptr) len)
      setCur (dst `plusPtr` len)
-- | Make sure at least @n@ bytes are free in the current buffer, calling
-- 'getBytes' only when they are not.
ensureBytes :: Int -> Builder
ensureBytes !n = mkBuilder $ do
  available <- remainingBytes
  unless (available >= n) (useBuilder (getBytes n))
{-# INLINE ensureBytes #-}
-- | Obtain room for at least the given number of bytes.  Unboxes the size
-- and delegates to 'getBytes_'.
getBytes :: Int -> Builder
getBytes (I# size#) = getBytes_ size#
-- | Obtain room for at least the given number of bytes, in the way that
-- suits the current sink:
--
-- * handle sink: flush what is queued, then switch to a queue of at least
--   @max nextCap n@ bytes;
--
-- * growing buffers: grow the buffer;
--
-- * threaded sink: send a 'MoreBuffer' response and wait for the next
--   buffer.
getBytes_ :: Int# -> Builder
getBytes_ size# = mkBuilder $ do
  sink <- getSink
  case sink of
    HandleSink h nextCap queueRef -> do
      cur <- getCur
      io (flushQueue h queueRef cur)
      switchQueue queueRef (max nextCap (I# size#))
    GrowingBuffer bufRef -> growBuffer bufRef (I# size#)
    DynamicSink dRef -> do
      dyn <- io (readIORef dRef)
      case dyn of
        BoundedGrowingBuffer fptr bound ->
          growBufferBounded dRef fptr bound (I# size#)
        ThreadedSink reqV respV -> do
          cur <- getCur
          io (putMVar respV (MoreBuffer cur (I# size#)))
          handleRequest reqV
{-# NOINLINE getBytes_ #-}
-- | Number of bytes between the current position and the end of the
-- buffer.
remainingBytes :: BuildM Int
remainingBytes = do
  end <- getEnd
  cur <- getCur
  return (end `minusPtr` cur)
{-# INLINE remainingBytes #-}
-- | Re-wrap a builder as two explicit lambdas, each marked with 'oneShot'.
-- The function computed is the same as the one wrapped.
rebuild :: Builder -> Builder
rebuild (Builder run) =
  Builder (oneShot (\sink ->
    oneShot (\(# curAddr, endAddr, world #) ->
      run sink (# curAddr, endAddr, world #))))
-- | Block until a 'Request' arrives, then adopt the buffer it describes as
-- the current one.
handleRequest :: MVar Request -> BuildM ()
handleRequest reqV = do
  Request bufStart bufEnd <- io (takeMVar reqV)
  setCur bufStart
  setEnd bufEnd
-- | Replace the buffer stored in @bufRef@ with a larger one.
--
-- The new capacity is the old capacity plus the larger of the old capacity
-- and @req@, so the buffer at least doubles and always has @req@ free
-- bytes after the data written so far.  That data is copied over, and the
-- current and end pointers are moved to the new buffer.
growBuffer :: IORef (ForeignPtr Word8) -> Int -> BuildM ()
growBuffer !bufRef !req = do
  cur <- getCur
  -- The capacity has to be measured up to the end of the buffer, not up to
  -- the write position.
  end <- getEnd
  fptr <- io $ readIORef bufRef
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  newFptr <- io $ mallocForeignPtrBytes newCap
  let !newBase = unsafeForeignPtrToPtr newFptr
  setCur $ newBase `plusPtr` size
  setEnd $ newBase `plusPtr` newCap
  io $ do
    copyBytes newBase base size
    -- Both buffers must stay alive until the copy through raw pointers is
    -- done.
    touchForeignPtr fptr
    touchForeignPtr newFptr
    writeIORef bufRef newFptr
{-# INLINE growBuffer #-}
-- | Write the part of the queue buffer between the stored offset and the
-- given position to the handle, and record the new offset.  Nothing is
-- done when there is no such data.
flushQueue :: IO.Handle -> IORef Queue -> Ptr Word8 -> IO ()
flushQueue !h !qRef !cur = do
  Queue fptr flushed <- readIORef qRef
  let !filled = cur `minusPtr` unsafeForeignPtrToPtr fptr
  when (flushed < filled) $ do
    S.hPut h (S.fromForeignPtr fptr flushed (filled - flushed))
    writeIORef qRef (Queue fptr filled)
-- | Start a fresh queue with room for at least @minSize@ bytes.
--
-- The existing buffer is reused from its beginning when its capacity is
-- already large enough; otherwise a new buffer of @minSize@ bytes is
-- allocated.  The stored offset is reset to zero.  This function does not
-- flush, so the caller has to flush the old contents first.
switchQueue :: IORef Queue -> Int -> BuildM ()
switchQueue !qRef !minSize = do
  -- The capacity is the distance to the end of the buffer.  Measuring to
  -- the write position instead would make reuse almost never happen.
  end <- getEnd
  Queue fptr _ <- io $ readIORef qRef
  let !base = unsafeForeignPtrToPtr fptr
  let !cap = end `minusPtr` base
  newFptr <- if minSize <= cap
    then return fptr
    else io $ mallocForeignPtrBytes minSize
  let !newBase = unsafeForeignPtrToPtr newFptr
  io $ writeIORef qRef $ Queue newFptr 0
  setCur newBase
  -- Equals the size of whichever buffer was chosen above.
  setEnd $ newBase `plusPtr` max minSize cap
-- | Like 'growBuffer', but for a 'BoundedGrowingBuffer' sink.
--
-- The new capacity is the old capacity plus the larger of the old capacity
-- and @req@.  If that exceeds @bound@, the buffer is not grown: the data
-- written so far is handed to 'chunkOverflow' as a finished chunk.
-- Otherwise the data is copied into a new buffer, which is stored in
-- @dRef@ with the same bound.
growBufferBounded
  :: IORef DynamicSink -> ForeignPtr Word8 -> Int -> Int -> BuildM ()
growBufferBounded !dRef !fptr !bound !req = do
  cur <- getCur
  -- The capacity has to be measured up to the end of the buffer, not up to
  -- the write position.
  end <- getEnd
  let !base = unsafeForeignPtrToPtr fptr
  let !size = cur `minusPtr` base
  let !cap = end `minusPtr` base
  let !newCap = cap + max cap req
  if bound < newCap
    then chunkOverflow dRef req $ S.fromForeignPtr fptr 0 size
    else do
      newFptr <- io $ mallocForeignPtrBytes newCap
      let !newBase = unsafeForeignPtrToPtr newFptr
      setCur $ newBase `plusPtr` size
      setEnd $ newBase `plusPtr` newCap
      io $ do
        copyBytes newBase base size
        -- Both buffers must stay alive until the copy through raw pointers
        -- is done.
        touchForeignPtr fptr
        touchForeignPtr newFptr
        writeIORef dRef $ BoundedGrowingBuffer newFptr bound
{-# INLINE growBufferBounded #-}
-- | Leave the bounded-buffer strategy.  A 'ChunkOverflowException'
-- carrying the finished chunk, two fresh communication variables and the
-- minimum size of the next buffer is thrown to the current thread.  If
-- execution continues after the throw, the sink is switched to a
-- 'ThreadedSink' on those variables and the first buffer request is
-- awaited.
chunkOverflow :: IORef DynamicSink -> Int -> S.ByteString -> BuildM ()
chunkOverflow !dRef !minSize !chunk = do
  self <- io myThreadId
  reqV <- io newEmptyMVar
  respV <- io newEmptyMVar
  io (E.throwTo self (ChunkOverflowException chunk reqV respV minSize))
  io (writeIORef dRef (ThreadedSink reqV respV))
  handleRequest reqV