{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE OverloadedStrings #-} -- | Stream operations on 'ByteString'. module System.IO.Streams.ByteString ( -- * Counting bytes countInput , countOutput -- * Treating strings as streams , fromByteString , fromLazyByteString -- * Input and output , readExactly , takeBytesWhile , writeLazyByteString -- * Stream transformers -- ** Splitting/Joining , splitOn , lines , unlines , words , unwords -- ** Other , giveBytes , giveExactly , takeBytes , throwIfConsumesMoreThan , throwIfProducesMoreThan -- ** Rate limiting , throwIfTooSlow -- * String search , MatchInfo(..) , search -- * Exception types , RateTooSlowException , ReadTooShortException , TooManyBytesReadException , TooManyBytesWrittenException , TooFewBytesWrittenException ) where ------------------------------------------------------------------------------ import Control.Exception (Exception, throwIO) import Control.Monad (when, (>=>)) import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as S import qualified Data.ByteString.Lazy.Char8 as L import qualified Data.ByteString.Unsafe as S import Data.Char (isSpace) import Data.Int (Int64) import Data.IORef (IORef, newIORef, readIORef, writeIORef) import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Typeable (Typeable) import Prelude hiding (lines, read, takeWhile, unlines, unwords, words) ------------------------------------------------------------------------------ import System.IO.Streams.Combinators (filterM, intersperse, outputFoldM) import System.IO.Streams.Internal (InputStream, OutputStream, SP (..), Sink (..), Source (..), makeInputStream, makeOutputStream, nullSink, pushback, read, sinkToStream, sourceToStream, unRead, write) import System.IO.Streams.Internal.Search (MatchInfo (..), search) import System.IO.Streams.List (fromList, writeList) ------------------------------------------------------------------------------ {-# INLINE modifyRef #-} modifyRef :: IORef a -> (a -> a) -> IO () modifyRef ref f = do x <- readIORef ref writeIORef ref $! f x ------------------------------------------------------------------------------ -- | Writes a lazy 'ByteString' to an 'OutputStream'. -- -- Example: -- -- @ -- ghci> Streams.'writeLazyByteString' \"Test\\n\" Streams.'System.IO.Streams.stdout' -- Test -- @ writeLazyByteString :: L.ByteString -- ^ string to write to output -> OutputStream ByteString -- ^ output stream -> IO () writeLazyByteString = writeList . L.toChunks {-# INLINE writeLazyByteString #-} ------------------------------------------------------------------------------ -- | Creates an 'InputStream' from a 'ByteString'. fromByteString :: ByteString -> IO (InputStream ByteString) fromByteString = fromList . (:[]) ------------------------------------------------------------------------------ -- | Creates an 'InputStream' from a lazy 'ByteString'. fromLazyByteString :: L.ByteString -> IO (InputStream ByteString) fromLazyByteString = fromList . L.toChunks ------------------------------------------------------------------------------ -- | Wraps an 'InputStream', counting the number of bytes produced by the -- stream as a side effect. Produces a new 'InputStream' as well as an IO -- action to retrieve the count of bytes produced. -- -- Strings pushed back to the returned 'InputStream' will be pushed back to the -- original stream, and the count of produced bytes will be subtracted -- accordingly. -- -- Example: -- -- @ -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"::ByteString] -- ghci> (is', getCount) <- Streams.'countInput' is -- ghci> Streams.'read' is' -- Just \"abc\" -- ghci> getCount -- 3 -- ghci> Streams.'unRead' \"bc\" is' -- ghci> getCount -- 1 -- ghci> Streams.'System.IO.Streams.peek' is -- Just \"bc\" -- ghci> Streams.'System.IO.Streams.toList' is' -- [\"bc\",\"def\",\"ghi\"] -- ghci> getCount -- 9 -- @ -- countInput :: InputStream ByteString -> IO (InputStream ByteString, IO Int64) countInput src = do ref <- newIORef 0 stream <- sourceToStream $ source ref return $! (stream, readIORef ref) where eof !ref = return $! SP (eofSrc ref) Nothing eofSrc !ref = Source { produce = eof ref , pushback = pb ref } pb !ref !s = do unRead s src modifyRef ref $ \x -> x - (toEnum $ S.length s) return $! source ref source ref = Source { produce = read src >>= maybe (eof ref) chunk , pushback = pb ref } where chunk s = let !l = toEnum $ S.length s in do modifyRef ref (+ l) return $! SP (source ref) (Just s) ------------------------------------------------------------------------------ -- | Wraps an 'OutputStream', counting the number of bytes consumed by the -- stream as a side effect. Produces a new 'OutputStream' as well as an IO -- action to retrieve the count of bytes consumed. -- -- Example: -- -- @ -- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream' -- ghci> (os', getCount) <- Streams.'countOutput' os -- ghci> Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>= Streams.'System.IO.Streams.connectTo' os' -- ghci> getList -- [\"abc\",\"def\",\"ghi\"] -- ghci> getCount -- 9 -- @ countOutput :: OutputStream ByteString -> IO (OutputStream ByteString, IO Int64) countOutput = outputFoldM f 0 where f !count s = return z where !c = S.length s !z = toEnum c + count ------------------------------------------------------------------------------ -- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at -- most @n@ bytes, subsequently yielding end-of-stream forever. -- -- Strings pushed back to the returned 'InputStream' will be propagated -- upstream, modifying the count of taken bytes accordingly. -- -- Example: -- -- @ -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"truncated\", \" string\"::ByteString] -- ghci> is' <- Streams.'takeBytes' 9 is -- ghci> Streams.'read' is' -- Just \"truncated\" -- ghci> Streams.'read' is' -- Nothing -- ghci> Streams.'System.IO.Streams.peek' is -- Just \" string\" -- ghci> Streams.'unRead' \"cated\" is' -- ghci> Streams.'System.IO.Streams.peek' is -- Just \"cated\" -- ghci> Streams.'System.IO.Streams.peek' is' -- Just \"cated\" -- ghci> Streams.'read' is' -- Just \"cated\" -- ghci> Streams.'read' is' -- Nothing -- ghci> Streams.'read' is -- Just \" string\" -- @ takeBytes :: Int64 -- ^ maximum number of bytes to read -> InputStream ByteString -- ^ input stream to wrap -> IO (InputStream ByteString) takeBytes k0 src = sourceToStream $ source k0 where eof !n = return $! SP (eofSrc n) Nothing eofSrc !n = Source (eof n) (pb n) pb !n s = do unRead s src return $! source $! n + toEnum (S.length s) source !k = Source grab (pb k) where grab = if k <= 0 then eof k else read src >>= maybe (eof k) chunk chunk s = let l = toEnum $ S.length s k' = k - l in if k' <= 0 then let (a,b) = S.splitAt (fromEnum k) s in do when (not $ S.null b) $ unRead b src return $! SP (eofSrc 0) (Just a) else return $! SP (source k') (Just s) ------------------------------------------------------------------------------ -- | Splits an 'InputStream' over 'ByteString's using a delimiter predicate. -- -- Note that: -- -- * data pushed back with 'unRead' is *not* propagated upstream here. -- -- * the resulting 'InputStream' may hold an unbounded amount of the -- bytestring in memory waiting for the function to return true, so this -- function should not be used in unsafe contexts. -- -- * the delimiter is NOT included in the output. -- -- * consecutive delimiters are not merged. -- -- Example: -- -- @ -- ghci> Streams.'System.IO.Streams.fromList' [\"the quick br\", \"own fox\"::'ByteString'] >>= -- Streams.'splitOn' (== \' \') >>= Streams.'System.IO.Streams.toList' -- [\"the\",\"quick\",\"brown\",\"\",\"fox\"] -- @ -- splitOn :: (Char -> Bool) -- ^ predicate used to break the input -- stream into chunks -> InputStream ByteString -- ^ input stream -> IO (InputStream ByteString) splitOn p is = do ref <- newIORef id makeInputStream $ start ref where start ref = go where go = read is >>= maybe end chunk end = do dl <- readIORef ref case dl [] of [] -> return Nothing xs -> writeIORef ref id >> (return $! Just $! S.concat xs) chunk s = let (a, b) = S.break p s in if S.null b then modifyRef ref (\f -> f . (a:)) >> go else do let !b' = S.unsafeDrop 1 b dl <- readIORef ref if S.null b' then do writeIORef ref ("" :) return $ Just $! S.concat $ dl [a] else do writeIORef ref id unRead b' is return $ Just $! S.concat $ dl [a] ------------------------------------------------------------------------------ -- | Splits a bytestring 'InputStream' into lines. See 'splitOn' and -- 'Prelude.lines'. -- -- Example: -- -- @ -- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello,\\n world!\"] >>= Streams.'lines' -- ghci> replicateM 3 (Streams.'read' is) -- [Just \"Hello\", Just \", world!\", Nothing] -- @ -- -- Note that this may increase the chunk size if the input contains extremely -- long lines. lines :: InputStream ByteString -> IO (InputStream ByteString) lines = splitOn (== '\n') ------------------------------------------------------------------------------ -- | Splits a bytestring 'InputStream' into words. See 'splitOn' and -- 'Prelude.words'. -- -- Example: -- -- @ -- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'words' -- ghci> replicateM 3 (Streams.'read' is) -- [Just \"Hello,\", Just \"world!\", Nothing] -- @ -- -- Note that this may increase the chunk size if the input contains extremely -- long words. words :: InputStream ByteString -> IO (InputStream ByteString) words = splitOn isSpace >=> filterM (return . not . S.all isSpace) ------------------------------------------------------------------------------ -- | Intersperses string chunks sent to the given 'OutputStream' with newlines. -- See 'intersperse' and 'Prelude.unlines'. -- -- @ -- ghci> os <- Streams.'unlines' Streams.'System.IO.Streams.stdout' -- ghci> Streams.'write' (Just \"Hello,\") os -- Hello -- ghci> Streams.'write' Nothing os -- ghci> Streams.'write' (Just \"world!\") os -- world! -- @ unlines :: OutputStream ByteString -> IO (OutputStream ByteString) unlines os = makeOutputStream $ \m -> do write m os case m of Nothing -> return $! () Just _ -> write (Just "\n") os ------------------------------------------------------------------------------ -- | Intersperses string chunks sent to the given 'OutputStream' with spaces. -- See 'intersperse' and 'Prelude.unwords'. -- -- @ -- ghci> os <- Streams.'unwords' Streams.'System.IO.Streams.stdout' -- ghci> forM_ [Just \"Hello,\", Nothing, Just \"world!\\n\"] $ \w -> Streams.'write' w os -- Hello, world! -- @ unwords :: OutputStream ByteString -> IO (OutputStream ByteString) unwords = intersperse " " ------------------------------------------------------------------------------ -- | Thrown by 'throwIfProducesMoreThan' when too many bytes were read from the -- original 'InputStream'. data TooManyBytesReadException = TooManyBytesReadException deriving (Typeable) instance Show TooManyBytesReadException where show TooManyBytesReadException = "Too many bytes read" instance Exception TooManyBytesReadException ------------------------------------------------------------------------------ -- | Thrown by 'giveExactly' when too few bytes were written to the produced -- 'OutputStream'. data TooFewBytesWrittenException = TooFewBytesWrittenException deriving (Typeable) instance Show TooFewBytesWrittenException where show TooFewBytesWrittenException = "Too few bytes written" instance Exception TooFewBytesWrittenException ------------------------------------------------------------------------------ -- | Thrown by 'throwIfConsumesMoreThan' when too many bytes were sent to the -- produced 'OutputStream'. data TooManyBytesWrittenException = TooManyBytesWrittenException deriving (Typeable) instance Show TooManyBytesWrittenException where show TooManyBytesWrittenException = "Too many bytes written" instance Exception TooManyBytesWrittenException ------------------------------------------------------------------------------ -- | Thrown by 'readExactly' when not enough bytes were available on the input. data ReadTooShortException = ReadTooShortException Int deriving (Typeable) instance Show ReadTooShortException where show (ReadTooShortException x) = "Short read, expected " ++ show x ++ " bytes" instance Exception ReadTooShortException ------------------------------------------------------------------------------ -- | Wraps an 'InputStream'. If more than @n@ bytes are produced by this -- stream, 'read' will throw a 'TooManyBytesReadException'. -- -- If a chunk yielded by the input stream would result in more than @n@ bytes -- being produced, 'throwIfProducesMoreThan' will cut the generated string such -- that exactly @n@ bytes are yielded by the returned stream, and the -- /subsequent/ read will throw an exception. Example: -- -- @ -- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>= -- Streams.'throwIfProducesMoreThan' 5 -- ghci> 'Control.Monad.replicateM' 2 ('read' is) -- [Just \"abc\",Just \"de\"] -- ghci> Streams.'read' is -- *** Exception: Too many bytes read -- @ -- -- Strings pushed back to the returned 'InputStream' will be propagated -- upstream, modifying the count of taken bytes accordingly. Example: -- -- @ -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] -- ghci> is' <- Streams.'throwIfProducesMoreThan' 5 is -- ghci> Streams.'read' is' -- Just \"abc\" -- ghci> Streams.'unRead' \"xyz\" is' -- ghci> Streams.'System.IO.Streams.peek' is -- Just \"xyz\" -- ghci> Streams.'read' is -- Just \"xyz\" -- ghci> Streams.'read' is -- Just \"de\" -- ghci> Streams.'read' is -- *** Exception: Too many bytes read -- @ -- throwIfProducesMoreThan :: Int64 -- ^ maximum number of bytes to read -> InputStream ByteString -- ^ input stream -> IO (InputStream ByteString) throwIfProducesMoreThan k0 src = sourceToStream $ source k0 where eofSrc n = Source { produce = eof n , pushback = pb n } eof n = return $! SP (eofSrc n) Nothing pb n s = do unRead s src return $! source $! n + toEnum (S.length s) source !k = Source prod (pb k) where prod = read src >>= maybe (eof k) chunk chunk s | l == 0 = return $! SP (source k) (Just s) | k == 0 = throwIO TooManyBytesReadException | k' >= 0 = return $! SP (source k') (Just s) | otherwise = do unRead b src return $! SP (source 0) (Just a) where l = toEnum $ S.length s k' = k - l (a,b) = S.splitAt (fromEnum k) s ------------------------------------------------------------------------------ -- | Reads an @n@-byte ByteString from an input stream. Throws a -- 'ReadTooShortException' if fewer than @n@ bytes were available. -- -- Example: -- -- @ -- ghci> Streams.'System.IO.Streams.fromList' [\"long string\"] >>= Streams.'readExactly' 6 -- \"long s\" -- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'readExactly' 6 -- *** Exception: Short read, expected 6 bytes -- @ -- readExactly :: Int -- ^ number of bytes to read -> InputStream ByteString -- ^ input stream -> IO ByteString readExactly n input = go id n where go !dl 0 = return $! S.concat $! dl [] go !dl k = read input >>= maybe (throwIO $ ReadTooShortException n) (\s -> do let l = S.length s if l >= k then do let (a,b) = S.splitAt k s when (not $ S.null b) $ unRead b input return $! S.concat $! dl [a] else go (dl . (s:)) (k - l)) ------------------------------------------------------------------------------ -- | Takes from a stream until the given predicate is no longer satisfied. -- Returns Nothing on end-of-stream, or @Just \"\"@ if the predicate is never -- satisfied. See 'Prelude.takeWhile' and 'Data.ByteString.Char8.takeWhile'. -- -- Example: -- -- @ -- ghci> Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'takeBytesWhile' (/= ',') -- Just \"Hello\" -- ghci> import Data.Char -- ghci> Streams.'System.IO.Streams.fromList' [\"7 Samurai\"] >>= Streams.'takeBytesWhile' isAlpha -- Just \"\" -- ghci> Streams.'System.IO.Streams.fromList' [] >>= Streams.'takeBytesWhile' isAlpha -- Nothing -- @ takeBytesWhile :: (Char -> Bool) -- ^ predicate -> InputStream ByteString -- ^ input stream -> IO (Maybe ByteString) takeBytesWhile p input = read input >>= maybe (return Nothing) (go id) where go dl !s | S.null b = read input >>= maybe finish (go dl') | otherwise = unRead b input >> finish where (a, b) = S.span p s dl' = dl . (a:) finish = return $! Just $! S.concat $! dl [a] ------------------------------------------------------------------------------ -- | Wraps an 'OutputStream', producing a new stream that will pass along at -- most @n@ bytes to the wrapped stream, throwing any subsequent input away. -- -- Example: -- -- @ -- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream' -- ghci> os' <- Streams.'giveBytes' 6 os -- ghci> Streams.'System.IO.Streams.fromList' [\"long \", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os' -- ghci> getList -- [\"long \",\"s\"] -- @ giveBytes :: Int64 -- ^ maximum number of bytes to send -- to the wrapped stream -> OutputStream ByteString -- ^ output stream to wrap -> IO (OutputStream ByteString) giveBytes k0 str = sinkToStream $ sink k0 where sink !k = Sink g where g Nothing = write Nothing str >> return nullSink g mb@(Just x) = let l = toEnum $ S.length x k' = k - l in if k' < 0 then do let a = S.take (fromEnum k) x when (not $ S.null a) $ write (Just a) str return nullStr else write mb str >> return (sink k') nullStr = Sink h h Nothing = write Nothing str >> return nullSink h _ = return nullSink ------------------------------------------------------------------------------ -- | Wraps an 'OutputStream', producing a new stream that will pass along -- exactly @n@ bytes to the wrapped stream. If the stream is sent more or fewer -- than the given number of bytes, the resulting stream will throw an exception -- (either 'TooFewBytesWrittenException' or 'TooManyBytesWrittenException') -- during a call to 'write'. -- -- Example: -- -- @ -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"] -- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 2 >=> Streams.'System.IO.Streams.connect' is) -- [\"ok\"] -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"] -- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 1 >=> Streams.'System.IO.Streams.connect' is) -- *** Exception: Too many bytes written -- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"] -- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 3 >=> Streams.'System.IO.Streams.connect' is) -- *** Exception: Too few bytes written -- @ giveExactly :: Int64 -> OutputStream ByteString -> IO (OutputStream ByteString) giveExactly k0 os = do ref <- newIORef k0 makeOutputStream $ go ref where go ref chunk = do !n <- readIORef ref case chunk of Nothing -> if n /= 0 then throwIO TooFewBytesWrittenException else return $! () Just s -> let n' = n - fromIntegral (S.length s) in if n' < 0 then throwIO TooManyBytesWrittenException else do writeIORef ref n' write chunk os ------------------------------------------------------------------------------ -- | Wraps an 'OutputStream', producing a new stream that will pass along at -- most @n@ bytes to the wrapped stream. If more than @n@ bytes are sent to the -- outer stream, a 'TooManyBytesWrittenException' will be thrown. -- -- /Note/: if more than @n@ bytes are sent to the outer stream, -- 'throwIfConsumesMoreThan' will not necessarily send the first @n@ bytes -- through to the wrapped stream before throwing the exception. -- -- Example: -- -- @ -- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream' -- ghci> os' <- Streams.'throwIfConsumesMoreThan' 5 os -- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'System.IO.Streams.connectTo' os' -- ghci> getList -- [\"short\"] -- ghci> os'' <- Streams.'throwIfConsumesMoreThan' 5 os -- ghci> Streams.'System.IO.Streams.fromList' [\"long\", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os'' -- *** Exception: Too many bytes written -- @ throwIfConsumesMoreThan :: Int64 -- ^ maximum number of bytes to send to the -- wrapped stream -> OutputStream ByteString -- ^ output stream to wrap -> IO (OutputStream ByteString) throwIfConsumesMoreThan k0 str = sinkToStream $ sink k0 where sink !k = Sink g where g Nothing = write Nothing str >> return nullSink g mb@(Just x) = let l = toEnum $ S.length x k' = k - l in if k' < 0 then throwIO TooManyBytesWrittenException else write mb str >> return (sink k') ------------------------------------------------------------------------------ -- | Gets the current posix time getTime :: IO Double getTime = realToFrac `fmap` getPOSIXTime ------------------------------------------------------------------------------ -- | Thrown by 'throwIfTooSlow' if input is not being produced fast enough by -- the given 'InputStream'. -- data RateTooSlowException = RateTooSlowException deriving (Typeable) instance Show RateTooSlowException where show RateTooSlowException = "Input rate too slow" instance Exception RateTooSlowException ------------------------------------------------------------------------------ -- | Rate-limits an input stream. If the input stream is not read from faster -- than the given rate, reading from the wrapped stream will throw a -- 'RateTooSlowException'. -- -- Strings pushed back to the returned 'InputStream' will be propagated up to -- the original stream. throwIfTooSlow :: IO () -- ^ action to bump timeout -> Double -- ^ minimum data rate, in bytes per second -> Int -- ^ amount of time in seconds to wait before -- data rate calculation takes effect -> InputStream ByteString -- ^ input stream -> IO (InputStream ByteString) throwIfTooSlow !bump !minRate !minSeconds' !stream = do !_ <- bump startTime <- getTime sourceToStream $ source startTime 0 where minSeconds = fromIntegral minSeconds' source !startTime = proc where eof !nb = return $! SP (eofSrc nb) Nothing eofSrc !nb = Source { produce = eof nb , pushback = pb nb } pb !nb s = do unRead s stream return $ proc $ nb - S.length s proc !nb = Source prod (pb nb) where prod = read stream >>= maybe (eof nb) (\s -> do let slen = S.length s now <- getTime let !delta = now - startTime let !newBytes = nb + slen when (delta > minSeconds + 1 && (fromIntegral newBytes / (delta - minSeconds)) < minRate) $ throwIO RateTooSlowException -- otherwise, bump the timeout and return the input !_ <- bump return $! SP (proc newBytes) (Just s))