{-# LANGUAGE Rank2Types, TypeFamilies #-}

-- This library emulates Data.ByteString.Lazy but includes a monadic element
-- and thus at certain points uses a `Stream`/`FreeT` type in place of lists.

-- |
-- Module      : ByteStream
-- Copyright   : (c) Don Stewart 2006
--               (c) Duncan Coutts 2006-2011
--               (c) Michael Thompson 2015
--               (c) Udo Stenzel 2018
-- License     : BSD-style
--
-- Maintainer  : u.stenzel@web.de
-- Stability   : experimental
-- Portability : portable
--
-- See the simple examples of use <https://gist.github.com/michaelt/6c6843e6dd8030e95d58 here>.
-- We begin with a slight modification of the documentation to "Data.ByteString.Lazy":
--
-- A time and space-efficient implementation of effectful byte streams
-- using a stream of packed 'Word8' arrays, suitable for high performance
-- use, both in terms of large data quantities, or high speed
-- requirements. ByteStreams are encoded as streams of strict chunks
-- of bytes.
--
-- A key feature of ByteStreams is the means to manipulate large or
-- unbounded streams of data without requiring the entire sequence to be
-- resident in memory. To take advantage of this you have to write your
-- functions in a streaming style, e.g. classic pipeline composition. The
-- default I\/O chunk size is 32k, which should be good in most circumstances.
--
-- Some operations, such as 'concat', 'append', 'reverse' and 'cons', have
-- better complexity than their "Data.ByteString" equivalents, due to
-- optimisations resulting from the list spine structure. For other
-- operations, streaming ByteStreams, like lazy ByteStrings, are usually
-- within a few percent of strict ones.
--
-- This module is intended to be imported @qualified@, to avoid name
-- clashes with "Prelude" functions.  eg.
--
-- > import qualified Bio.Streaming.Bytes as B
--
-- Original GHC implementation by Bryan O\'Sullivan.
-- Rewritten to use 'Data.Array.Unboxed.UArray' by Simon Marlow.
-- Rewritten to support slices and use 'Foreign.ForeignPtr.ForeignPtr'
-- by David Roundy.
-- Rewritten again and extended by Don Stewart and Duncan Coutts.
-- Lazy variant by Duncan Coutts and Don Stewart.
-- Streaming variant by Michael Thompson, following the ideas of Gabriel Gonzalez's pipes-bytestring
-- Adapted for use in biohazard by Udo Stenzel.
--
module Bio.Streaming.Bytes (
    -- * The @ByteStream@ type
    ByteStream(..)

    -- * Introducing and eliminating 'ByteStream's
    , empty            -- empty :: ByteStream m ()
    , singleton        -- singleton :: Word8 -> ByteStream m ()
    , fromLazy         -- fromLazy :: LazyBytes -> ByteStream m ()
    , fromChunks       -- fromChunks :: Monad m => Stream (Of Bytes) m r -> ByteStream m r
    , toLazy           -- toLazy :: Monad m => ByteStream m r -> m (Of LazyBytes r)
    , toStrict         -- toStrict :: Monad m => ByteStream m r -> m (Of Bytes r)
    , effects          -- effects :: Monad m => ByteStream m r -> m r
    , mwrap            -- mwrap :: m (ByteStream m r) -> ByteStream m r

    -- * Basic interface
    , cons             -- cons :: Word8 -> ByteStream m r -> ByteStream m r
    , nextByte         -- nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r))
    , nextByteOff      -- nextByteOff :: Monad m => ByteStream m r -> m (Either r (Word8, Int64, ByteStream m r))

    -- * Substrings

    -- ** Breaking strings
    , break            -- break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r)
    , drop             -- drop :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
    , dropWhile        -- dropWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r
    , splitAt          -- splitAt :: Monad m => Int64 -> ByteStream m r -> ByteStream m (ByteStream m r)
    , splitAt'         -- splitAt' :: Monad m => Int -> ByteStream m r -> m (Of Bytes (ByteStream m r))
    , trim             -- trim :: Monad m => Int64 -> ByteStream m () -> ByteStream m ()

    -- ** Breaking into many substrings
    , lines            -- lines :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
    , lines'           -- lines' :: Monad m => ByteStream m r -> Stream (Of Bytes) m r

    -- ** Special folds
    , concat          -- concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r

    -- * Builders
    , toByteStream
    , toByteStreamWith
    , concatBuilders

    -- * I\/O with 'ByteStream's

    -- ** Files
    , withOutputFile
    , writeFile        -- writeFile :: (MonadIO m, MonadMask m) => FilePath -> ByteStream m r -> m r

    -- ** I\/O with Handles
    , hGetContents     -- hGetContents :: MonadIO m => Handle -> ByteStream m ()
    , hGetContentsN    -- hGetContentsN :: MonadIO m => Int -> Handle -> ByteStream m ()
    , hPut             -- hPut :: MonadIO m => Handle -> ByteStream m r -> m r

    -- * Simple chunkwise operations
    , nextChunk
    , nextChunkOff
    , consChunk             -- :: Bytes -> ByteStream m r -> ByteStream m r
    , consChunkOff          -- :: Bytes -> Int64 -> ByteStream m r -> ByteStream m r
    , chunk
    , copy
    , mapChunksM_

    -- * compression support
    , gzip
    , gunzip
    , gunzipWith

  ) where

import Bio.Prelude                      hiding (break,concat,drop,dropWhile,lines,splitAt,writeFile,empty,loop)
import Data.ByteString.Builder.Internal
        (Builder,builder,runBuilder,runBuilderWith,bufferSize
        ,AllocationStrategy,ChunkIOStream(..),buildStepToCIOS
        ,byteStringFromBuffer,safeStrategy,defaultChunkSize)
import GHC.Exts                                (SpecConstrAnnotation(..))
import Streaming                               (MFunctor(..),Of(..),Identity(..),destroy)
import Streaming.Internal                      (Stream (..))
import System.Directory                        (renameFile)

import qualified Codec.Compression.Zlib.Internal as Z
import qualified Data.ByteString                 as B
import qualified Data.ByteString.Internal        as B
import qualified Data.ByteString.Lazy.Internal   as L (foldrChunks,ByteString(..),smallChunkSize,defaultChunkSize)
import qualified Data.ByteString.Unsafe          as B
import qualified Streaming.Prelude               as Q

-- | A space-efficient representation of a succession of 'Word8' vectors, supporting many
-- efficient operations.
--
-- An effectful 'ByteStream' contains 8-bit bytes, or by using certain
-- operations can be interpreted as containing 8-bit characters.  It
-- also contains an offset, which will be needed to track the virtual
-- offsets in the BGZF decode.
--
-- The chunk payload and its offset are strict and unpacked; the tail of
-- the stream is an ordinary lazy field.

data ByteStream m r
  = Empty r
    -- ^ End of the stream, carrying the final return value.
  | Chunk {-# UNPACK #-} !Bytes {-# UNPACK #-} !Int64 (ByteStream m r)
    -- ^ A strict chunk of bytes, the offset associated with that chunk,
    -- and the remainder of the stream.
  | Go (m (ByteStream m r))
    -- ^ An effect in the base monad that produces the remainder of the stream.

-- | Mapping touches only the final return value; chunks, offsets and
-- effects are passed through unchanged.
instance Monad m => Functor (ByteStream m) where
  fmap f stream = go stream
    where
      go (Empty a)             = Empty (f a)
      go (Chunk bytes off rest) = Chunk bytes off (go rest)
      go (Go act)              = Go (liftM go act)

-- | Sequencing streams concatenates their bytes; '<*>' combines the two
-- return values once both streams have been emitted.
instance Monad m => Applicative (ByteStream m) where
  pure = Empty
  {-# INLINE pure #-}
  fs <*> xs = fs >>= \f -> xs >>= \x -> Empty (f x)
  {-# INLINE (<*>) #-}
  (*>) = (>>)
  {-# INLINE (*>) #-}

-- Binding walks down the first stream, re-emitting its chunks and effects
-- unchanged, and replaces the terminating 'Empty' with the continuation.
-- The otherwise unused first argument of the local @loop@s is a 'SPEC'
-- token; 'SPEC' carries a @ForceSpecConstr@ annotation (see its definition).
instance Monad m => Monad (ByteStream m) where
  return = Empty
  {-# INLINE return #-}
  -- '>>' discards the return value of the first stream and continues with @y@.
  x0 >> y = loop SPEC x0 where
    loop !_ x = case x of   -- this seems to be insanely effective
      Empty _     -> y
      Chunk a o b -> Chunk a o (loop SPEC b)
      Go m        -> Go (liftM (loop SPEC) m)
  {-# INLINEABLE (>>) #-}
  -- '>>=' feeds the return value of the first stream to @f@.
  x >>= f =
    loop SPEC2 x where -- unlike >> this SPEC seems pointless
      loop !_ y = case y of
        Empty a        -> f a
        Chunk bs o bss -> Chunk bs o (loop SPEC bss)
        Go mbss        -> Go (liftM (loop SPEC) mbss)
  {-# INLINEABLE (>>=) #-}

-- | An 'IO' action is lifted into the base monad and wrapped as a single
-- effect that yields no bytes and returns the action's result.
instance MonadIO m => MonadIO (ByteStream m) where
  liftIO action = Go (liftIO action >>= \a -> return (Empty a))
  {-# INLINE liftIO #-}

-- | A base-monad action becomes a stream consisting of that one effect,
-- yielding no bytes and returning the action's result.
instance MonadTrans ByteStream where
  lift action = Go (liftM Empty action)
  {-# INLINE lift #-}

-- | 'hoist' changes the base monad by applying the given transformation
-- to every effect layer; chunks and offsets are left as they are.
instance MFunctor ByteStream where
  hoist nat = go
    where
      go stream = case stream of
        Empty r           -> Empty r
        Chunk bytes o rest -> Chunk bytes o (go rest)
        Go act            -> Go (nat (liftM go act))
  {-# INLINEABLE hoist #-}

-- | A string literal denotes a stream built with 'chunk' from the
-- corresponding 'Bytes' literal.
instance (r ~ ()) => IsString (ByteStream m r) where
  fromString str = chunk (fromString str)
  {-# INLINE fromString #-}

-- | Debugging aid for streams over 'Identity': prints the constructor
-- structure, including every chunk's offset.
instance (m ~ Identity, Show r) => Show (ByteStream m r) where
  show (Empty r)              = "Empty (" ++ show r ++ ")"
  show (Go (Identity rest))   = "Go (Identity (" ++ show rest ++ "))"
  show (Chunk bytes off rest) = "Chunk " ++ show bytes ++ " " ++ show off ++ " (" ++ show rest ++ ")"

-- | Runs the left stream, then the right one, and combines their return
-- values with the 'Semigroup' operation of @r@.
instance (Semigroup r, Monad m) => Semigroup (ByteStream m r) where
  left <> right = do
    a <- left
    b <- right
    return (a <> b)
  {-# INLINE (<>) #-}

-- | The neutral element is the stream without bytes or effects that
-- returns 'mempty'.
instance (Semigroup r, Monoid r, Monad m) => Monoid (ByteStream m r) where
  mempty = return mempty
  {-# INLINE mempty #-}
  mappend = (<>)
  {-# INLINE mappend #-}


-- | Token type passed as a dummy first argument to local worker loops.
-- The @ForceSpecConstr@ annotation below is GHC's hint to the SpecConstr
-- pass to specialise functions that take an argument of this type.
data SPEC = SPEC | SPEC2
{-# ANN type SPEC ForceSpecConstr #-}

-- --------------------------------------------------------------------------

-- | Smart constructor for 'Chunk' that assigns offset 0 to the chunk.
-- Like 'consChunkOff', it leaves the stream unchanged when the chunk is empty.
consChunk :: Bytes -> ByteStream m r -> ByteStream m r
consChunk bytes = consChunkOff bytes 0
{-# INLINE consChunk #-}

-- | Smart constructor for 'Chunk' with an explicit offset.  An empty
-- chunk is not added; the stream is returned as is.
consChunkOff :: Bytes -> Int64 -> ByteStream m r -> ByteStream m r
consChunkOff bytes off rest =
    if B.null bytes
      then rest
      else Chunk bytes off rest
{-# INLINE consChunkOff #-}

-- | Yield-style smart constructor for 'Chunk': a stream holding just the
-- given bytes at offset 0, or no chunk at all if the bytes are empty.
chunk :: Bytes -> ByteStream m ()
chunk bytes = consChunkOff bytes 0 (Empty ())
{-# INLINE chunk #-}


{- | Reconceive an effect that results in an effectful bytestring as an effectful bytestring.
    Compare Streaming.mwrap. The closest equivalent of

>>> Streaming.wrap :: f (Stream f m r) -> Stream f m r

    is here @consChunk@. @mwrap@ is the smart constructor for the internal @Go@ constructor.
-}
mwrap :: m (ByteStream m r) -> ByteStream m r
mwrap action = Go action
{-# INLINE mwrap #-}

-- | Construct a succession of chunks from its Church encoding (compare @GHC.Exts.build@)
--
-- The three handlers supplied to @phi@ are simply the constructors
-- 'Empty', 'Chunk' and 'Go'.  The phase annotation on the pragma below
-- delays inlining until phase 0; this function is the target of the
-- @dematerialize/materialize@ rewrite rule in this module.
materialize :: (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x)
            -> ByteStream m r
materialize phi = phi Empty Chunk Go
{-# INLINE[0] materialize #-}

-- | Resolve a succession of chunks into its Church encoding; this is
-- not a safe operation; it is equivalent to exposing the constructors
--
-- This is a fold over the stream: @nil@ handles the final return value,
-- @con@ receives a chunk, its offset and the folded remainder, and @fin@
-- receives an effect that produces the folded remainder.
dematerialize :: Monad m
              => ByteStream m r
              -> (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x)
dematerialize x0 nil con fin = loop SPEC x0
  where
  -- The first argument is the 'SPEC' token and is never inspected.
  loop !_ x = case x of
     Empty r      -> nil r
     Chunk b o bs -> con b o (loop SPEC bs )
     Go ms        -> fin (liftM (loop SPEC) ms)
{-# INLINE [1] dematerialize #-}

-- Rewrite rule: folding a stream that was just built from a Church
-- encoding yields that encoding directly, so the intermediate
-- 'ByteStream' is never constructed.
{-# RULES
  "dematerialize/materialize" forall (phi :: forall b . (r -> b) -> (Bytes -> Int64 -> b -> b) -> (m b -> b) -> b) . dematerialize (materialize phi) = phi ;
  #-}
------------------------------------------------------------------------

-- | Duplicates a stream into two layers: every chunk is emitted on the
-- outer stream and, immediately afterwards, on the inner stream that
-- serves as the outer stream's base monad.  Effects of the original are
-- lifted into the inner stream.
copy :: Monad m => ByteStream m r -> ByteStream (ByteStream m) r
copy (Empty r)             = Empty r
copy (Go act)              = Go (liftM copy (lift act))
copy (Chunk bytes off rest) = Chunk bytes off (Go (Chunk bytes off (Empty (copy rest))))
{-# INLINABLE copy #-}

-- | /O(n)/ Concatenate a stream of byte streams into a single byte
-- stream, keeping the final return value.
concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r
concat layers = destroy layers join Go return
{-# INLINE concat #-}

-- | Run all effects of a byte stream in order, throw the bytes away and
-- deliver the stream's return value.
effects :: Monad m => ByteStream m r -> m r
effects (Empty r)        = return r
effects (Go act)         = act >>= effects
effects (Chunk _ _ rest) = effects rest
{-# INLINABLE effects #-}


-- -----------------------------------------------------------------------------
-- Introducing and eliminating 'ByteStream's

{-| /O(1)/ The empty 'ByteStream': no bytes, no effects, return value @()@
  -- i.e. @return ()@.  Note that @ByteStream m w@ is a monoid whenever
  @w@ is one (as @()@ is) and @m@ is a monad.
-}
empty :: ByteStream m ()
empty = Empty ()
{-# INLINE empty #-}

{-| /O(1)/ Yield a single 'Word8' as a minimal 'ByteStream'; the one-byte
    chunk is given offset 0.
-}
singleton :: Word8 -> ByteStream m ()
singleton w = cons w (Empty ())
{-# INLINE singleton #-}

-- | /O(c)/ Converts a byte stream into a stream of individual strict
--   bytestrings, one per chunk.  This exposes the internal chunk
--   structure; the chunk offsets are dropped.
toChunks :: Monad m => ByteStream m r -> Stream (Of Bytes) m r
toChunks bs = dematerialize bs Return step Effect
  where
    step bytes _off rest = Step (bytes :> rest)
{-# INLINE toChunks #-}

-- | Runs the given action on every chunk, interleaved with the stream's
-- own effects, and delivers the stream's return value.  Chunk offsets are
-- not passed to the action.
mapChunksM_ :: Monad m => (Bytes -> m ()) -> ByteStream m r -> m r
mapChunksM_ f bs = dematerialize bs return step join
  where
    step bytes _off rest = f bytes >> rest
{-# INLINE mapChunksM_ #-}


-- | /O(c)/ Converts a stream of strict bytestrings into a byte stream.
-- Offsets are assigned cumulatively: the first chunk gets offset 0 and
-- every later chunk the total length of the chunks before it.
fromChunks :: Monad m => Stream (Of Bytes) m r -> ByteStream m r
fromChunks bs = destroy bs step eff done 0
  where
    step (b :> k) !off = Chunk b off (k (off + fromIntegral (B.length b)))
    eff  act      !off = Go (act >>= \k -> return (k off))
    done r        !_   = return r
{-# INLINE fromChunks #-}

{-| /O(n)/ Collect a monadic byte stream into one strict 'Bytes' value,
   paired with the return value of the original stream.  Because the
   return value is kept, the function can be used with 'mapped':

> mapped toStrict :: Monad m => Stream (ByteStream m) m r -> Stream (Of Bytes) m r

   All chunks are gathered in a list and concatenated at the end, so the
   whole content is held in memory.  Every objection raised against
   @toStrict@ for lazy bytestrings applies here as well.
-}
toStrict :: Monad m => ByteStream m r -> m (Of Bytes r)
toStrict bs =
  Q.toList (toChunks bs) >>= \(pieces :> r) ->
    return (B.concat pieces :> r)
{-# INLINE toStrict #-}

{-| /O(c)/ Transmute a pseudo-pure lazy bytestring to its representation
    as a monadic stream of chunks.

    The chunk structure of the lazy bytestring is kept, except that empty
    chunks are skipped.  Every chunk is given offset 0.  With the 'Show'
    instance for streams over 'Identity', a two-chunk input is rendered as

> Chunk "here" 0 (Chunk "are" 0 (Empty (())))
-}
fromLazy :: LazyBytes -> ByteStream m ()
fromLazy = L.foldrChunks (\bytes rest -> consChunkOff bytes 0 rest) (Empty ())
{-# INLINE fromLazy #-}

{-| /O(n)/ Convert an effectful byte stream into a single lazy bytestring
    with the same internal chunk structure, retaining the original
    return value.  The chunk offsets are dropped.

    This is the canonical way of breaking streaming (@toStrict@ and the
    like are far more demonic). Essentially one is dividing the interleaved
    layers of effects and bytes into one immense layer of effects,
    followed by the memory of the succession of bytes.

    Because the return value is preserved, @toLazy@ is a suitable argument
    for 'Streaming.mapped':

>   mapped toLazy :: Monad m => Stream (ByteStream m) m r -> Stream (Of LazyBytes) m r
-}
toLazy :: Monad m => ByteStream m r -> m (Of LazyBytes r)
toLazy bs0 = dematerialize bs0 done step join
  where
    done r            = return (L.Empty :> r)
    step bytes _ rest = rest >>= \(lbs :> x) -> return (L.Chunk bytes lbs :> x)
{-# INLINE toLazy #-}

-- | /O(1)/ 'cons' is analogous to '(:)' for lists.  The byte is put in
-- front of the stream as a one-byte chunk with offset 0.
cons :: Word8 -> ByteStream m r -> ByteStream m r
cons w rest = Chunk (B.singleton w) 0 rest
{-# INLINE cons #-}

-- | /O(1)/ Extract the head and tail of a 'ByteStream', or its return value
-- if it is empty. This is the \'natural\' uncons for an effectful byte stream.
-- It is 'nextByteOff' with the offset discarded.
nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r))
nextByte = liftM dropOffset . nextByteOff
  where
    dropOffset (Left r)             = Left r
    dropOffset (Right (w, _, rest)) = Right (w, rest)
{-# INLINE nextByte #-}

-- | Like 'nextByte', but also reports the offset of the extracted byte.
-- Effects are run and empty chunks skipped until a byte or the end of
-- the stream is found.  When a chunk is split, its remainder is put back
-- with the offset advanced by one.
nextByteOff :: Monad m => ByteStream m r -> m (Either r (Word8, Int64, ByteStream m r))
nextByteOff (Empty r) = return (Left r)
nextByteOff (Go act)  = act >>= nextByteOff
nextByteOff (Chunk c o rest)
    | B.null c        = nextByteOff rest
    | B.length c == 1 = return (Right (B.unsafeHead c, o, rest))
    | otherwise       = return (Right (B.unsafeHead c, o, Chunk (B.unsafeTail c) (o+1) rest))
{-# INLINABLE nextByteOff #-}

-- | Extract the first non-empty chunk and the remaining stream, or the
-- return value if the stream has ended.  It is 'nextChunkOff' with the
-- offset discarded.
nextChunk :: Monad m => ByteStream m r -> m (Either r (Bytes, ByteStream m r))
nextChunk = liftM dropOffset . nextChunkOff
  where
    dropOffset (Left r)             = Left r
    dropOffset (Right (c, _, rest)) = Right (c, rest)
{-# INLINE nextChunk #-}

-- | Like 'nextChunk', but also reports the offset stored with the chunk.
-- Effects are run and empty chunks skipped along the way.
nextChunkOff :: Monad m => ByteStream m r -> m (Either r (Bytes, Int64, ByteStream m r))
nextChunkOff stream = case stream of
    Empty r      -> return (Left r)
    Go act       -> act >>= nextChunkOff
    Chunk c o cs -> if B.null c
                      then nextChunkOff cs
                      else return (Right (c, o, cs))
{-# INLINABLE nextChunkOff #-}

{-| /O(n\/c)/ 'drop' @n xs@ returns the suffix of @xs@ after the first @n@
    bytes.  If the stream holds fewer than @n@ bytes, the result has no
    bytes at all but still carries the return value.  A non-positive @n@
    returns the stream unchanged.

    When a chunk is cut, the offset of the remaining part is advanced by
    the number of bytes removed from it.
-}
drop  :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
drop n0 stream
  | n0 <= 0   = stream
  | otherwise = go n0 stream
  where
    go 0 cs        = cs
    go _ (Empty r) = Empty r
    go n (Chunk c o cs)
      | n < len   = Chunk (B.drop (fromIntegral n) c) (o+n) cs
      | otherwise = go (n - len) cs
      where
        len = fromIntegral (B.length c)
    go n (Go act)  = Go (liftM (go n) act)
{-# INLINABLE drop #-}


{-| /O(n\/c)/ 'splitAt' @n xs@ streams the first @n@ bytes of @xs@ and
    returns the rest of the stream as its return value.  If @xs@ is
    shorter than @n@ bytes, all of it is streamed and the returned
    remainder holds only the original return value.  A non-positive @n@
    streams nothing.

    A chunk that straddles the split point is cut in two; the second part
    gets its offset advanced accordingly.
-}
splitAt :: Monad m => Int64 -> ByteStream m r -> ByteStream m (ByteStream m r)
splitAt n0 stream
  | n0 <= 0   = Empty stream
  | otherwise = go n0 stream
  where
    go 0 rest      = Empty rest
    go _ (Empty r) = Empty (Empty r)
    go n (Chunk c o rest)
      | n < len   = Chunk (B.take k c) o (Empty (Chunk (B.drop k c) (o+n) rest))
      | otherwise = Chunk c o (go (n - len) rest)
      where
        len = fromIntegral (B.length c)
        k   = fromIntegral n
    go n (Go act)  = Go (liftM (go n) act)
{-# INLINABLE splitAt #-}

-- | Strictly splits off a piece of at most the given number of bytes and
-- returns it together with the rest of the stream.  The piece is
-- accumulated in memory and concatenated, which breaks streaming, so
-- reserve its use for small strings or when conversion to strict 'Bytes'
-- is needed anyway.
splitAt' :: Monad m => Int -> ByteStream m r -> m (Of Bytes (ByteStream m r))
splitAt' i cs0
  | i <= 0    = return $! B.empty :> cs0
  | otherwise = go i [] cs0
  where
    -- chunks are collected in reverse order
    finish acc rest = return $! B.concat (reverse acc) :> rest

    go 0 acc cs        = finish acc cs
    go _ acc (Empty r) = finish acc (Empty r)
    go n acc (Chunk c o cs)
      | n < B.length c = finish (B.take n c : acc)
                                (Chunk (B.drop n c) (o + fromIntegral n) cs)
      | otherwise      = go (n - B.length c) (c:acc) cs
    go n acc (Go act)  = act >>= go n acc
{-# INLINABLE splitAt' #-}

-- | Cuts a stream short at an end offset: chunks are passed on as long as
-- their stored offset is below the limit.  The first chunk whose offset
-- is at or beyond the limit ends the stream; that chunk and everything
-- after it (including later effects) is dropped.  Chunks are never split.
trim :: Monad m => Int64 -> ByteStream m () -> ByteStream m ()
trim eoff = go
  where
    go stream = case stream of
      Empty _        -> Empty ()
      Go act         -> lift act >>= go
      Chunk c o rest
        | o < eoff   -> Chunk c o (go rest)
        | otherwise  -> Empty ()
{-# INLINABLE trim #-}

-- | 'dropWhile' @p xs@ returns the suffix remaining after 'takeWhile' @p xs@,
-- i.e. the stream starting at the first byte that does not satisfy @p@.
-- When a chunk is cut, the offset of the remaining part is advanced by
-- the number of bytes removed.
--
-- Empty chunks are skipped.  (An empty chunk used to be mistaken for a
-- chunk whose first byte fails the predicate, which ended the dropping
-- prematurely.)
dropWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r
dropWhile pr = drop' where
  drop' bs = case bs of
    Empty r      -> Empty r
    Go m         -> Go (liftM drop' m)
    Chunk c o cs
      -- every byte matched; this also covers the empty chunk
      | n >= B.length c -> drop' cs
      -- the very first byte already fails the predicate
      | n == 0          -> Chunk c o cs
      | otherwise       -> Chunk (B.drop n c) (o + fromIntegral n) cs
      where
        -- index of the first byte that fails @pr@, or the chunk length
        n = findIndexOrEnd (not . pr) c
{-# INLINABLE dropWhile #-}

-- | 'break' @p@ is equivalent to @'span' ('not' . p)@: it streams the
-- bytes up to, but not including, the first byte satisfying @p@ and
-- returns the rest of the stream as its return value.  A chunk containing
-- the break point is cut in two, the second part getting its offset
-- advanced accordingly.
--
-- Empty chunks are skipped.  (An empty chunk used to be mistaken for a
-- chunk whose first byte satisfies the predicate, which ended the prefix
-- prematurely.)
break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r)
break f cs0 = break' cs0
  where break' (Empty r) = Empty (Empty r)
        break' (Go m)    = Go (liftM break' m)
        break' (Chunk c o cs)
          | B.null c       = break' cs
          -- the first byte already satisfies the predicate
          | n == 0         = Empty (Chunk c o cs)
          | n < B.length c = Chunk (B.take n c) o $
                                 Empty (Chunk (B.drop n c) (o + fromIntegral n) cs)
          -- no byte of this chunk satisfies the predicate
          | otherwise      = Chunk c o (break' cs)
          where
            -- index of the first byte satisfying @f@, or the chunk length
            n = findIndexOrEnd f c
{-# INLINABLE break #-}

{- | Read entire handle contents /lazily/ into a 'ByteStream'. Chunks
    are read on demand, in at most @k@-sized chunks. It does not block
    waiting for a whole @k@-sized chunk, so if less than @k@ bytes are
    available then they will be returned immediately as a smaller chunk.

    The stream ends at the first empty read.  This function does not
    close the handle; that is left to the caller.  Chunk offsets count
    the bytes read so far, starting from 0.

    Note: the 'Handle' should be placed in binary mode with
    'System.IO.hSetBinaryMode' for 'hGetContentsN' to
    work correctly.
-}
hGetContentsN :: MonadIO m => Int -> Handle -> ByteStream m ()
hGetContentsN size hdl = readFrom 0
  where
    readFrom !off =
        -- only blocks if there is no data available
        liftIO (B.hGetSome hdl size) >>= \bytes ->
          if B.null bytes
            then Empty ()
            else Chunk bytes off (readFrom (off + fromIntegral (B.length bytes)))
{-# INLINABLE hGetContentsN #-} -- very effective inline pragma

{-| Read entire handle contents /lazily/ into a 'ByteStream'. Chunks
    are read on demand, using 'defaultChunkSize' as the maximum chunk
    size.  See 'hGetContentsN' for details.

    Note: the 'Handle' should be placed in binary mode with
    'System.IO.hSetBinaryMode' for 'hGetContents' to
    work correctly.
-}
hGetContents :: MonadIO m => Handle -> ByteStream m ()
hGetContents hdl = hGetContentsN defaultChunkSize hdl
{-# INLINE hGetContents #-}


{-| Runs an action with a 'Handle' open for binary writing.

    The filename \"-\" passes 'stdout' to the action; 'stdout' is not
    closed.  For any other name, the handle belongs to a temporary file
    (the name with @.#~#@ appended).  Once the action has completed and
    the handle has been closed, the temporary file is renamed to the
    requested name.

    The rename deliberately happens after the handle is closed, so that
    buffered data has been flushed first, a failure while closing does
    not leave an incomplete file under the final name, and platforms that
    refuse to rename open files are not a problem.

    If the action (or closing the handle) throws, the handle is still
    closed, no rename takes place, and the temporary file is left behind.
 -}
withOutputFile :: (MonadIO m, MonadMask m) => FilePath -> (Handle -> m a) -> m a
withOutputFile "-" k = k stdout
withOutputFile  f  k = do
    r <- bracket (liftIO $ openBinaryFile tmp WriteMode) (liftIO . hClose) k
    liftIO (renameFile tmp f)
    return r
  where
    tmp = f ++ ".#~#"
{-# INLINE withOutputFile #-}

{-| Writes a 'ByteStream' to a file and delivers the stream's return
    value.  The work is delegated to 'withOutputFile' and 'hPut': output
    goes to a temporary file that is renamed on successful completion,
    and the filename \"-\" causes it to write to stdout instead.
 -}
writeFile :: (MonadIO m, MonadMask m) => FilePath -> ByteStream m r -> m r
writeFile path stream = withOutputFile path (\hdl -> hPut hdl stream)
{-# INLINE writeFile #-}

-- | Outputs a 'ByteStream' to the specified 'Handle', chunk by chunk and
-- interleaved with the stream's own effects, and delivers the stream's
-- return value.  Chunk offsets are ignored.
hPut ::  MonadIO m => Handle -> ByteStream m r -> m r
hPut h cs = dematerialize cs return write join
  where
    write bytes _off rest = liftIO (B.hPut h bytes) >> rest
{-# INLINE hPut #-}

-- -- ---------------------------------------------------------------------
-- -- Internal utilities

-- | 'findIndexOrEnd' is a variant of findIndex, that returns the length
-- of the string if no element is found, rather than Nothing.
--
-- For an empty string the result is therefore 0, which callers cannot
-- tell apart from a match at the first byte.
--
-- The scan reads the bytes directly through the pointer obtained from
-- the 'B.PS' pattern: @x@ is the foreign pointer, @s@ the start offset
-- and @l@ the length of the slice.
findIndexOrEnd :: (Word8 -> Bool) -> Bytes -> Int
findIndexOrEnd k (B.PS x s l) =
    -- NOTE(review): 'unsafeDupablePerformIO' is presumably acceptable
    -- because the loop only reads from the buffer and the payload of a
    -- bytestring is treated as immutable -- confirm.
    unsafeDupablePerformIO $
    withForeignPtr x $ \f -> go (f `plusPtr` s) 0
  where
    -- @n@ counts the bytes examined so far; the check against @l@ keeps
    -- the reads inside the slice.
    go !ptr !n | n >= l    = return l
               | otherwise = do w <- peek ptr
                                if k w
                                  then return n
                                  else go (ptr `plusPtr` 1) (n+1)
{-# INLINABLE findIndexOrEnd #-}

{-| Take a builder constructed otherwise and convert it to a genuine
    streaming bytestring.  This is 'toByteStreamWith' applied to the
    strategy @safeStrategy smallChunkSize defaultChunkSize@, with both
    sizes taken from "Data.ByteString.Lazy.Internal".

    For example, the builder

> stringUtf8 "哈斯克尔" <> stringUtf8 " " <> integerDec 98

    is turned into a stream of the bytes of @哈斯克尔 98@.

    According to the original author,
    <https://gist.github.com/michaelt/6ea89ca95a77b0ef91f3 this benchmark> shows
    performance indistinguishable from that of @toLazyByteString@.
-}
toByteStream :: MonadIO m => Builder -> ByteStream m ()
toByteStream bldr = toByteStreamWith defaultStrategy bldr
  where
    defaultStrategy = safeStrategy L.smallChunkSize L.defaultChunkSize
{-# INLINE toByteStream #-}

{-| Take a builder and convert it to a genuine streaming bytestring,
   using a specific allocation strategy.

   The builder is run step by step, each step being an effect of the
   resulting stream.  Chunk offsets count the bytes produced so far,
   starting from 0.  The final buffer is emitted only if it is non-empty,
   and it is copied first if it fills less than half of its buffer.
-}
toByteStreamWith :: MonadIO m => AllocationStrategy -> Builder -> ByteStream m ()
toByteStreamWith strategy builder0 =
    liftIO (buildStepToCIOS strategy (runBuilder builder0)) >>= emit 0
  where
    -- turn the chunk stream of the builder driver into our chunks
    emit !off cios = case cios of
        Yield1 bs io   -> Chunk bs off
                              (liftIO io >>= emit (off + fromIntegral (B.length bs)))
        Finished buf r -> finalChunk off buf (Empty r)

    -- emit what is left in the last buffer, if anything
    finalChunk off buffer k
        | B.null bs                           = k
        | 2 * B.length bs < bufferSize buffer = Chunk (B.copy bs) off k
        | otherwise                           = Chunk bs          off k
      where
        bs = byteStringFromBuffer buffer
{-# INLINABLE toByteStreamWith #-}
{-# SPECIALIZE toByteStreamWith :: AllocationStrategy -> Builder -> ByteStream IO () #-}


{-| Concatenate a stream of builders (not a streaming bytestring!) into a
    single builder.  The effects of the stream are run in 'IO' while the
    resulting builder is being executed.

    For example, with

> aa = yield (integerDec 10000) >> yield (string8 " is a number.") >> yield (char8 '\n')

    the builder @concatBuilders aa@ produces the line @10000 is a number.@
-}
concatBuilders :: Stream (Of Builder) IO () -> Builder
concatBuilders p = builder $ \bstep range ->
  let run b = runBuilderWith b bstep range
  in case p of
       Return _         -> run mempty
       Step (b :> rest) -> run (b `mappend` concatBuilders rest)
       Effect act       -> act >>= \p' -> run (concatBuilders p')
{-# INLINABLE concatBuilders #-}

{- | Turns a ByteStream into a connected stream of ByteStreams that
     divide at newline characters. The resulting strings do not contain
     newlines.  This is the genuinely streaming 'lines' which only
     breaks chunks, and thus never increases the use of memory.

     Because 'ByteStream's are usually read in binary mode, with no line
     ending conversion, this function recognizes both @\\n@ and @\\r\\n@
     endings (regardless of the current platform).

     A carriage return that is the last byte of a chunk is held back until
     it is known whether a line feed follows.  If one follows directly, the
     pair is treated as a line ending; otherwise the carriage return is
     emitted as an ordinary byte of the line.  (It used to be lost when the
     next non-empty chunk contained no line feed.) -}

lines :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
lines text0 = loop1 text0
  where
    -- Start of a new line: skip effects and empty chunks; the first
    -- non-empty chunk opens a new inner stream.
    loop1 :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
    loop1 text =
      case text of
        Empty r -> Return r
        Go m -> Effect $ liftM loop1 m
        Chunk c _ cs
          | B.null c -> loop1 cs
          | otherwise -> Step (loop2 Nothing text)

    -- Inside a line.  @prevCr@ is @Just off@ if a carriage return with
    -- offset @off@ has been held back from the end of the previous chunk.
    loop2 :: Monad m => Maybe Int64 -> ByteStream m r -> ByteStream m (Stream (ByteStream m) m r)
    loop2 prevCr text =
      case text of
        Empty r -> flushCr prevCr (Empty (Return r))
        Go m -> Go $ liftM (loop2 prevCr) m
        Chunk c o cs ->
          case B.elemIndex 10 c of
            Nothing -> case B.length c of
              0 -> loop2 prevCr cs
              -- No line feed here, so a held-back carriage return was not
              -- part of a line ending and must be emitted first.
              l -> flushCr prevCr $
                   if B.unsafeLast c == 13
                     then Chunk (B.unsafeInit c) o (loop2 (Just (o-1 + fromIntegral l)) cs)
                     else Chunk c o (loop2 Nothing cs)
            Just i -> do
              let prefixLength =
                    if i >= 1 && B.unsafeIndex c (i-1) == 13 -- \r\n (dos)
                      then i-1
                      else i
                  rest =
                    if B.length c > i+1
                      then Chunk (B.drop (i+1) c) (o+1 + fromIntegral i) cs
                      else cs
                  result = Chunk (B.unsafeTake prefixLength c) o (Empty (loop1 rest))
              case prevCr of
                -- The line feed is not the first byte, so the held-back
                -- carriage return belongs to the line.
                Just oo | i > 0 -> Chunk (B.singleton 13) oo result
                -- Either nothing was held back, or the line feed comes
                -- first and completes a @\\r\\n@ pair.
                _               -> result

    -- Emit a held-back carriage return, if any, in front of a stream.
    flushCr :: Maybe Int64 -> ByteStream n x -> ByteStream n x
    flushCr (Just oo) k = Chunk (B.singleton 13) oo k
    flushCr Nothing   k = k
{-# INLINABLE lines #-}

{- | Turns a 'ByteStream' into a stream of strict 'Bytes' that divide at
     newline characters. The resulting strings do not contain newlines,
     and a single trailing carriage return is removed from each line.
     A final line without a terminating newline is emitted as well,
     provided it is not empty.  Lines are accumulated in memory, so this
     will cost memory if the lines are very long.  -}

lines' :: Monad m => ByteStream m r -> Stream (Of Bytes) m r
lines' = go []
  where
    -- The accumulator holds the chunks of the current line in reverse order.
    go :: Monad m => [Bytes] -> ByteStream m r -> Stream (Of Bytes) m r
    go acc (Empty r) = r <$ unless (null acc) (Q.yield (stripCR $ B.concat (reverse acc)))
    go acc (Go act)  = Effect $ liftM (go acc) act
    go acc (Chunk c o cs)
      | B.null c  = go acc cs
      | otherwise =
          case B.elemIndex 10 c of
            Nothing -> go (c:acc) cs
            Just  i -> Q.cons (stripCR line)
                              (go [] (Chunk (B.drop (i+1) c) (o+1 + fromIntegral i) cs))
              where
                line | null acc  = B.take i c
                     | otherwise = B.concat (reverse (B.take i c : acc))

    -- remove one carriage return from the end, if there is one
    stripCR s
        | B.null s        =  s
        | B.last s == 13  =  B.init s
        | otherwise       =  s
{-# INLINABLE lines' #-}

-- --------------------------------------------------------------------------

{-| Decompresses GZip if present.  If any GZip stream is found, all
    such streams are decompressed and any remaining data is discarded.
    Else, the input is returned unchanged.  If the input is BGZF, the
    result will contain meaningful virtual offsets.  If the input
    contains exactly one GZip stream, the result will have meaningful
    offsets into the uncompressed data.  Else, the offsets will be
    bogus.

    This is 'gunzipWith' with 'id' as the handler for uncompressed input. -}

gunzip :: MonadIO m => ByteStream m r -> ByteStream m r
gunzip stream = gunzipWith id stream
{-# INLINABLE gunzip #-}

{-| Checks if the input is GZip at all, and runs gunzip if it is.  If
    it isn't, it runs 'k' on the input.

    The check looks at the first two bytes, which must be 31 and 139.
    The bytes consumed for the check are put back in front of the stream,
    at the offset of the first byte, before it is passed on. -}

gunzipWith :: MonadIO m => (ByteStream m r -> ByteStream m r)
                        -> ByteStream m r -> ByteStream m r
gunzipWith k s0 = do
    firstByte <- lift (nextByteOff s0)
    case firstByte of
        Left r -> k (pure r)
        Right (31, o, s1) -> do
            secondByte <- lift (nextByte s1)
            case secondByte of
                Left r          -> k (Chunk (B.singleton 31) o (pure r))
                Right (139, s2) -> gunzipLoop o (Chunk (B.pack [31,139]) o s2)
                Right (c, s2)   -> k (Chunk (B.pack [31,c]) o s2)
        Right (c, o, s1) -> k (Chunk (B.singleton c) o s1)
{-# INLINABLE gunzipWith #-}

{-| Decompresses a gzip stream.  If the leftovers look like another
    gzip stream, it recurses (some files, notably those produced by
    bgzip, contain multiple streams).  Otherwise, the leftovers are
    discarded (some compressed HETFA files appear to have junk at the
    end).

    The 'Int64' argument is the input offset at which the compressed
    stream begins.  Offsets of the decompressed output start at that
    value shifted left by 16 bits and grow by the length of every output
    chunk.  (NOTE(review): presumably this forms the BGZF virtual offset
    mentioned in the documentation of 'gunzip' -- confirm.)

    A decompression error reported by zlib is thrown as an exception. -}

gunzipLoop :: MonadIO m => Int64 -> ByteStream m r -> ByteStream m r
gunzipLoop o = go o (shiftL o 16) $ Z.decompressIO Z.gzipOrZlibFormat Z.defaultDecompressParams
  where
    -- @go inoff outoff state inp@: @inoff@ is the input offset just past
    -- the bytes handed to the decompressor so far, @outoff@ the offset
    -- for the next output chunk.

    -- get next chunk, make sure it is empty iff the input ended
    go inoff outoff (Z.DecompressInputRequired next) inp =
        lift (nextChunk inp) >>= \case
            Left r          -> do z <- liftIO (next B.empty)
                                  go inoff outoff z (pure r)
            Right (ck,inp')
                | B.null ck ->    go inoff outoff (Z.DecompressInputRequired next) inp'
                | otherwise -> do z <- liftIO (next ck)
                                  go (inoff + fromIntegral (B.length ck)) outoff z inp'

    -- emit the decompressed chunk, then resume the decompressor
    go inoff outoff (Z.DecompressOutputAvailable outchunk next) inp = do
        z <- Chunk outchunk outoff (liftIO next)
        go inoff (outoff + fromIntegral (B.length outchunk)) z inp

    -- @inchunk@ is input that was handed over but not consumed; it is put
    -- back in front of the remaining input at its original offset
    go inoff _outoff (Z.DecompressStreamEnd inchunk) inp =
        -- decompress leftovers if possible, else return
        gunzipWith (lift . effects) (Chunk inchunk (inoff - fromIntegral (B.length inchunk)) inp)

    go _inoff _outoff (Z.DecompressStreamError derr) _inp =
        liftIO $ throwIO derr

-- | Compresses a byte stream using GZip with default parameters.  The
-- return value of the input stream is preserved.  All output chunks are
-- emitted with offset 0.
gzip :: MonadIO m => ByteStream m r -> ByteStream m r
gzip = go (Z.compressIO Z.gzipFormat Z.defaultCompressParams)
  where
    -- get next chunk, make sure it is empty iff the input ended
    go (Z.CompressInputRequired feed) inp = do
        next <- lift (nextChunk inp)
        case next of
            Left r -> do
                z <- liftIO (feed B.empty)
                go z (pure r)
            Right (ck, inp')
                | B.null ck -> go (Z.CompressInputRequired feed) inp'
                | otherwise -> do
                    z <- liftIO (feed ck)
                    go z inp'

    -- emit the compressed chunk, then resume the compressor
    go (Z.CompressOutputAvailable outchunk more) inp = do
        chunk outchunk
        z <- liftIO more
        go z inp

    -- run what is left of the input to obtain the return value
    go Z.CompressStreamEnd inp = lift (effects inp)