{-# LANGUAGE Rank2Types, TypeFamilies #-}
module Bio.Streaming.Bytes (
ByteStream(..)
, empty
, singleton
, fromLazy
, fromChunks
, toLazy
, toStrict
, effects
, mwrap
, cons
, nextByte
, nextByteOff
, break
, drop
, dropWhile
, splitAt
, splitAt'
, trim
, lines
, lines'
, concat
, toByteStream
, toByteStreamWith
, concatBuilders
, withOutputFile
, writeFile
, hGetContents
, hGetContentsN
, hPut
, nextChunk
, nextChunkOff
, consChunk
, consChunkOff
, chunk
, copy
, mapChunksM_
, gzip
, gunzip
, gunzipWith
) where
import Bio.Prelude hiding (break,concat,drop,dropWhile,lines,splitAt,writeFile,empty,loop)
import Data.ByteString.Builder.Internal
(Builder,builder,runBuilder,runBuilderWith,bufferSize
,AllocationStrategy,ChunkIOStream(..),buildStepToCIOS
,byteStringFromBuffer,safeStrategy,defaultChunkSize)
import GHC.Exts (SpecConstrAnnotation(..))
import Streaming (MFunctor(..),Of(..),Identity(..),destroy)
import Streaming.Internal (Stream (..))
import System.Directory (renameFile)
import qualified Codec.Compression.Zlib.Internal as Z
import qualified Data.ByteString as B
import qualified Data.ByteString.Internal as B
import qualified Data.ByteString.Lazy.Internal as L (foldrChunks,ByteString(..),smallChunkSize,defaultChunkSize)
import qualified Data.ByteString.Unsafe as B
import qualified Streaming.Prelude as Q
-- | A stream of byte chunks interleaved with effects in the monad @m@,
-- terminated by a result of type @r@.  Every chunk carries an 'Int64'
-- offset next to its bytes.
data ByteStream m r
-- End of the stream, carrying the final result.
= Empty r
-- A strict chunk of bytes, the offset attached to it, and the rest of the stream.
| Chunk {-# UNPACK #-} !Bytes {-# UNPACK #-} !Int64 (ByteStream m r)
-- An effect in @m@ that produces the rest of the stream.
| Go (m (ByteStream m r))
-- | Mapping a function over a 'ByteStream' changes only the final result;
-- chunks, offsets and effects are passed through untouched.
instance Monad m => Functor (ByteStream m) where
    fmap f = go
      where
        go (Empty a)          = Empty (f a)
        go (Chunk bs off bss) = Chunk bs off (go bss)
        go (Go mbss)          = Go (liftM go mbss)
-- | 'pure' is the empty stream carrying a result; '<*>' runs both streams
-- one after the other and applies the first result to the second.
instance Monad m => Applicative (ByteStream m) where
    pure = Empty
    {-# INLINE pure #-}
    bf <*> bx = bf >>= \f -> bx >>= \x -> Empty (f x)
    {-# INLINE (<*>) #-}
    (*>) = (>>)
    {-# INLINE (*>) #-}
-- | Sequencing of byte streams: the second stream is grafted in place of the
-- 'Empty' node that terminates the first one.
instance Monad m => Monad (ByteStream m) where
return = Empty
{-# INLINE return #-}
-- '>>' walks the first stream, keeps its chunks and effects, and replaces
-- its terminating 'Empty' with @y@; the first result is discarded.
x0 >> y = loop SPEC x0 where
-- The first argument is only a 'SPEC' token; it is forced and otherwise ignored.
loop !_ x = case x of
Empty _ -> y
Chunk a o b -> Chunk a o (loop SPEC b)
Go m -> Go (liftM (loop SPEC) m)
{-# INLINEABLE (>>) #-}
-- '>>=' does the same, but builds the continuation stream from the result
-- found in the terminating 'Empty'.
x >>= f =
-- The initial call passes 'SPEC2', the recursive calls pass 'SPEC'.
loop SPEC2 x where
loop !_ y = case y of
Empty a -> f a
Chunk bs o bss -> Chunk bs o (loop SPEC bss)
Go mbss -> Go (liftM (loop SPEC) mbss)
{-# INLINEABLE (>>=) #-}
-- | An 'IO' action is lifted into the base monad and wrapped as a single
-- effect that yields no bytes.
instance MonadIO m => MonadIO (ByteStream m) where
    liftIO = Go . liftM Empty . liftIO
    {-# INLINE liftIO #-}
-- | An action of the base monad becomes a stream that performs the action,
-- yields no bytes, and returns the action's result.
instance MonadTrans ByteStream where
    lift = Go . liftM Empty
    {-# INLINE lift #-}
-- | 'hoist' changes the base monad by applying a natural transformation to
-- every effect layer; chunks and offsets are unchanged.
instance MFunctor ByteStream where
    hoist nat = go
      where
        go str = case str of
            Empty r       -> Empty r
            Chunk c off s -> Chunk c off (go s)
            Go m          -> Go (nat (liftM go m))
    {-# INLINEABLE hoist #-}
-- | A string literal denotes a stream holding one chunk (built through
-- 'chunk', so an empty literal yields no chunk at all).
instance (r ~ ()) => IsString (ByteStream m r) where
    fromString str = chunk (fromString str)
    {-# INLINE fromString #-}
-- | Streams over 'Identity' can be shown; the output spells out the
-- constructor structure including each chunk's offset.
instance (m ~ Identity, Show r) => Show (ByteStream m r) where
    show (Empty r)            = "Empty (" ++ show r ++ ")"
    show (Go (Identity rest)) = "Go (Identity (" ++ show rest ++ "))"
    show (Chunk c off rest)   =
        "Chunk " ++ show c ++ " " ++ show off ++ " (" ++ show rest ++ ")"
-- | Two streams are combined by running them in sequence and combining
-- their results with the result type's '<>'.
instance (Semigroup r, Monad m) => Semigroup (ByteStream m r) where
    lhs <> rhs = lhs >>= \a -> rhs >>= \b -> return (a <> b)
    {-# INLINE (<>) #-}
-- | The neutral element is the stream without bytes whose result is the
-- result type's 'mempty'.
instance (Semigroup r, Monoid r, Monad m) => Monoid (ByteStream m r) where
    mempty = return mempty
    {-# INLINE mempty #-}
    mappend lhs rhs = lhs <> rhs
    {-# INLINE mappend #-}
-- | Token type passed as a dummy first argument to the local loops in this
-- module.  It is annotated with 'ForceSpecConstr'; the loops force the token
-- and never inspect which constructor they received.
data SPEC = SPEC | SPEC2
{-# ANN type SPEC ForceSpecConstr #-}
-- | Prepend a chunk with offset 0 to a stream.  Delegates to
-- 'consChunkOff', so an empty chunk is not added.
consChunk :: Bytes -> ByteStream m r -> ByteStream m r
consChunk bytes = consChunkOff bytes 0
{-# INLINE consChunk #-}
-- | Smart constructor for 'Chunk': prepend a chunk with the given offset,
-- unless the chunk is empty, in which case the stream is returned unchanged.
consChunkOff :: Bytes -> Int64 -> ByteStream m r -> ByteStream m r
consChunkOff c off cs =
    if B.null c
      then cs
      else Chunk c off cs
{-# INLINE consChunkOff #-}
-- | Turn a strict bytestring into a stream of (at most) one chunk with
-- offset 0.  An empty bytestring gives the empty stream.
chunk :: Bytes -> ByteStream m ()
chunk bs = consChunkOff bs 0 (Empty ())
{-# INLINE chunk #-}
-- | Wrap an action that produces a stream as a stream of its own, i.e.
-- apply the 'Go' constructor.
mwrap :: m (ByteStream m r) -> ByteStream m r
mwrap action = Go action
{-# INLINE mwrap #-}
-- | Build a concrete 'ByteStream' from its fold representation by handing the
-- fold the three constructors 'Empty', 'Chunk' and 'Go'.
--
-- The pragma carries the phase annotation @[0]@; together with the
-- \"dematerialize/materialize\" rule in this module this presumably gives the
-- rule a chance to fire before the function is inlined (NOTE(review): confirm).
materialize :: (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x)
-> ByteStream m r
materialize phi = phi Empty Chunk Go
{-# INLINE[0] materialize #-}
-- | Fold over a 'ByteStream': @nil@ replaces 'Empty', @con@ replaces 'Chunk'
-- (receiving the bytes, the offset and the folded rest), and @fin@ replaces
-- 'Go' (receiving the effect with the fold already mapped over its result).
dematerialize :: Monad m
=> ByteStream m r
-> (forall x . (r -> x) -> (Bytes -> Int64 -> x -> x) -> (m x -> x) -> x)
dematerialize x0 nil con fin = loop SPEC x0
where
-- The first argument is a 'SPEC' token that is forced but never inspected.
loop !_ x = case x of
Empty r -> nil r
Chunk b o bs -> con b o (loop SPEC bs )
Go ms -> fin (liftM (loop SPEC) ms)
{-# INLINE [1] dematerialize #-}
-- Rewrite rule: folding a stream that was just built from a fold yields the
-- original fold, so the intermediate 'ByteStream' is never constructed.
{-# RULES
"dematerialize/materialize" forall (phi :: forall b . (r -> b) -> (Bytes -> Int64 -> b -> b) -> (m b -> b) -> b) . dematerialize (materialize phi) = phi ;
#-}
-- | Duplicate a stream: every chunk is emitted once in the outer stream and
-- once, with the same offset, in the inner stream that serves as the base
-- monad.  Effects of the original stream are lifted through the inner layer.
copy :: Monad m => ByteStream m r -> ByteStream (ByteStream m) r
copy = go
  where
    go (Empty r)           = Empty r
    go (Go m)              = Go (liftM go (lift m))
    go (Chunk bs off rest) =
        -- outer copy first, then the inner copy as an effect of the base stream
        Chunk bs off (Go (Chunk bs off (Empty (go rest))))
{-# INLINABLE copy #-}
-- | Flatten a stream of byte streams into one byte stream by joining the
-- successive segments; chunks keep the offsets they already carry.
concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r
concat segments = destroy segments flatten mwrap Empty
  where
    -- Run one segment, then continue with the stream it returns.
    flatten layer = layer >>= id
{-# INLINE concat #-}
-- | Run all effects of a stream, discard every chunk, and return the final
-- result.
effects :: Monad m => ByteStream m r -> m r
effects = go
  where
    go (Empty r)        = return r
    go (Go m)           = m >>= go
    go (Chunk _ _ rest) = go rest
{-# INLINABLE effects #-}
-- | The stream that contains no bytes, performs no effects, and returns @()@.
empty :: ByteStream m ()
empty = Empty ()
{-# INLINE empty #-}
-- | A stream consisting of exactly one byte, placed at offset 0.
singleton :: Word8 -> ByteStream m ()
singleton w = cons w empty
{-# INLINE singleton #-}
-- | Convert a byte stream into a 'Stream' of its strict chunks.  The
-- offsets attached to the chunks are dropped.
toChunks :: Monad m => ByteStream m r -> Stream (Of Bytes) m r
toChunks bs = dematerialize bs return emit Effect
  where
    emit piece _ rest = Step (piece :> rest)
{-# INLINE toChunks #-}
-- | Run an action on every chunk of the stream, in order, and return the
-- stream's final result.  Offsets are not passed to the action.
mapChunksM_ :: Monad m => (Bytes -> m ()) -> ByteStream m r -> m r
mapChunksM_ act bs = dematerialize bs return visit join
  where
    visit piece _ rest = act piece >> rest
{-# INLINE mapChunksM_ #-}
-- | Build a byte stream from a stream of strict chunks.  Offsets are
-- assigned cumulatively: the first chunk gets offset 0 and each following
-- chunk gets the previous offset plus the previous chunk's length.
fromChunks :: Monad m => Stream (Of Bytes) m r -> ByteStream m r
fromChunks str = destroy str step effect done 0
  where
    -- Each fold result is a function of the running offset.
    step (b :> k) !off = Chunk b off (k (off + fromIntegral (B.length b)))
    effect m      !off = Go (m >>= \k -> return (k off))
    done r        !_   = return r
{-# INLINE fromChunks #-}
-- | Collect the whole stream into one strict bytestring, paired with the
-- stream's result.  All chunks are gathered in a list before concatenation.
toStrict :: Monad m => ByteStream m r -> m (Of Bytes r)
toStrict bs =
    Q.toList (toChunks bs) >>= \(pieces :> r) ->
        return (B.concat pieces :> r)
{-# INLINE toStrict #-}
-- | Convert a lazy bytestring into a byte stream, one stream chunk per
-- non-empty lazy chunk.  Every chunk is given offset 0.
fromLazy :: LazyBytes -> ByteStream m ()
fromLazy lbs = L.foldrChunks prepend (Empty ()) lbs
  where
    prepend c rest = consChunkOff c 0 rest
{-# INLINE fromLazy #-}
-- | Collect the whole stream into a lazy bytestring, paired with the
-- stream's result.  The chunk structure is kept; offsets are dropped.
toLazy :: Monad m => ByteStream m r -> m (Of LazyBytes r)
toLazy bs0 = dematerialize bs0 done prepend join
  where
    done r = return (L.Empty :> r)
    -- Run the rest first, then put this chunk in front of what it produced.
    prepend b _ rest = rest >>= \(bs :> x) -> return (L.Chunk b bs :> x)
{-# INLINE toLazy #-}
-- | Prepend a single byte to a stream as a one-byte chunk with offset 0.
cons :: Word8 -> ByteStream m r -> ByteStream m r
cons w = Chunk (B.singleton w) 0
{-# INLINE cons #-}
-- | Take the first byte off a stream.  Gives 'Left' with the result if the
-- stream holds no more bytes, otherwise the byte and the remaining stream.
-- Same as 'nextByteOff' with the offset discarded.
nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r))
nextByte str = liftM forgetOffset (nextByteOff str)
  where
    forgetOffset (Left r)             = Left r
    forgetOffset (Right (w, _, rest)) = Right (w, rest)
{-# INLINE nextByte #-}
-- | Take the first byte off a stream, together with its offset.  Empty
-- chunks are skipped and effects are run until a byte or the end is found.
-- The remainder of a split chunk carries the offset advanced by one.
nextByteOff :: Monad m => ByteStream m r -> m (Either r (Word8, Int64, ByteStream m r))
nextByteOff str = case str of
    Empty r -> return (Left r)
    Go m    -> m >>= nextByteOff
    Chunk c o cs
      | B.null c  -> nextByteOff cs
      | otherwise -> return (Right (B.unsafeHead c, o, remainder))
      where
        -- A one-byte chunk is used up completely; do not leave an empty chunk.
        remainder
          | B.length c == 1 = cs
          | otherwise       = Chunk (B.unsafeTail c) (o + 1) cs
{-# INLINABLE nextByteOff #-}
-- | Take the first non-empty chunk off a stream.  Gives 'Left' with the
-- result if there is none.  Same as 'nextChunkOff' with the offset discarded.
nextChunk :: Monad m => ByteStream m r -> m (Either r (Bytes, ByteStream m r))
nextChunk str = liftM forgetOffset (nextChunkOff str)
  where
    forgetOffset (Left r)             = Left r
    forgetOffset (Right (c, _, rest)) = Right (c, rest)
{-# INLINE nextChunk #-}
-- | Take the first non-empty chunk off a stream, together with its offset.
-- Effects are run and empty chunks skipped until a chunk or the end is found.
nextChunkOff :: Monad m => ByteStream m r -> m (Either r (Bytes, Int64, ByteStream m r))
nextChunkOff str = case str of
    Empty r      -> return (Left r)
    Go m         -> m >>= nextChunkOff
    Chunk c o cs ->
        if B.null c
          then nextChunkOff cs
          else return (Right (c, o, cs))
{-# INLINABLE nextChunkOff #-}
-- | Drop the given number of bytes from the front of a stream.  A
-- non-positive count leaves the stream untouched.  When a chunk is cut, the
-- surviving part carries the offset advanced by the number of bytes removed
-- from it.
drop :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
drop n0 str0
  | n0 <= 0   = str0
  | otherwise = go n0 str0
  where
    go n str
      | n == 0    = str
      | otherwise = case str of
          Empty r -> Empty r
          Go m    -> Go (liftM (go n) m)
          Chunk c o rest
            | n < len   -> Chunk (B.drop (fromIntegral n) c) (o + n) rest
            | otherwise -> go (n - len) rest
            where
              len = fromIntegral (B.length c)
{-# INLINABLE drop #-}
-- | Split a stream after the given number of bytes.  The outer stream
-- yields the prefix and returns the remainder as its result.  A
-- non-positive count gives an empty prefix.  When a chunk is cut, the second
-- half carries the offset advanced by the length of the first half.
splitAt :: Monad m => Int64 -> ByteStream m r -> ByteStream m (ByteStream m r)
splitAt n0 str0
  | n0 <= 0   = Empty str0
  | otherwise = go n0 str0
  where
    go n str
      | n == 0    = Empty str
      | otherwise = case str of
          Empty r -> Empty (Empty r)
          Go m    -> Go (liftM (go n) m)
          Chunk c o rest
            | n < len ->
                let k = fromIntegral n
                in  Chunk (B.take k c) o (Empty (Chunk (B.drop k c) (o + n) rest))
            | otherwise -> Chunk c o (go (n - len) rest)
            where
              len = fromIntegral (B.length c)
{-# INLINABLE splitAt #-}
-- | Like 'splitAt', but the prefix is collected into one strict bytestring
-- and returned together with the remaining stream.  If the stream ends
-- early, the prefix is shorter than requested.
splitAt' :: Monad m => Int -> ByteStream m r -> m (Of Bytes (ByteStream m r))
splitAt' n0 str0
  | n0 <= 0   = finish [] str0
  | otherwise = go n0 [] str0
  where
    -- Chunks are accumulated in reverse order; assemble and force the prefix.
    finish acc rest = return $! B.concat (reverse acc) :> rest

    go n acc str
      | n == 0    = finish acc str
      | otherwise = case str of
          Empty r -> finish acc (Empty r)
          Go m    -> m >>= go n acc
          Chunk c o rest
            | n < B.length c ->
                finish (B.take n c : acc)
                       (Chunk (B.drop n c) (o + fromIntegral n) rest)
            | otherwise -> go (n - B.length c) (c : acc) rest
{-# INLINABLE splitAt' #-}
-- | Cut a stream at the first chunk whose offset is not below the given end
-- offset.  Chunks with a smaller offset are passed through whole; the rest
-- of the stream is dropped without running its effects.
trim :: Monad m => Int64 -> ByteStream m () -> ByteStream m ()
trim eoff str = case str of
    Empty _ -> Empty ()
    Go m    -> Go (liftM (trim eoff) m)
    Chunk c o rest
      | o >= eoff -> Empty ()
      | otherwise -> Chunk c o (trim eoff rest)
{-# INLINABLE trim #-}
-- | Drop the longest prefix of bytes that satisfy the predicate.  When a
-- chunk is cut, the surviving part carries the offset advanced by the number
-- of bytes removed from it.
--
-- Empty chunks are skipped.  They contain no byte that could end the
-- prefix, so they must not stop the dropping.
dropWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r
dropWhile pr = drop'
  where
    drop' bs = case bs of
        Empty r -> Empty r
        Go m    -> Go (liftM drop' m)
        Chunk c o cs
          -- For an empty chunk 'findIndexOrEnd' returns 0, which would be
          -- mistaken for "first byte fails the predicate"; skip it instead.
          | B.null c  -> drop' cs
          | otherwise -> case findIndexOrEnd (not . pr) c of
              0 -> bs
              n | n < B.length c -> Chunk (B.drop n c) (o + fromIntegral n) cs
                | otherwise      -> drop' cs
{-# INLINABLE dropWhile #-}
-- | Split a stream at the first byte that satisfies the predicate.  The
-- outer stream yields the bytes before that byte and returns the remainder
-- (starting with the matching byte) as its result.  When a chunk is cut, the
-- second half carries the offset advanced by the length of the first half.
--
-- Empty chunks are skipped.  They contain no byte that could match, so
-- they must not end the prefix.
break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r)
break f cs0 = break' cs0
  where
    break' (Empty r) = Empty (Empty r)
    break' (Go m)    = Go (liftM break' m)
    break' (Chunk c o cs)
      -- For an empty chunk 'findIndexOrEnd' returns 0, which would be
      -- mistaken for "first byte matches"; skip it instead.
      | B.null c  = break' cs
      | otherwise = case findIndexOrEnd f c of
          0 -> Empty (Chunk c o cs)
          n | n < B.length c ->
                Chunk (B.take n c) o $
                    Empty (Chunk (B.drop n c) (o + fromIntegral n) cs)
            | otherwise -> Chunk c o (break' cs)
{-# INLINABLE break #-}
-- | Read a handle as a byte stream in chunks of at most the given size
-- (using 'B.hGetSome').  The stream ends at the first empty read.  Offsets
-- start at 0 and advance by the length of each chunk read.  The handle is
-- not closed.
hGetContentsN :: MonadIO m => Int -> Handle -> ByteStream m ()
hGetContentsN size hdl = go 0
  where
    go !off =
        liftIO (B.hGetSome hdl size) >>= \c ->
            if B.null c
              then Empty ()
              else Chunk c off (go (off + fromIntegral (B.length c)))
{-# INLINABLE hGetContentsN #-}
-- | Read a handle as a byte stream, using 'defaultChunkSize' as the chunk
-- size.  See 'hGetContentsN'.
hGetContents :: MonadIO m => Handle -> ByteStream m ()
hGetContents hdl = hGetContentsN defaultChunkSize hdl
{-# INLINE hGetContents #-}
-- | Run an action with a handle for writing the named file.
--
-- The name @\"-\"@ means 'stdout'; the handle is passed as is and not closed.
-- For any other name, output goes to a temporary file (the name with
-- @.#~#@ appended), which is renamed to the final name only after the action
-- has finished and the handle has been closed successfully.  If the action
-- or the close fails, the exception propagates, the final name is not
-- touched, and the temporary file is left behind.
withOutputFile :: (MonadIO m, MonadMask m) => FilePath -> (Handle -> m a) -> m a
withOutputFile "-" k = k stdout
withOutputFile f k = do
    -- Close (and thereby flush) before renaming: a failed flush must not
    -- leave truncated data under the final name.
    r <- bracket (liftIO $ openBinaryFile tmp WriteMode) (liftIO . hClose) k
    liftIO (renameFile tmp f)
    return r
  where
    tmp = f ++ ".#~#"
{-# INLINE withOutputFile #-}
-- | Write a byte stream to the named file through 'withOutputFile' and
-- return the stream's result.
writeFile :: (MonadIO m, MonadMask m) => FilePath -> ByteStream m r -> m r
writeFile path str = withOutputFile path (`hPut` str)
{-# INLINE writeFile #-}
-- | Write every chunk of a stream to a handle, in order, and return the
-- stream's result.  The handle is neither flushed nor closed here.
hPut :: MonadIO m => Handle -> ByteStream m r -> m r
hPut hdl str = dematerialize str return write join
  where
    write piece _ rest = liftIO (B.hPut hdl piece) >> rest
{-# INLINE hPut #-}
-- | Index of the first byte that satisfies the predicate, or the length of
-- the bytestring if no byte does.
--
-- Implemented with 'B.findIndex' rather than a hand-written pointer loop
-- under @unsafeDupablePerformIO@; the result is the same.
findIndexOrEnd :: (Word8 -> Bool) -> Bytes -> Int
findIndexOrEnd k bs = case B.findIndex k bs of
    Just i  -> i
    Nothing -> B.length bs
{-# INLINABLE findIndexOrEnd #-}
-- | Run a 'Builder' and stream its output, using 'safeStrategy' with
-- 'L.smallChunkSize' and 'L.defaultChunkSize'.  See 'toByteStreamWith'.
toByteStream :: MonadIO m => Builder -> ByteStream m ()
toByteStream b = toByteStreamWith strategy b
  where
    strategy = safeStrategy L.smallChunkSize L.defaultChunkSize
{-# INLINE toByteStream #-}
-- | Run a 'Builder' with the given allocation strategy and stream the
-- chunks it produces.  Offsets start at 0 and advance by the length of each
-- chunk emitted.
toByteStreamWith :: MonadIO m => AllocationStrategy -> Builder -> ByteStream m ()
toByteStreamWith strategy builder0 = do
-- Start the builder; this yields the first element of the chunk stream.
cios <- liftIO (buildStepToCIOS strategy (runBuilder builder0))
let loop !o cios0 = case cios0 of
-- A full chunk: emit it, then run the IO action that produces the next step.
Yield1 bs io -> Chunk bs o $ do
cios1 <- liftIO io
loop (o + fromIntegral (B.length bs)) cios1
-- The builder is done: emit what is left in the final buffer.
Finished buf r -> trimmedChunkFromBuffer o buf (Empty r)
trimmedChunkFromBuffer o buffer k
-- Nothing left in the buffer: emit no chunk.
| B.null bs = k
-- The data fills less than half of the buffer: copy it so the chunk does
-- not keep the whole buffer alive.
| 2 * B.length bs < bufferSize buffer = Chunk (B.copy bs) o k
| otherwise = Chunk bs o k
where
bs = byteStringFromBuffer buffer
loop 0 cios
{-# INLINABLE toByteStreamWith #-}
{-# SPECIALIZE toByteStreamWith :: AllocationStrategy -> Builder -> ByteStream IO () #-}
-- | Turn a stream of 'Builder's over 'IO' into a single 'Builder'.  The
-- effects of the stream are run while the resulting builder is executed,
-- each one only when the preceding builders have been run.
concatBuilders :: Stream (Of Builder) IO () -> Builder
concatBuilders str = builder $ \k range ->
    -- Run a builder with the current continuation and buffer range.
    let run b = runBuilderWith b k range
    in  case str of
          Return _         -> run mempty
          Step (b :> rest) -> run (b `mappend` concatBuilders rest)
          Effect m         -> m >>= \str' -> run (concatBuilders str')
{-# INLINABLE concatBuilders #-}
-- | Split a byte stream into lines.  Each line becomes one inner
-- 'ByteStream' of the resulting 'Stream'.  A line ends at a line feed
-- (byte 10); the line feed is removed, and so is a carriage return (byte 13)
-- directly in front of it, even when the two fall into different chunks.
-- A carriage return that is not followed by a line feed is kept as data.
-- Chunks keep the offsets of the bytes they hold.
lines :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
lines text0 = loop1 text0
  where
    -- Between lines: start a new line only if there is at least one byte.
    loop1 :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
    loop1 text =
      case text of
        Empty r -> Return r
        Go m -> Effect $ liftM loop1 m
        Chunk c _ cs
          | B.null c -> loop1 cs
          | otherwise -> Step (loop2 Nothing text)

    -- Emit a carriage return that was held back (with its original offset)
    -- in front of the given stream, if there is one.
    flushCr :: Maybe Int64 -> ByteStream m a -> ByteStream m a
    flushCr (Just oo) k = Chunk (B.singleton 13) oo k
    flushCr Nothing   k = k

    -- Inside a line.  @prevCr@ holds the offset of a carriage return that
    -- ended the previous chunk and has not been emitted yet, because it
    -- might be the first half of a CR-LF pair.
    loop2 :: Monad m => Maybe Int64 -> ByteStream m r -> ByteStream m (Stream (ByteStream m) m r)
    loop2 prevCr text =
      case text of
        -- End of input: a held-back CR was ordinary data after all.
        Empty r -> flushCr prevCr (Empty (Return r))
        Go m -> Go $ liftM (loop2 prevCr) m
        Chunk c o cs ->
          case B.elemIndex 10 c of
            Nothing -> case B.length c of
              0 -> loop2 prevCr cs
              -- More data without a line feed: a held-back CR was not part
              -- of a CR-LF pair and must be emitted before this chunk.
              l -> flushCr prevCr $
                   if B.unsafeLast c == 13
                     then Chunk (B.unsafeInit c) o (loop2 (Just (o-1 + fromIntegral l)) cs)
                     else Chunk c o (loop2 Nothing cs)
            Just i ->
              let prefixLength =
                    if i >= 1 && B.unsafeIndex c (i-1) == 13
                      then i-1
                      else i
                  rest =
                    if B.length c > i+1
                      then Chunk (B.drop (i+1) c) (o+1 + fromIntegral i) cs
                      else cs
                  result = Chunk (B.unsafeTake prefixLength c) o (Empty (loop1 rest))
              in  -- If the line feed is the very first byte, a held-back CR
                  -- belongs to the line ending and is dropped.
                  if i > 0 then flushCr prevCr result else result
{-# INLINABLE lines #-}
-- | Split a byte stream into lines, each delivered as one strict
-- bytestring.  A line ends at a line feed (byte 10), which is removed; if
-- the assembled line then ends in a carriage return (byte 13), that is
-- removed, too.  Trailing data without a final line feed is yielded as a
-- last line.
lines' :: Monad m => ByteStream m r -> Stream (Of Bytes) m r
lines' = go []
  where
    -- @pending@ holds the chunks of the current, unfinished line in reverse.
    go :: Monad m => [Bytes] -> ByteStream m r -> Stream (Of Bytes) m r
    go pending (Empty r)
      | null pending = Return r
      | otherwise    = r <$ Q.yield (assemble pending)
    go pending (Go m) = Effect (liftM (go pending) m)
    go pending (Chunk c o rest)
      | B.null c  = go pending rest
      | otherwise = case B.elemIndex 10 c of
          Nothing -> go (c : pending) rest
          Just i  ->
              let line  = assemble (B.take i c : pending)
                  after = Chunk (B.drop (i + 1) c) (o + 1 + fromIntegral i) rest
              in  Q.cons line (go [] after)

    assemble = checkCR . B.concat . reverse

    -- Strip one trailing carriage return, if present.
    checkCR s = case B.unsnoc s of
        Just (body, 13) -> body
        _               -> s
{-# INLINABLE lines' #-}
-- | Decompress a stream if it starts with the two bytes 31 and 139;
-- otherwise pass it through unchanged.  See 'gunzipWith'.
gunzip :: MonadIO m => ByteStream m r -> ByteStream m r
gunzip str = gunzipWith id str
{-# INLINABLE gunzip #-}
-- | Inspect the first two bytes of a stream.  If they are 31 and 139, the
-- stream is decompressed by 'gunzipLoop'; otherwise the inspected bytes are
-- put back in front (at the offset of the first byte) and the stream is
-- handed to the supplied continuation.
gunzipWith :: MonadIO m => (ByteStream m r -> ByteStream m r)
                        -> ByteStream m r -> ByteStream m r
gunzipWith k s0 = do
    first <- lift (nextByteOff s0)
    case first of
      Left r -> k (pure r)
      Right (b1, o, s1)
        | b1 /= 31  -> k (Chunk (B.singleton b1) o s1)
        | otherwise -> do
            second <- lift (nextByte s1)
            case second of
              Left r -> k (Chunk (B.singleton 31) o (pure r))
              Right (b2, s2)
                | b2 == 139 -> gunzipLoop o (Chunk (B.pack [31, 139]) o s2)
                | otherwise -> k (Chunk (B.pack [31, b2]) o s2)
{-# INLINABLE gunzipWith #-}
-- | Decompress a stream with zlib ('Z.gzipOrZlibFormat').  The first
-- argument is the input offset at which the compressed data starts; output
-- chunks get offsets starting at that value shifted left by 16 bits and
-- advancing by the length of each output chunk.
gunzipLoop :: MonadIO m => Int64 -> ByteStream m r -> ByteStream m r
gunzipLoop o = go o (shiftL o 16) $ Z.decompressIO Z.gzipOrZlibFormat Z.defaultDecompressParams
where
-- The decompressor wants input: feed it the next chunk, or an empty chunk
-- once the input stream has ended.
go inoff outoff (Z.DecompressInputRequired next) inp =
lift (nextChunk inp) >>= \case
Left r -> do z <- liftIO (next B.empty)
go inoff outoff z (pure r)
Right (ck,inp')
| B.null ck -> go inoff outoff (Z.DecompressInputRequired next) inp'
| otherwise -> do z <- liftIO (next ck)
go (inoff + fromIntegral (B.length ck)) outoff z inp'
-- The decompressor produced output: emit it and advance the output offset.
go inoff outoff (Z.DecompressOutputAvailable outchunk next) inp = do
z <- Chunk outchunk outoff (liftIO next)
go inoff (outoff + fromIntegral (B.length outchunk)) z inp
-- End of one compressed stream: push the unconsumed input back (at the
-- offset where it was read) and try to decompress what follows; if it does
-- not start with the magic bytes, the rest is drained via 'effects'.
go inoff _outoff (Z.DecompressStreamEnd inchunk) inp =
gunzipWith (lift . effects) (Chunk inchunk (inoff - fromIntegral (B.length inchunk)) inp)
-- Decompression errors are thrown as exceptions in 'IO'.
go _inoff _outoff (Z.DecompressStreamError derr) _inp =
liftIO $ throwIO derr
-- | Compress a byte stream in gzip format with default parameters.  Output
-- chunks are emitted through 'chunk'.  When the compressor reports the end
-- of the stream, the remaining input is drained via 'effects' to obtain the
-- result.
gzip :: MonadIO m => ByteStream m r -> ByteStream m r
gzip = go (Z.compressIO Z.gzipFormat Z.defaultCompressParams)
  where
    go state inp = case state of
        Z.CompressStreamEnd -> lift (effects inp)

        Z.CompressOutputAvailable out next -> do
            chunk out
            state' <- liftIO next
            go state' inp

        Z.CompressInputRequired feed -> do
            res <- lift (nextChunk inp)
            case res of
              -- End of input is signalled to the compressor by an empty chunk.
              Left r -> do
                  state' <- liftIO (feed B.empty)
                  go state' (pure r)
              Right (ck, inp')
                | B.null ck -> go state inp'
                | otherwise -> do
                    state' <- liftIO (feed ck)
                    go state' inp'