{-# LANGUAGE Rank2Types, TypeFamilies #-} -- This library emulates Data.ByteStream.Lazy but includes a monadic element -- and thus at certain points uses a `Stream`/`FreeT` type in place of lists. -- | -- Module : ByteStream -- Copyright : (c) Don Stewart 2006 -- (c) Duncan Coutts 2006-2011 -- (c) Michael Thompson 2015 -- (c) Udo Stenzel 2018 -- License : BSD-style -- -- Maintainer : u.stenzel@web.de -- Stability : experimental -- Portability : portable -- -- See the simple examples of use . -- We begin with a slight modification of the documentation to "Data.ByteStream.Lazy": -- -- A time and space-efficient implementation of effectful byte streams -- using a stream of packed 'Word8' arrays, suitable for high performance -- use, both in terms of large data quantities, or high speed -- requirements. ByteStreams are encoded as streams of strict chunks -- of bytes. -- -- A key feature of ByteStreams is the means to manipulate large or -- unbounded streams of data without requiring the entire sequence to be -- resident in memory. To take advantage of this you have to write your -- functions in a streaming style, e.g. classic pipeline composition. The -- default I\/O chunk size is 32k, which should be good in most circumstances. -- -- Some operations, such as 'concat', 'append', 'reverse' and 'cons', have -- better complexity than their "Data.ByteStream" equivalents, due to -- optimisations resulting from the list spine structure. For other -- operations streaming, like lazy, ByteStreams are usually within a few percent of -- strict ones. -- -- This module is intended to be imported @qualified@, to avoid name -- clashes with "Prelude" functions. eg. -- -- > import qualified Bio.Streaming.Bytes as B -- -- Original GHC implementation by Bryan O\'Sullivan. -- Rewritten to use 'Data.Array.Unboxed.UArray' by Simon Marlow. -- Rewritten to support slices and use 'Foreign.ForeignPtr.ForeignPtr' -- by David Roundy. -- Rewritten again and extended by Don Stewart and Duncan Coutts. -- Lazy variant by Duncan Coutts and Don Stewart. -- Streaming variant by Michael Thompson, following the ideas of Gabriel Gonzales' pipes-bytestring -- Adapted for use in biohazard by Udo Stenzel. -- module Bio.Streaming.Bytes ( -- * The @ByteStream@ type ByteStream(..) -- * Introducing and eliminating 'ByteStream's , empty -- empty :: ByteStream m () , singleton -- singleton :: Monad m => Word8 -> ByteStream m () , fromLazy -- fromLazy :: Monad m => ByteStream -> ByteStream m () , fromChunks -- fromChunks :: Monad m => Stream (Of Bytes) m r -> ByteStream m r , toLazy -- toLazy :: Monad m => ByteStream m () -> m ByteStream , toStrict -- toStrict :: Monad m => ByteStream m () -> m ByteStream , effects , mwrap -- * Basic interface , cons -- cons :: Monad m => Word8 -> ByteStream m r -> ByteStream m r , nextByte -- nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r)) , nextByteOff -- nextByteOff :: Monad m => ByteStream m r -> m (Either r (Word8, Int64, ByteStream m r)) -- * Substrings -- ** Breaking strings , break -- break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r) , drop -- drop :: Monad m => GHC.Int.Int64 -> ByteStream m r -> ByteStream m r , dropWhile , splitAt -- splitAt :: Monad m => GHC.Int.Int64 -> ByteStream m r -> ByteStream m (ByteStream m r) , splitAt' -- splitAt' :: Monad m => Int -> ByteStream m r -> m (Of Bytes (ByteStream m r)) , trim -- ** Breaking into many substrings , lines , lines' -- ** Special folds , concat -- concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r -- * Builders , toByteStream , toByteStreamWith , concatBuilders -- * I\/O with 'ByteStream's -- ** Files , withOutputFile , writeFile -- writeFile :: FilePath -> ByteStream IO r -> IO r -- ** I\/O with Handles , hGetContents -- hGetContents :: Handle -> ByteStream IO () , hGetContentsN -- hGetContentsN :: Int -> Handle -> ByteStream IO () , hPut -- hPut :: Handle -> ByteStream IO r -> IO r -- * Simple chunkwise operations , nextChunk , nextChunkOff , consChunk -- :: Bytes -> ByteStream m r -> ByteStream m r , consChunkOff -- :: Bytes -> Int64 -> ByteStream m r -> ByteStream m r , chunk , copy , mapChunksM_ -- * compression support , gzip , gunzip , gunzipWith ) where import Bio.Prelude hiding (break,concat,drop,dropWhile,lines,splitAt,writeFile,empty,loop) import Data.ByteString.Builder.Internal (Builder,builder,runBuilder,runBuilderWith,bufferSize ,AllocationStrategy,ChunkIOStream(..),buildStepToCIOS ,byteStringFromBuffer,safeStrategy,defaultChunkSize) import GHC.Exts (SpecConstrAnnotation(..)) import Streaming (Of(..),Identity(..),destroy) import Streaming.Internal (Stream (..)) import System.Directory (renameFile) import qualified Codec.Compression.Zlib.Internal as Z import qualified Data.ByteString as B import qualified Data.ByteString.Internal as B import qualified Data.ByteString.Lazy.Internal as L (foldrChunks,ByteString(..),smallChunkSize,defaultChunkSize) import qualified Data.ByteString.Unsafe as B import qualified Streaming.Prelude as Q -- | A space-efficient representation of a succession of 'Word8' vectors, supporting many -- efficient operations. -- -- An effectful 'ByteStream' contains 8-bit bytes, or by using certain -- operations can be interpreted as containing 8-bit characters. It -- also contains an offset, which will be needed to track the virtual -- offsets in the BGZF decode. data ByteStream m r = Empty r | Chunk {-# UNPACK #-} !Bytes {-# UNPACK #-} !Int64 (ByteStream m r) | Go (m (ByteStream m r)) instance Monad m => Functor (ByteStream m) where fmap f x = case x of Empty a -> Empty (f a) Chunk bs o bss -> Chunk bs o (fmap f bss) Go mbss -> Go (liftM (fmap f) mbss) instance Monad m => Applicative (ByteStream m) where pure = Empty {-# INLINE pure #-} bf <*> bx = do {f <- bf; x <- bx; Empty (f x)} {-# INLINE (<*>) #-} (*>) = (>>) {-# INLINE (*>) #-} instance Monad m => Monad (ByteStream m) where return = Empty {-# INLINE return #-} x0 >> y = loop SPEC x0 where loop !_ x = case x of -- this seems to be insanely effective Empty _ -> y Chunk a o b -> Chunk a o (loop SPEC b) Go m -> Go (liftM (loop SPEC) m) {-# INLINEABLE (>>) #-} x >>= f = loop SPEC2 x where -- unlike >> this SPEC seems pointless loop !_ y = case y of Empty a -> f a Chunk bs o bss -> Chunk bs o (loop SPEC bss) Go mbss -> Go (liftM (loop SPEC) mbss) {-# INLINEABLE (>>=) #-} instance MonadIO m => MonadIO (ByteStream m) where liftIO io = Go (liftM Empty (liftIO io)) {-# INLINE liftIO #-} instance MonadTrans ByteStream where lift ma = Go $ liftM Empty ma {-# INLINE lift #-} instance (r ~ ()) => IsString (ByteStream m r) where fromString = chunk . fromString {-# INLINE fromString #-} instance (m ~ Identity, Show r) => Show (ByteStream m r) where show bs0 = case bs0 of -- the implementation this instance deserves ... Empty r -> "Empty (" ++ show r ++ ")" Go (Identity bs') -> "Go (Identity (" ++ show bs' ++ "))" Chunk bs'' o bs -> "Chunk " ++ show bs'' ++ " " ++ show o ++ " (" ++ show bs ++ ")" instance (Semigroup r, Monad m) => Semigroup (ByteStream m r) where (<>) = liftM2 (<>) {-# INLINE (<>) #-} instance (Semigroup r, Monoid r, Monad m) => Monoid (ByteStream m r) where mempty = Empty mempty {-# INLINE mempty #-} mappend = (<>) {-# INLINE mappend #-} data SPEC = SPEC | SPEC2 {-# ANN type SPEC ForceSpecConstr #-} -- -------------------------------------------------------------------------- -- | Smart constructor for 'Chunk'. consChunk :: Bytes -> ByteStream m r -> ByteStream m r consChunk c = consChunkOff c 0 {-# INLINE consChunk #-} consChunkOff :: Bytes -> Int64 -> ByteStream m r -> ByteStream m r consChunkOff c@(B.PS _ _ len) off cs | len == 0 = cs | otherwise = Chunk c off cs {-# INLINE consChunkOff #-} -- | Yield-style smart constructor for 'Chunk'. chunk :: Bytes -> ByteStream m () chunk bs = consChunk bs empty {-# INLINE chunk #-} {- | Reconceive an effect that results in an effectful bytestring as an effectful bytestring. Compare Streaming.mwrap. The closes equivalent of >>> Streaming.wrap :: f (Stream f m r) -> Stream f m r is here @consChunk@. @mwrap@ is the smart constructor for the internal @Go@ constructor. -} mwrap :: m (ByteStream m r) -> ByteStream m r mwrap = Go {-# INLINE mwrap #-} -- | Construct a succession of chunks from its Church encoding (compare @GHC.Exts.build@) materialize :: (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x) -> ByteStream m r materialize phi = phi Empty Chunk Go {-# INLINE[0] materialize #-} -- | Resolve a succession of chunks into its Church encoding; this is -- not a safe operation; it is equivalent to exposing the constructors dematerialize :: Monad m => ByteStream m r -> (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x) dematerialize x0 nil con fin = loop SPEC x0 where loop !_ x = case x of Empty r -> nil r Chunk b o bs -> con b o (loop SPEC bs ) Go ms -> fin (liftM (loop SPEC) ms) {-# INLINE [1] dematerialize #-} {-# RULES "dematerialize/materialize" forall (phi :: forall b . (r -> b) -> (Bytes -> Int64 -> b -> b) -> (m b -> b) -> b) . dematerialize (materialize phi) = phi ; #-} ------------------------------------------------------------------------ copy :: Monad m => ByteStream m r -> ByteStream (ByteStream m) r copy = loop where loop str = case str of Empty r -> Empty r Go m -> Go (liftM loop (lift m)) Chunk bs o rest -> Chunk bs o (Go (Chunk bs o (Empty (loop rest)))) {-# INLINABLE copy #-} -- | /O(n)/ Concatenate a stream of byte streams. concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r concat x = destroy x join Go Empty {-# INLINE concat #-} -- | Perform the effects contained in an effectful bytestring, ignoring the bytes. effects :: Monad m => ByteStream m r -> m r effects bs = case bs of Empty r -> return r Go m -> m >>= effects Chunk _ _ rest -> effects rest {-# INLINABLE effects #-} -- ----------------------------------------------------------------------------- -- Introducing and eliminating 'ByteStream's {-| /O(1)/ The empty 'ByteStream' -- i.e. @return ()@ Note that @ByteStream m w@ is generally a monoid for monoidal values of @w@, like @()@ -} empty :: ByteStream m () empty = Empty () {-# INLINE empty #-} {-| /O(1)/ Yield a 'Word8' as a minimal 'ByteStream' -} singleton :: Word8 -> ByteStream m () singleton w = Chunk (B.singleton w) 0 (Empty ()) {-# INLINE singleton #-} -- | /O(c)/ Converts a byte stream into a stream of individual strict bytestrings. -- This of course exposes the internal chunk structure. toChunks :: Monad m => ByteStream m r -> Stream (Of Bytes) m r toChunks bs = dematerialize bs return (\b _ mx -> Step (b :> mx)) Effect {-# INLINE toChunks #-} mapChunksM_ :: Monad m => (Bytes -> m ()) -> ByteStream m r -> m r mapChunksM_ f bs = dematerialize bs return (\c _ k -> f c >> k) join {-# INLINE mapChunksM_ #-} -- | /O(c)/ Converts a stream of strict bytestrings into a byte stream. fromChunks :: Monad m => Stream (Of Bytes) m r -> ByteStream m r fromChunks bs = destroy bs (\(b :> mx) !i -> Chunk b i (mx (i + fromIntegral (B.length b)))) (\k !i -> Go (k >>= \f -> return (f i))) (\r !_ -> return r) 0 {-# INLINE fromChunks #-} {-| /O(n)/ Convert a monadic byte stream into a single strict 'ByteStream', retaining the return value of the original pair. This operation is for use with 'mapped'. > mapped R.toStrict :: Monad m => Stream (ByteStream m) m r -> Stream (Of ByteStream) m r It is subject to all the objections one makes to Data.ByteStream.Lazy 'toStrict'; all of these are devastating. -} toStrict :: Monad m => ByteStream m r -> m (Of Bytes r) toStrict bs = do (bss :> r) <- Q.toList (toChunks bs) return $ (B.concat bss :> r) {-# INLINE toStrict #-} {-| /O(c)/ Transmute a pseudo-pure lazy bytestring to its representation as a monadic stream of chunks. >>> Q.putStrLn $ Q.fromLazy "hi" hi >>> Q.fromLazy "hi" Chunk "hi" (Empty (())) -- note: a 'show' instance works in the identity monad >>> Q.fromLazy $ BL.fromChunks ["here", "are", "some", "chunks"] Chunk "here" (Chunk "are" (Chunk "some" (Chunk "chunks" (Empty (()))))) -} fromLazy :: LazyBytes -> ByteStream m () fromLazy = L.foldrChunks consChunk empty {-# INLINE fromLazy #-} {-| /O(n)/ Convert an effectful byte stream into a single lazy 'ByteStream' with the same internal chunk structure, retaining the original return value. This is the canonical way of breaking streaming (@toStrict@ and the like are far more demonic). Essentially one is dividing the interleaved layers of effects and bytes into one immense layer of effects, followed by the memory of the succession of bytes. Because one preserves the return value, @toLazy@ is a suitable argument for 'Streaming.mapped' > B.mapped Q.toLazy :: Stream (ByteStream m) m r -> Stream (Of LazyBytes) m r >>> Q.toLazy "hello" "hello" :> () >>> B.toListM $ traverses Q.toLazy $ Q.lines "one\ntwo\nthree\nfour\nfive\n" ["one","two","three","four","five",""] -- [LazyBytes] -} toLazy :: Monad m => ByteStream m r -> m (Of LazyBytes r) toLazy bs0 = dematerialize bs0 (\r -> return (L.Empty :> r)) (\b _ mx -> do (bs :> x) <- mx return $ L.Chunk b bs :> x ) join {-# INLINE toLazy #-} -- | /O(1)/ 'cons' is analogous to '(:)' for lists. cons :: Word8 -> ByteStream m r -> ByteStream m r cons c cs = Chunk (B.singleton c) 0 cs {-# INLINE cons #-} -- | /O(1)/ Extract the head and tail of a 'ByteStream', or its return value -- if it is empty. This is the \'natural\' uncons for an effectful byte stream. nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r)) nextByte = liftM (either Left (\(a,_,b) -> Right (a,b))) . nextByteOff {-# INLINE nextByte #-} nextByteOff :: Monad m => ByteStream m r -> m (Either r (Word8, Int64, ByteStream m r)) nextByteOff (Empty r) = return (Left r) nextByteOff (Chunk c o cs) = if B.null c then nextByteOff cs else return $ Right (B.unsafeHead c, o , if B.length c == 1 then cs else Chunk (B.unsafeTail c) (o+1) cs) nextByteOff (Go m) = m >>= nextByteOff {-# INLINABLE nextByteOff #-} nextChunk :: Monad m => ByteStream m r -> m (Either r (Bytes, ByteStream m r)) nextChunk = liftM (either Left (\(a,_,b) -> Right (a,b))) . nextChunkOff {-# INLINE nextChunk #-} nextChunkOff :: Monad m => ByteStream m r -> m (Either r (Bytes, Int64, ByteStream m r)) nextChunkOff (Empty r) = return (Left r) nextChunkOff (Go m) = m >>= nextChunkOff nextChunkOff (Chunk c o cs) | B.null c = nextChunkOff cs | otherwise = return (Right (c,o,cs)) {-# INLINABLE nextChunkOff #-} {-| /O(n\/c)/ 'drop' @n xs@ returns the suffix of @xs@ after the first @n@ elements, or @[]@ if @n > 'length' xs@. >>> Q.putStrLn $ Q.drop 6 "Wisconsin" sin >>> Q.putStrLn $ Q.drop 16 "Wisconsin" >>> -} drop :: Monad m => Int64 -> ByteStream m r -> ByteStream m r drop i p | i <= 0 = p drop i cs0 = drop' i cs0 where drop' 0 cs = cs drop' _ (Empty r) = Empty r drop' n (Chunk c o cs) = if n < fromIntegral (B.length c) then Chunk (B.drop (fromIntegral n) c) (o+n) cs else drop' (n - fromIntegral (B.length c)) cs drop' n (Go m) = Go (liftM (drop' n) m) {-# INLINABLE drop #-} {-| /O(n\/c)/ 'splitAt' @n xs@ is equivalent to @('take' n xs, 'drop' n xs)@. >>> rest <- Q.putStrLn $ Q.splitAt 3 "therapist is a danger to good hyphenation, as Knuth notes" the >>> Q.putStrLn $ Q.splitAt 19 rest rapist is a danger -} splitAt :: Monad m => Int64 -> ByteStream m r -> ByteStream m (ByteStream m r) splitAt i cs0 | i <= 0 = Empty cs0 splitAt i cs0 = go i cs0 where go 0 cs = Empty cs go _ (Empty r) = Empty (Empty r) go n (Chunk c o cs) = if n < fromIntegral (B.length c) then Chunk (B.take (fromIntegral n) c) o $ Empty (Chunk (B.drop (fromIntegral n) c) (o+n) cs) else Chunk c o (go (n - fromIntegral (B.length c)) cs) go n (Go m) = Go (liftM (go n) m) {-# INLINABLE splitAt #-} -- | Strictly splits off a piece. This breaks streaming, so reserve its -- use for small strings or when conversion to strict 'Bytes' is needed -- anyway. splitAt' :: Monad m => Int -> ByteStream m r -> m (Of Bytes (ByteStream m r)) splitAt' i cs0 | i <= 0 = return $! B.empty :> cs0 splitAt' i cs0 = go i [] cs0 where go 0 acc cs = return $! B.concat (reverse acc) :> cs go _ acc (Empty r) = return $! B.concat (reverse acc) :> Empty r go n acc (Chunk c o cs) = if n < B.length c then return $! B.concat (reverse (B.take n c : acc)) :> Chunk (B.drop n c) (o + fromIntegral n) cs else go (n - B.length c) (c:acc) cs go n acc (Go m) = m >>= go n acc {-# INLINABLE splitAt' #-} trim :: Monad m => Int64 -> ByteStream m () -> ByteStream m () trim eoff = go where go (Empty _) = Empty () go (Go m) = lift m >>= go go (Chunk c o s) | o < eoff = Chunk c o (go s) | otherwise = Empty () {-# INLINABLE trim #-} -- | 'dropWhile' @p xs@ returns the suffix remaining after 'takeWhile' @p xs@. dropWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r dropWhile pr = drop' where drop' bs = case bs of Empty r -> Empty r Go m -> Go (liftM drop' m) Chunk c o cs -> case findIndexOrEnd (not.pr) c of 0 -> Chunk c o cs n | n < B.length c -> Chunk (B.drop n c) (o + fromIntegral n) cs | otherwise -> drop' cs {-# INLINABLE dropWhile #-} -- | 'break' @p@ is equivalent to @'span' ('not' . p)@. break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r) break f cs0 = break' cs0 where break' (Empty r) = Empty (Empty r) break' (Chunk c o cs) = case findIndexOrEnd f c of 0 -> Empty (Chunk c o cs) n | n < B.length c -> Chunk (B.take n c) o $ Empty (Chunk (B.drop n c) (o + fromIntegral n) cs) | otherwise -> Chunk c o (break' cs) break' (Go m) = Go (liftM break' m) {-# INLINABLE break #-} {- | Read entire handle contents /lazily/ into a 'ByteStream'. Chunks are read on demand, in at most @k@-sized chunks. It does not block waiting for a whole @k@-sized chunk, so if less than @k@ bytes are available then they will be returned immediately as a smaller chunk. The handle is closed on EOF. Note: the 'Handle' should be placed in binary mode with 'System.IO.hSetBinaryMode' for 'hGetContentsN' to work correctly. -} hGetContentsN :: MonadIO m => Int -> Handle -> ByteStream m () hGetContentsN k h = loop 0 where loop !o = do c <- liftIO (B.hGetSome h k) -- only blocks if there is no data available if B.null c then Empty () else Chunk c o (loop (o + fromIntegral (B.length c))) {-# INLINABLE hGetContentsN #-} -- very effective inline pragma {-| Read entire handle contents /lazily/ into a 'ByteStream'. Chunks are read on demand, using the default chunk size. Note: the 'Handle' should be placed in binary mode with 'System.IO.hSetBinaryMode' for 'hGetContents' to work correctly. -} hGetContents :: MonadIO m => Handle -> ByteStream m () hGetContents = hGetContentsN defaultChunkSize {-# INLINE hGetContents #-} withOutputFile :: (MonadIO m, MonadMask m) => FilePath -> (Handle -> m a) -> m a withOutputFile "-" k = k stdout withOutputFile f k = bracket (liftIO $ openBinaryFile (f++".#~#") WriteMode) (liftIO . hClose) $ \hdl -> k hdl >>= \r -> liftIO (renameFile (f++".#~#") f) >> return r {-# INLINE withOutputFile #-} {-| Writes a 'ByteStream' to a file. Actually writes to a temporary file and renames it on successful completion. The filename \"-\" causes it to write to stdout instead. -} writeFile :: (MonadIO m, MonadMask m) => FilePath -> ByteStream m r -> m r writeFile f str = withOutputFile f $ \hdl -> hPut hdl str {-# INLINE writeFile #-} -- | Outputs a 'ByteStream' to the specified 'Handle'. hPut :: MonadIO m => Handle -> ByteStream m r -> m r hPut h cs = dematerialize cs return (\x _ y -> liftIO (B.hPut h x) >> y) (>>= id) {-# INLINE hPut #-} -- -- --------------------------------------------------------------------- -- -- Internal utilities -- | 'findIndexOrEnd' is a variant of findIndex, that returns the length -- of the string if no element is found, rather than Nothing. findIndexOrEnd :: (Word8 -> Bool) -> Bytes -> Int findIndexOrEnd k (B.PS x s l) = unsafeDupablePerformIO $ withForeignPtr x $ \f -> go (f `plusPtr` s) 0 where go !ptr !n | n >= l = return l | otherwise = do w <- peek ptr if k w then return n else go (ptr `plusPtr` 1) (n+1) {-# INLINABLE findIndexOrEnd #-} {- Take a builder constructed otherwise and convert it to a genuine streaming bytestring. >>> Q.putStrLn $ Q.toByteStream $ stringUtf8 "哈斯克尔" <> stringUtf8 " " <> integerDec 98 哈斯克尔 98 shows its indistinguishable performance is indistinguishable from @toLazyByteStream@ -} toByteStream :: MonadIO m => Builder -> ByteStream m () toByteStream = toByteStreamWith (safeStrategy L.smallChunkSize L.defaultChunkSize) {-# INLINE toByteStream #-} {-| Take a builder and convert it to a genuine streaming bytestring, using a specific allocation strategy. -} toByteStreamWith :: MonadIO m => AllocationStrategy -> Builder -> ByteStream m () toByteStreamWith strategy builder0 = do cios <- liftIO (buildStepToCIOS strategy (runBuilder builder0)) let loop !o cios0 = case cios0 of Yield1 bs io -> Chunk bs o $ do cios1 <- liftIO io loop (o + fromIntegral (B.length bs)) cios1 Finished buf r -> trimmedChunkFromBuffer o buf (Empty r) trimmedChunkFromBuffer o buffer k | B.null bs = k | 2 * B.length bs < bufferSize buffer = Chunk (B.copy bs) o k | otherwise = Chunk bs o k where bs = byteStringFromBuffer buffer loop 0 cios {-# INLINABLE toByteStreamWith #-} {-# SPECIALIZE toByteStreamWith :: AllocationStrategy -> Builder -> ByteStream IO () #-} {- Concatenate a stream of builders (not a streaming bytestring!) into a single builder. >>> let aa = yield (integerDec 10000) >> yield (string8 " is a number.") >> yield (char8 '\n') >>> hPutBuilder IO.stdout $ concatBuilders aa 10000 is a number. -} concatBuilders :: Stream (Of Builder) IO () -> Builder concatBuilders p = builder $ \bstep r -> do case p of Return _ -> runBuilderWith mempty bstep r Step (b :> rest) -> runBuilderWith (b `mappend` concatBuilders rest) bstep r Effect m -> m >>= \p' -> runBuilderWith (concatBuilders p') bstep r {-# INLINABLE concatBuilders #-} {- | Turns a ByteStream into a connected stream of ByteStreams that divide at newline characters. The resulting strings do not contain newlines. This is the genuinely streaming 'lines' which only breaks chunks, and thus never increases the use of memory. Because 'ByteStream's are usually read in binary mode, with no line ending conversion, this function recognizes both @\\n@ and @\\r\\n@ endings (regardless of the current platform). -} lines :: Monad m => ByteStream m r -> Stream (ByteStream m) m r lines text0 = loop1 text0 where loop1 :: Monad m => ByteStream m r -> Stream (ByteStream m) m r loop1 text = case text of Empty r -> Return r Go m -> Effect $ liftM loop1 m Chunk c _ cs | B.null c -> loop1 cs | otherwise -> Step (loop2 Nothing text) loop2 :: Monad m => Maybe Int64 -> ByteStream m r -> ByteStream m (Stream (ByteStream m) m r) loop2 prevCr text = case text of Empty r -> case prevCr of Just o -> Chunk (B.singleton 13) o (Empty (Return r)) Nothing -> Empty (Return r) Go m -> Go $ liftM (loop2 prevCr) m Chunk c o cs -> case B.elemIndex 10 c of Nothing -> case B.length c of 0 -> loop2 prevCr cs l -> if B.unsafeLast c == 13 then Chunk (B.unsafeInit c) o (loop2 (Just (o-1 + fromIntegral l)) cs) else Chunk c o (loop2 Nothing cs) Just i -> do let prefixLength = if i >= 1 && B.unsafeIndex c (i-1) == 13 -- \r\n (dos) then i-1 else i rest = if B.length c > i+1 then Chunk (B.drop (i+1) c) (o+1 + fromIntegral i) cs else cs result = Chunk (B.unsafeTake prefixLength c) o (Empty (loop1 rest)) case prevCr of Just oo | i > 0 -> Chunk (B.singleton 13) oo result _ -> result {-# INLINABLE lines #-} {- | Turns a 'ByteStream' into a stream of strict 'Bytes' that divide at newline characters. The resulting strings do not contain newlines. This will cost memory if the lines are very long, and it does not recognize DOS line endings. -} lines' :: Monad m => ByteStream m r -> Stream (Of Bytes) m r lines' = loop1 [] where loop1 :: Monad m => [Bytes] -> ByteStream m r -> Stream (Of Bytes) m r loop1 acc text = case text of Empty r -> Return r Go m -> Effect $ liftM (loop1 acc) m Chunk c o cs | B.null c -> loop1 acc cs | otherwise -> case B.elemIndex 10 c of Just i -> Q.cons (if null acc then B.take i c else B.concat (reverse (B.take i c : acc))) (loop1 [] (Chunk (B.drop (i+1) c) (o+1 + fromIntegral i) cs)) Nothing -> loop1 (c:acc) cs {-# INLINABLE lines' #-} -- -------------------------------------------------------------------------- {-| Decompresses GZip if present. If any GZip stream is found, all such streams are decompressed and any remaining data is discarded. Else, the input is returned unchanged. If the input is BGZF, the result will contain meaningful virtual offsets. If the input contains exactly one GZip stream, the result will have meaningfull offsets into the uncompressed data. Else, the offsets will be bogus. -} gunzip :: MonadIO m => ByteStream m r -> ByteStream m r gunzip = gunzipWith id {-# INLINABLE gunzip #-} {-| Checks if the input is GZip at all, and runs gunzip if it is. If it isn't, it runs 'k' on the input. -} gunzipWith :: MonadIO m => (ByteStream m r -> ByteStream m r) -> ByteStream m r -> ByteStream m r gunzipWith k s0 = lift (nextByteOff s0) >>= \case Right (31, o, s') -> lift (nextByte s') >>= \case Right (139,s'') -> gunzipLoop o $ Chunk (B.pack [31,139]) o s'' Right ( c, s'') -> k $ Chunk (B.pack [31,c]) o s'' Left r -> k $ Chunk (B.singleton 31) o (pure r) Right ( c, o, s') -> k $ Chunk (B.singleton c) o s' Left r -> k $ pure r {-# INLINABLE gunzipWith #-} {-| Decompresses a gzip stream. If the leftovers look like another gzip stream, it recurses (some files, notably those produced by bgzip, contain multiple streams). Otherwise, the leftovers are discarded (some compressed HETFA files appear to have junk at the end). -} gunzipLoop :: MonadIO m => Int64 -> ByteStream m r -> ByteStream m r gunzipLoop o = go o (shiftL o 16) $ Z.decompressIO Z.gzipOrZlibFormat Z.defaultDecompressParams where -- get next chunk, make sure it is empty iff the input ended go inoff outoff (Z.DecompressInputRequired next) inp = lift (nextChunk inp) >>= \case Left r -> do z <- liftIO (next B.empty) go inoff outoff z (pure r) Right (ck,inp') | B.null ck -> go inoff outoff (Z.DecompressInputRequired next) inp' | otherwise -> do z <- liftIO (next ck) go (inoff + fromIntegral (B.length ck)) outoff z inp' go inoff outoff (Z.DecompressOutputAvailable outchunk next) inp = do z <- Chunk outchunk outoff (liftIO next) go inoff (outoff + fromIntegral (B.length outchunk)) z inp go inoff _outoff (Z.DecompressStreamEnd inchunk) inp = -- decompress leftovers if possible, else return gunzipWith (lift . effects) (Chunk inchunk (inoff - fromIntegral (B.length inchunk)) inp) go _inoff _outoff (Z.DecompressStreamError derr) _inp = liftIO $ throwIO derr -- | Compresses a byte stream using GZip with default parameters. gzip :: MonadIO m => ByteStream m r -> ByteStream m r gzip = go $ Z.compressIO Z.gzipFormat Z.defaultCompressParams where -- get next chunk, make sure it is empty iff the input ended go (Z.CompressInputRequired next) inp = lift (nextChunk inp) >>= \case Left r -> liftIO (next B.empty) >>= flip go (pure r) Right (ck,inp') | B.null ck -> go (Z.CompressInputRequired next) inp' | otherwise -> liftIO (next ck) >>= flip go inp' go (Z.CompressOutputAvailable outchunk next) inp = chunk outchunk >> liftIO next >>= flip go inp go Z.CompressStreamEnd inp = lift (effects inp)