-- Copyright (c) 2016-present, Facebook, Inc. -- All rights reserved. -- -- This source code is licensed under the BSD-style license found in -- the LICENSE file in the root directory of this source tree. An -- additional grant of patent rights can be found in the PATENTS file -- in the same directory. {-# LANGUAGE MultiWayIf #-} -- | -- Module : Codec.Compression.Zstd.Streaming -- Copyright : (c) 2016-present, Facebook, Inc. All rights reserved. -- -- License : BSD3 -- Maintainer : bryano@fb.com -- Stability : experimental -- Portability : GHC -- -- Streaming compression and decompression support for zstd. module Codec.Compression.Zstd.Streaming ( Result(..) , compress , decompress , maxCLevel ) where import Codec.Compression.Zstd.FFI hiding (compress, decompress) import Codec.Compression.Zstd.FFI.Types (peekPos) import qualified Data.ByteString as B import Data.ByteString.Internal (ByteString(..), mallocByteString) import Foreign.Marshal.Alloc (finalizerFree, malloc) import Foreign.C.Types (CSize) import Foreign.ForeignPtr import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) import Foreign.Storable (poke) import Foreign.Ptr (Ptr, plusPtr) import Data.Word (Word8) -- | The result of a streaming compression or decompression step. data Result = Produce ByteString (IO Result) -- ^ A single frame of transformed data, and an action that when -- executed will yield the next step in the streaming operation. -- The action is ephemeral; you should discard it as soon as you -- use it. | Consume (ByteString -> IO Result) -- ^ Provide the function with more input for the streaming -- operation to continue. This function is ephemeral. You should -- call it exactly once, and discard it immediately after you call -- it. -- -- To signal the end of a stream of data, supply an 'B.empty' -- input. | Error String String -- ^ An error has occurred. If an error occurs, the streaming -- operation cannot continue. | Done ByteString -- ^ The streaming operation has ended. This payload may be -- empty. If it is not, it must be written out. -- -- A non-empty payload consists of a frame epilogue, possibly -- preceded by any data left over from the final streaming step. instance Show Result where show (Produce bs _) = "Produce " ++ show bs ++ " _" show (Consume _) = "Consume _" show (Error n d) = "Error " ++ show n ++ " " ++ show d show (Done bs) = "Done " ++ show bs -- | Begin a streaming compression operation. -- -- The initial result will be either an 'Error' or a 'Consume'. compress :: Int -- ^ Compression level. Must be >= 1 and <= 'maxCLevel'. -> IO Result compress level | level < 1 || level > maxCLevel = return (Error "compress" "unsupported compression level") | otherwise = streaming createCStream p_freeCStream outSize (\cs -> initCStream cs (fromIntegral level)) compressStream finish where outSize = fromIntegral cstreamOutSize finish cfp obfp opos dfp = do let cptr = unsafeForeignPtrToPtr cfp obuf = unsafeForeignPtrToPtr obfp check "endStream" (endStream cptr obuf) $ \leftover -> do touchForeignPtr cfp touchForeignPtr obfp if | leftover <= 0 -> do -- leftover will never be <0, but compiler does not know that opos1 <- fromIntegral `fmap` peekPos obuf Done `fmap` shrink outSize dfp opos1 | leftover > 0 -> do opos1 <- fromIntegral `fmap` peekPos obuf dfp1 <- mallocByteString (fromIntegral leftover) poke obuf (buffer (unsafeForeignPtrToPtr dfp1) leftover) touchForeignPtr obfp bs <- shrink outSize dfp opos1 return (Produce bs (finish cfp obfp 0 dfp1)) type ConsumeBlock ctx io = Ptr ctx -> Ptr (Buffer Out) -> Ptr (Buffer In) -> IO CSize type Finish ctx io = ForeignPtr ctx -> ForeignPtr (Buffer Out) -> Int -> ForeignPtr Word8 -> IO Result streaming :: IO (Ptr ctx) -> FinalizerPtr ctx -> Int -> (Ptr ctx -> IO CSize) -> ConsumeBlock ctx io -> Finish ctx io -> IO Result streaming createStream freeStream outSize initStream consumeBlock finish = do cx <- checkAlloc "createStream" createStream cxfp <- newForeignPtr freeStream cx check "initStream" (initStream cx) $ \_ -> do ibfp <- newForeignPtr finalizerFree =<< malloc obfp <- newForeignPtr finalizerFree =<< malloc dfp <- newOutput obfp advanceInput cxfp ibfp obfp 0 dfp where advanceInput cxfp ibfp obfp opos dfp = do let prompt (PS fp off len) | len == 0 = finish cxfp obfp opos dfp | otherwise = do withForeignPtr fp $ \sp0 -> withForeignPtr ibfp $ \ibuf -> poke ibuf (buffer (sp0 `plusPtr` off) (fromIntegral len)) consume cxfp ibfp 0 len obfp 0 dfp fp return (Consume prompt) newOutput obfp = do dfp <- mallocByteString outSize withForeignPtr dfp $ \dp -> withForeignPtr obfp $ \obuf -> poke obuf (buffer dp (fromIntegral outSize)) return dfp consume cxfp ibfp ipos ilen obfp opos dfp fp = do if | fromIntegral ipos == ilen -> advanceInput cxfp ibfp obfp opos dfp | opos == outSize -> do let go = do ndfp <- newOutput obfp consume cxfp ibfp ipos ilen obfp 0 ndfp fp return (Produce (PS dfp 0 opos) go) | otherwise -> do let obuf = unsafeForeignPtrToPtr obfp ibuf = unsafeForeignPtrToPtr ibfp check "consumeBlock" (withForeignPtr cxfp $ \cptr -> consumeBlock cptr obuf ibuf <* touchForeignPtr fp) $ \_ -> do opos1 <- fromIntegral `fmap` peekPos obuf ipos1 <- peekPos ibuf touchForeignPtr obfp touchForeignPtr ibfp consume cxfp ibfp ipos1 ilen obfp opos1 dfp fp -- | Begin a streaming decompression operation. -- -- The initial result will be either an 'Error' or a 'Consume'. decompress :: IO Result decompress = streaming createDStream p_freeDStream outSize initDStream decompressStream finish where outSize = fromIntegral dstreamOutSize finish _cxfp _obfp opos dfp = Done `fmap` shrink outSize dfp opos shrink :: Int -> ForeignPtr Word8 -> Int -> IO B.ByteString shrink capacity dfp opos | opos == 0 = return B.empty | let unused = capacity - opos in unused >= 1024 || unused > capacity `rem` 8 = return (B.copy (PS dfp 0 opos)) | otherwise = return (PS dfp 0 opos) buffer :: Ptr a -> CSize -> Buffer io buffer ptr size = Buffer ptr size 0 check :: String -> IO CSize -> (CSize -> IO Result) -> IO Result check name act onSuccess = do ret <- act if isError ret then return (Error name (getErrorName ret)) else onSuccess ret