{-# LANGUAGE BangPatterns       #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings  #-}

-- | Stream operations on 'ByteString'.
module System.IO.Streams.ByteString
 ( -- * Counting bytes
   countInput
 , countOutput

   -- * Treating strings as streams
 , fromByteString
 , fromLazyByteString

   -- * Input and output
 , readExactly
 , takeBytesWhile
 , writeLazyByteString

   -- * Stream transformers
   -- ** Splitting/Joining
 , splitOn
 , lines
 , unlines
 , words
 , unwords

   -- ** Other
 , giveBytes
 , giveExactly
 , takeBytes
 , takeExactly
 , throwIfConsumesMoreThan
 , throwIfProducesMoreThan

   -- ** Rate limiting
 , throwIfTooSlow

   -- * String search
 , MatchInfo(..)
 , search

   -- * Exception types
 , RateTooSlowException
 , ReadTooShortException
 , TooManyBytesReadException
 , TooManyBytesWrittenException
 , TooFewBytesWrittenException

 ) where

------------------------------------------------------------------------------
import           Control.Exception                 (Exception, throwIO)
import           Control.Monad                     (when, (>=>))
import           Data.ByteString                   (ByteString)
import qualified Data.ByteString.Char8             as S
import qualified Data.ByteString.Lazy.Char8        as L
import qualified Data.ByteString.Unsafe            as S
import           Data.Char                         (isSpace)
import           Data.Int                          (Int64)
import           Data.IORef                        (IORef, newIORef, readIORef, writeIORef)
import           Data.Time.Clock.POSIX             (getPOSIXTime)
import           Data.Typeable                     (Typeable)
import           Prelude                           hiding (lines, read, unlines, unwords, words)
------------------------------------------------------------------------------
import           System.IO.Streams.Combinators     (filterM, intersperse, outputFoldM)
import           System.IO.Streams.Internal        (InputStream (..), OutputStream, makeInputStream, makeOutputStream, read, unRead, write)
import           System.IO.Streams.Internal.Search (MatchInfo (..), search)
import           System.IO.Streams.List            (fromList, writeList)


------------------------------------------------------------------------------
{-# INLINE modifyRef #-}
modifyRef :: IORef a -> (a -> a) -> IO ()
modifyRef :: IORef a -> (a -> a) -> IO ()
modifyRef IORef a
ref a -> a
f = do
    a
x <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref
    IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
ref (a -> IO ()) -> a -> IO ()
forall a b. (a -> b) -> a -> b
$! a -> a
f a
x


------------------------------------------------------------------------------
-- | Writes a lazy 'ByteString' to an 'OutputStream'.
--
-- Example:
--
-- @
-- ghci> Streams.'writeLazyByteString' \"Test\\n\" Streams.'System.IO.Streams.stdout'
-- Test
-- @
writeLazyByteString :: L.ByteString             -- ^ string to write to output
                    -> OutputStream ByteString  -- ^ output stream
                    -> IO ()
writeLazyByteString :: ByteString -> OutputStream ByteString -> IO ()
writeLazyByteString = [ByteString] -> OutputStream ByteString -> IO ()
forall a. [a] -> OutputStream a -> IO ()
writeList ([ByteString] -> OutputStream ByteString -> IO ())
-> (ByteString -> [ByteString])
-> ByteString
-> OutputStream ByteString
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString]
L.toChunks
{-# INLINE writeLazyByteString #-}


------------------------------------------------------------------------------
-- | Creates an 'InputStream' from a 'ByteString'.
fromByteString :: ByteString -> IO (InputStream ByteString)
fromByteString :: ByteString -> IO (InputStream ByteString)
fromByteString = [ByteString] -> IO (InputStream ByteString)
forall c. [c] -> IO (InputStream c)
fromList ([ByteString] -> IO (InputStream ByteString))
-> (ByteString -> [ByteString])
-> ByteString
-> IO (InputStream ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:[])


------------------------------------------------------------------------------
-- | Creates an 'InputStream' from a lazy 'ByteString'.
fromLazyByteString :: L.ByteString -> IO (InputStream ByteString)
fromLazyByteString :: ByteString -> IO (InputStream ByteString)
fromLazyByteString = [ByteString] -> IO (InputStream ByteString)
forall c. [c] -> IO (InputStream c)
fromList ([ByteString] -> IO (InputStream ByteString))
-> (ByteString -> [ByteString])
-> ByteString
-> IO (InputStream ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString]
L.toChunks


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', counting the number of bytes produced by the
-- stream as a side effect. Produces a new 'InputStream' as well as an IO
-- action to retrieve the count of bytes produced.
--
-- Strings pushed back to the returned 'InputStream' will be pushed back to the
-- original stream, and the count of produced bytes will be subtracted
-- accordingly.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"::ByteString]
-- ghci> (is', getCount) <- Streams.'countInput' is
-- ghci> Streams.'read' is'
-- Just \"abc\"
-- ghci> getCount
-- 3
-- ghci> Streams.'unRead' \"bc\" is'
-- ghci> getCount
-- 1
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"bc\"
-- ghci> Streams.'System.IO.Streams.toList' is'
-- [\"bc\",\"def\",\"ghi\"]
-- ghci> getCount
-- 9
-- @
--
countInput :: InputStream ByteString -> IO (InputStream ByteString, IO Int64)
countInput :: InputStream ByteString -> IO (InputStream ByteString, IO Int64)
countInput InputStream ByteString
src = do
    IORef Int64
ref    <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef (Int64
0 :: Int64)
    (InputStream ByteString, IO Int64)
-> IO (InputStream ByteString, IO Int64)
forall (m :: * -> *) a. Monad m => a -> m a
return ((InputStream ByteString, IO Int64)
 -> IO (InputStream ByteString, IO Int64))
-> (InputStream ByteString, IO Int64)
-> IO (InputStream ByteString, IO Int64)
forall a b. (a -> b) -> a -> b
$! (IO (Maybe ByteString)
-> (ByteString -> IO ()) -> InputStream ByteString
forall a. IO (Maybe a) -> (a -> IO ()) -> InputStream a
InputStream (IORef Int64 -> IO (Maybe ByteString)
forall a. Num a => IORef a -> IO (Maybe ByteString)
prod IORef Int64
ref) (IORef Int64 -> ByteString -> IO ()
forall a. Num a => IORef a -> ByteString -> IO ()
pb IORef Int64
ref), IORef Int64 -> IO Int64
forall a. IORef a -> IO a
readIORef IORef Int64
ref)

  where
    prod :: IORef a -> IO (Maybe ByteString)
prod IORef a
ref = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
src IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing) (\ByteString
x -> do
        IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef a
ref (a -> a -> a
forall a. Num a => a -> a -> a
+ (Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
x))
        Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
x)

    pb :: IORef a -> ByteString -> IO ()
pb IORef a
ref ByteString
s = do
        IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef a
ref (\a
x -> a
x a -> a -> a
forall a. Num a => a -> a -> a
- (Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s))
        ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
s InputStream ByteString
src


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', counting the number of bytes consumed by the
-- stream as a side effect. Produces a new 'OutputStream' as well as an IO
-- action to retrieve the count of bytes consumed.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> (os', getCount) <- Streams.'countOutput' os
-- ghci> Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"abc\",\"def\",\"ghi\"]
-- ghci> getCount
-- 9
-- @
countOutput :: OutputStream ByteString
            -> IO (OutputStream ByteString, IO Int64)
countOutput :: OutputStream ByteString -> IO (OutputStream ByteString, IO Int64)
countOutput = (Int64 -> ByteString -> IO Int64)
-> Int64
-> OutputStream ByteString
-> IO (OutputStream ByteString, IO Int64)
forall a b.
(a -> b -> IO a)
-> a -> OutputStream b -> IO (OutputStream b, IO a)
outputFoldM Int64 -> ByteString -> IO Int64
forall (m :: * -> *) a.
(Monad m, Num a, Enum a) =>
a -> ByteString -> m a
f Int64
0
  where
    f :: a -> ByteString -> m a
f !a
count ByteString
s = a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
z
      where
        !c :: Int
c = ByteString -> Int
S.length ByteString
s
        !z :: a
z = Int -> a
forall a. Enum a => Int -> a
toEnum Int
c a -> a -> a
forall a. Num a => a -> a -> a
+ a
count


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
-- most @n@ bytes, subsequently yielding end-of-stream forever.
--
-- Strings pushed back to the returned 'InputStream' will be propagated
-- upstream, modifying the count of taken bytes accordingly.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"truncated\", \" string\"::ByteString]
-- ghci> is' <- Streams.'takeBytes' 9 is
-- ghci> Streams.'read' is'
-- Just \"truncated\"
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \" string\"
-- ghci> Streams.'unRead' \"cated\" is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"cated\"
-- ghci> Streams.'System.IO.Streams.peek' is'
-- Just \"cated\"
-- ghci> Streams.'read' is'
-- Just \"cated\"
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'read' is
-- Just \" string\"
-- @
takeBytes :: Int64                        -- ^ maximum number of bytes to read
          -> InputStream ByteString       -- ^ input stream to wrap
          -> IO (InputStream ByteString)
takeBytes :: Int64 -> InputStream ByteString -> IO (InputStream ByteString)
takeBytes Int64
k0 = Int64
-> IO (Maybe ByteString)
-> InputStream ByteString
-> IO (InputStream ByteString)
takeBytes' Int64
k0 (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing)
{-# INLINE takeBytes #-}


------------------------------------------------------------------------------
-- | Like @Streams.'takeBytes'@, but throws 'ReadTooShortException' when
-- there aren't enough bytes present on the source.
takeExactly :: Int64                        -- ^ number of bytes to read
            -> InputStream ByteString       -- ^ input stream to wrap
            -> IO (InputStream ByteString)
takeExactly :: Int64 -> InputStream ByteString -> IO (InputStream ByteString)
takeExactly Int64
k0 = Int64
-> IO (Maybe ByteString)
-> InputStream ByteString
-> IO (InputStream ByteString)
takeBytes' Int64
k0 (ReadTooShortException -> IO (Maybe ByteString)
forall e a. Exception e => e -> IO a
throwIO (ReadTooShortException -> IO (Maybe ByteString))
-> ReadTooShortException -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ Int64 -> ReadTooShortException
ReadTooShortException Int64
k0)
{-# INLINE takeExactly #-}


------------------------------------------------------------------------------
-- Helper for the two above.
takeBytes' :: Int64
           -> IO (Maybe ByteString)
           -- ^ What to do if the input ends before having consumed the
           -- right amount of bytes.
           -> InputStream ByteString
           -> IO (InputStream ByteString)
takeBytes' :: Int64
-> IO (Maybe ByteString)
-> InputStream ByteString
-> IO (InputStream ByteString)
takeBytes' Int64
k0 IO (Maybe ByteString)
h InputStream ByteString
src = do
    IORef Int64
kref <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef Int64
k0
    InputStream ByteString -> IO (InputStream ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (InputStream ByteString -> IO (InputStream ByteString))
-> InputStream ByteString -> IO (InputStream ByteString)
forall a b. (a -> b) -> a -> b
$! IO (Maybe ByteString)
-> (ByteString -> IO ()) -> InputStream ByteString
forall a. IO (Maybe a) -> (a -> IO ()) -> InputStream a
InputStream (IORef Int64 -> IO (Maybe ByteString)
forall a. Integral a => IORef a -> IO (Maybe ByteString)
prod IORef Int64
kref) (IORef Int64 -> ByteString -> IO ()
forall a. Num a => IORef a -> ByteString -> IO ()
pb IORef Int64
kref)
  where
    prod :: IORef a -> IO (Maybe ByteString)
prod IORef a
kref = do
        a
k <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
kref
        if a
k a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
0
           then Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing
           else InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
src IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (Maybe ByteString)
h (a -> ByteString -> IO (Maybe ByteString)
chunk a
k)
      where
        chunk :: a -> ByteString -> IO (Maybe ByteString)
chunk a
k ByteString
s = do
            let l :: a
l  = Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s
            let k' :: a
k' = a
k a -> a -> a
forall a. Num a => a -> a -> a
- a
l
            if a
k' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
<= a
0
              then let (ByteString
a,ByteString
b) = Int -> ByteString -> (ByteString, ByteString)
S.splitAt (a -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral a
k) ByteString
s
                   in do
                       Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString -> Bool
S.null ByteString
b) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
b InputStream ByteString
src
                       IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
0
                       Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
a
              else IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
k' IO () -> IO (Maybe ByteString) -> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
s)

    pb :: IORef a -> ByteString -> IO ()
pb IORef a
kref ByteString
s = do
        IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef a
kref (a -> a -> a
forall a. Num a => a -> a -> a
+ (Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s))
        ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
s InputStream ByteString
src
{-# INLINE takeBytes' #-}


------------------------------------------------------------------------------
-- | Splits an 'InputStream' over 'ByteString's using a delimiter predicate.
--
-- Note that:
--
--   * data pushed back with 'unRead' is *not* propagated upstream here.
--
--   * the resulting 'InputStream' may hold an unbounded amount of the
--     bytestring in memory waiting for the function to return true, so this
--     function should not be used in unsafe contexts.
--
--   * the delimiter is NOT included in the output.
--
--   * consecutive delimiters are not merged.
--
--   * if the input ends in the delimiter, a final empty string is /not/
--     emitted. (/Since: 1.5.0.0. Previous versions had the opposite behaviour,
--     which was changed to match 'Prelude.lines'./)
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the quick br\", \"own  fox\"::'ByteString'] >>=
--       Streams.'splitOn' (== \' \') >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"brown\",\"\",\"fox\"]
-- @
--
splitOn :: (Char -> Bool)               -- ^ predicate used to break the input
                                        -- stream into chunks
        -> InputStream ByteString       -- ^ input stream
        -> IO (InputStream ByteString)
splitOn :: (Char -> Bool)
-> InputStream ByteString -> IO (InputStream ByteString)
splitOn Char -> Bool
p InputStream ByteString
is = do
    IORef ([ByteString] -> [ByteString])
ref <- ([ByteString] -> [ByteString])
-> IO (IORef ([ByteString] -> [ByteString]))
forall a. a -> IO (IORef a)
newIORef [ByteString] -> [ByteString]
forall a. a -> a
id
    IO (Maybe ByteString) -> IO (InputStream ByteString)
forall a. IO (Maybe a) -> IO (InputStream a)
makeInputStream (IO (Maybe ByteString) -> IO (InputStream ByteString))
-> IO (Maybe ByteString) -> IO (InputStream ByteString)
forall a b. (a -> b) -> a -> b
$ IORef ([ByteString] -> [ByteString]) -> IO (Maybe ByteString)
start IORef ([ByteString] -> [ByteString])
ref
  where
    start :: IORef ([ByteString] -> [ByteString]) -> IO (Maybe ByteString)
start IORef ([ByteString] -> [ByteString])
ref = IO (Maybe ByteString)
go
      where
        go :: IO (Maybe ByteString)
go  = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
is IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (Maybe ByteString)
end ByteString -> IO (Maybe ByteString)
chunk

        end :: IO (Maybe ByteString)
end = do
            [ByteString] -> [ByteString]
dl <- IORef ([ByteString] -> [ByteString])
-> IO ([ByteString] -> [ByteString])
forall a. IORef a -> IO a
readIORef IORef ([ByteString] -> [ByteString])
ref
            case [ByteString] -> [ByteString]
dl [] of
              [] -> Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing
              [ByteString]
xs -> IORef ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ([ByteString] -> [ByteString])
ref [ByteString] -> [ByteString]
forall a. a -> a
id IO () -> IO (Maybe ByteString) -> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
                    (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> ByteString
S.concat [ByteString]
xs)

        chunk :: ByteString -> IO (Maybe ByteString)
chunk ByteString
s = let (ByteString
a, ByteString
b) = (Char -> Bool) -> ByteString -> (ByteString, ByteString)
S.break Char -> Bool
p ByteString
s
                  in if ByteString -> Bool
S.null ByteString
b
                       then IORef ([ByteString] -> [ByteString])
-> (([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString])
-> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef ([ByteString] -> [ByteString])
ref (\[ByteString] -> [ByteString]
f -> [ByteString] -> [ByteString]
f ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString
aByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:)) IO () -> IO (Maybe ByteString) -> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Maybe ByteString)
go
                       else do
                         let !b' :: ByteString
b' = Int -> ByteString -> ByteString
S.unsafeDrop Int
1 ByteString
b
                         [ByteString] -> [ByteString]
dl <- IORef ([ByteString] -> [ByteString])
-> IO ([ByteString] -> [ByteString])
forall a. IORef a -> IO a
readIORef IORef ([ByteString] -> [ByteString])
ref
                         Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString -> Bool
S.null ByteString
b') (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
b' InputStream ByteString
is
                         IORef ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ([ByteString] -> [ByteString])
ref [ByteString] -> [ByteString]
forall a. a -> a
id
                         Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> ByteString
S.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
dl [ByteString
a]


------------------------------------------------------------------------------
-- | Splits a bytestring 'InputStream' into lines. See 'splitOn' and
-- 'Prelude.lines'.
--
-- Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello,\\n world!\"] >>= Streams.'lines'
-- ghci> replicateM 3 (Streams.'read' is)
-- [Just \"Hello\", Just \", world!\", Nothing]
-- @
--
-- Note that this may increase the chunk size if the input contains extremely
-- long lines.
lines :: InputStream ByteString -> IO (InputStream ByteString)
lines :: InputStream ByteString -> IO (InputStream ByteString)
lines = (Char -> Bool)
-> InputStream ByteString -> IO (InputStream ByteString)
splitOn (Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
== Char
'\n')


------------------------------------------------------------------------------
-- | Splits a bytestring 'InputStream' into words. See 'splitOn' and
-- 'Prelude.words'.
--
-- Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'words'
-- ghci> replicateM 3 (Streams.'read' is)
-- [Just \"Hello,\", Just \"world!\", Nothing]
-- @
--
-- Note that this may increase the chunk size if the input contains extremely
-- long words.
words :: InputStream ByteString -> IO (InputStream ByteString)
words :: InputStream ByteString -> IO (InputStream ByteString)
words = (Char -> Bool)
-> InputStream ByteString -> IO (InputStream ByteString)
splitOn Char -> Bool
isSpace (InputStream ByteString -> IO (InputStream ByteString))
-> (InputStream ByteString -> IO (InputStream ByteString))
-> InputStream ByteString
-> IO (InputStream ByteString)
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> (ByteString -> IO Bool)
-> InputStream ByteString -> IO (InputStream ByteString)
forall a. (a -> IO Bool) -> InputStream a -> IO (InputStream a)
filterM (Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> (ByteString -> Bool) -> ByteString -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Char -> Bool) -> ByteString -> Bool
S.all Char -> Bool
isSpace)


------------------------------------------------------------------------------
-- | Intersperses string chunks sent to the given 'OutputStream' with newlines.
-- See 'intersperse' and 'Prelude.unlines'.
--
-- @
-- ghci> os <- Streams.'unlines' Streams.'System.IO.Streams.stdout'
-- ghci> Streams.'write' (Just \"Hello,\") os
-- Hello
-- ghci> Streams.'write' Nothing os
-- ghci> Streams.'write' (Just \"world!\") os
-- world!
-- @
unlines :: OutputStream ByteString -> IO (OutputStream ByteString)
unlines :: OutputStream ByteString -> IO (OutputStream ByteString)
unlines OutputStream ByteString
os = (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a. (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream ((Maybe ByteString -> IO ()) -> IO (OutputStream ByteString))
-> (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a b. (a -> b) -> a -> b
$ \Maybe ByteString
m -> do
    Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
m OutputStream ByteString
os
    case Maybe ByteString
m of
        Maybe ByteString
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> IO ()) -> () -> IO ()
forall a b. (a -> b) -> a -> b
$! ()
        Just ByteString
_  -> Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
"\n") OutputStream ByteString
os


------------------------------------------------------------------------------
-- | Intersperses string chunks sent to the given 'OutputStream' with spaces.
-- See 'intersperse' and 'Prelude.unwords'.
--
-- @
-- ghci> os <- Streams.'unwords' Streams.'System.IO.Streams.stdout'
-- ghci> forM_ [Just \"Hello,\", Nothing, Just \"world!\\n\"] $ \w -> Streams.'write' w os
-- Hello, world!
-- @
unwords :: OutputStream ByteString -> IO (OutputStream ByteString)
unwords :: OutputStream ByteString -> IO (OutputStream ByteString)
unwords = ByteString
-> OutputStream ByteString -> IO (OutputStream ByteString)
forall a. a -> OutputStream a -> IO (OutputStream a)
intersperse ByteString
" "


------------------------------------------------------------------------------
-- | Thrown by 'throwIfProducesMoreThan' when too many bytes were read from the
-- original 'InputStream'.
data TooManyBytesReadException = TooManyBytesReadException deriving (Typeable)

instance Show TooManyBytesReadException where
    show :: TooManyBytesReadException -> String
show TooManyBytesReadException
TooManyBytesReadException = String
"Too many bytes read"

instance Exception TooManyBytesReadException


------------------------------------------------------------------------------
-- | Thrown by 'giveExactly' when too few bytes were written to the produced
-- 'OutputStream'.
data TooFewBytesWrittenException = TooFewBytesWrittenException deriving (Typeable)

instance Show TooFewBytesWrittenException where
    show :: TooFewBytesWrittenException -> String
show TooFewBytesWrittenException
TooFewBytesWrittenException = String
"Too few bytes written"

instance Exception TooFewBytesWrittenException


------------------------------------------------------------------------------
-- | Thrown by 'throwIfConsumesMoreThan' when too many bytes were sent to the
-- produced 'OutputStream'.
data TooManyBytesWrittenException =
    TooManyBytesWrittenException deriving (Typeable)

instance Show TooManyBytesWrittenException where
    show :: TooManyBytesWrittenException -> String
show TooManyBytesWrittenException
TooManyBytesWrittenException = String
"Too many bytes written"

instance Exception TooManyBytesWrittenException


------------------------------------------------------------------------------
-- | Thrown by 'readExactly' and 'takeExactly' when not enough bytes were
-- available on the input.
data ReadTooShortException = ReadTooShortException Int64 deriving (Typeable)

instance Show ReadTooShortException where
    show :: ReadTooShortException -> String
show (ReadTooShortException Int64
x) = String
"Short read, expected " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int64 -> String
forall a. Show a => a -> String
show Int64
x
                                     String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" bytes"

instance Exception ReadTooShortException


------------------------------------------------------------------------------
-- | Wraps an 'InputStream'. If more than @n@ bytes are produced by this
-- stream, 'read' will throw a 'TooManyBytesReadException'.
--
-- If a chunk yielded by the input stream would result in more than @n@ bytes
-- being produced, 'throwIfProducesMoreThan' will cut the generated string such
-- that exactly @n@ bytes are yielded by the returned stream, and the
-- /subsequent/ read will throw an exception. Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>=
--             Streams.'throwIfProducesMoreThan' 5
-- ghci> 'Control.Monad.replicateM' 2 ('read' is)
-- [Just \"abc\",Just \"de\"]
-- ghci> Streams.'read' is
-- *** Exception: Too many bytes read
-- @
--
-- Strings pushed back to the returned 'InputStream' will be propagated
-- upstream, modifying the count of taken bytes accordingly. Example:
--
-- @
-- ghci> is  <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"]
-- ghci> is' <- Streams.'throwIfProducesMoreThan' 5 is
-- ghci> Streams.'read' is'
-- Just \"abc\"
-- ghci> Streams.'unRead' \"xyz\" is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"xyz\"
-- ghci> Streams.'read' is
-- Just \"xyz\"
-- ghci> Streams.'read' is
-- Just \"de\"
-- ghci> Streams.'read' is
-- *** Exception: Too many bytes read
-- @
--
throwIfProducesMoreThan
    :: Int64                    -- ^ maximum number of bytes to read
    -> InputStream ByteString   -- ^ input stream
    -> IO (InputStream ByteString)
throwIfProducesMoreThan :: Int64 -> InputStream ByteString -> IO (InputStream ByteString)
throwIfProducesMoreThan Int64
k0 InputStream ByteString
src = do
    IORef Int64
kref <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef Int64
k0
    InputStream ByteString -> IO (InputStream ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (InputStream ByteString -> IO (InputStream ByteString))
-> InputStream ByteString -> IO (InputStream ByteString)
forall a b. (a -> b) -> a -> b
$! IO (Maybe ByteString)
-> (ByteString -> IO ()) -> InputStream ByteString
forall a. IO (Maybe a) -> (a -> IO ()) -> InputStream a
InputStream (IORef Int64 -> IO (Maybe ByteString)
forall a.
(Ord a, Num a, Enum a) =>
IORef a -> IO (Maybe ByteString)
prod IORef Int64
kref) (IORef Int64 -> ByteString -> IO ()
forall a. Num a => IORef a -> ByteString -> IO ()
pb IORef Int64
kref)
  where
    prod :: IORef a -> IO (Maybe ByteString)
prod IORef a
kref = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
src IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing) ByteString -> IO (Maybe ByteString)
chunk
      where
        chunk :: ByteString -> IO (Maybe ByteString)
chunk ByteString
s = do
            a
k <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
kref
            let k' :: a
k'    = a
k a -> a -> a
forall a. Num a => a -> a -> a
- a
l
            case () of !()
_ | a
l a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0  -> Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
s)
                          | a
k a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
0  -> TooManyBytesReadException -> IO (Maybe ByteString)
forall e a. Exception e => e -> IO a
throwIO TooManyBytesReadException
TooManyBytesReadException
                          | a
k' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= a
0 -> IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
k' IO () -> IO (Maybe ByteString) -> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
s)
                          | Bool
otherwise -> do
                                let (!ByteString
a,!ByteString
b) = Int -> ByteString -> (ByteString, ByteString)
S.splitAt (a -> Int
forall a. Enum a => a -> Int
fromEnum a
k) ByteString
s
                                IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
0
                                ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
b InputStream ByteString
src
                                Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
a
          where
            l :: a
l     = Int -> a
forall a. Enum a => Int -> a
toEnum (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s

    pb :: IORef a -> ByteString -> IO ()
pb IORef a
kref ByteString
s = do
        ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
s InputStream ByteString
src
        IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef a
kref (a -> a -> a
forall a. Num a => a -> a -> a
+ (Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s))


------------------------------------------------------------------------------
-- | Reads an @n@-byte ByteString from an input stream. Throws a
-- 'ReadTooShortException' if fewer than @n@ bytes were available.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"long string\"] >>= Streams.'readExactly' 6
-- \"long s\"
-- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'readExactly' 6
-- *** Exception: Short read, expected 6 bytes
-- @
--
readExactly :: Int                     -- ^ number of bytes to read
            -> InputStream ByteString  -- ^ input stream
            -> IO ByteString
readExactly :: Int -> InputStream ByteString -> IO ByteString
readExactly Int
n InputStream ByteString
input = ([ByteString] -> [ByteString]) -> Int -> IO ByteString
go [ByteString] -> [ByteString]
forall a. a -> a
id Int
n
  where
    go :: ([ByteString] -> [ByteString]) -> Int -> IO ByteString
go ![ByteString] -> [ByteString]
dl Int
0  = ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> ByteString
S.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> [ByteString]
dl []
    go ![ByteString] -> [ByteString]
dl Int
k  =
        InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
input IO (Maybe ByteString)
-> (Maybe ByteString -> IO ByteString) -> IO ByteString
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
        IO ByteString
-> (ByteString -> IO ByteString)
-> Maybe ByteString
-> IO ByteString
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ReadTooShortException -> IO ByteString
forall e a. Exception e => e -> IO a
throwIO (ReadTooShortException -> IO ByteString)
-> ReadTooShortException -> IO ByteString
forall a b. (a -> b) -> a -> b
$ Int64 -> ReadTooShortException
ReadTooShortException (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n))
              (\ByteString
s -> do
                 let l :: Int
l = ByteString -> Int
S.length ByteString
s
                 if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
k
                   then do
                     let (ByteString
a,ByteString
b) = Int -> ByteString -> (ByteString, ByteString)
S.splitAt Int
k ByteString
s
                     Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString -> Bool
S.null ByteString
b) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
b InputStream ByteString
input
                     ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> ByteString
S.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> [ByteString]
dl [ByteString
a]
                   else ([ByteString] -> [ByteString]) -> Int -> IO ByteString
go ([ByteString] -> [ByteString]
dl ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString
sByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:)) (Int
k Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
l))


------------------------------------------------------------------------------
-- | Takes from a stream until the given predicate is no longer satisfied.
-- Returns Nothing on end-of-stream, or @Just \"\"@ if the predicate is never
-- satisfied. See 'Prelude.takeWhile' and 'Data.ByteString.Char8.takeWhile'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'takeBytesWhile' (/= ',')
-- Just \"Hello\"
-- ghci> import Data.Char
-- ghci> Streams.'System.IO.Streams.fromList' [\"7 Samurai\"] >>= Streams.'takeBytesWhile' isAlpha
-- Just \"\"
-- ghci> Streams.'System.IO.Streams.fromList' [] >>= Streams.'takeBytesWhile' isAlpha
-- Nothing
-- @
takeBytesWhile :: (Char -> Bool)          -- ^ predicate
               -> InputStream ByteString  -- ^ input stream
               -> IO (Maybe ByteString)
takeBytesWhile :: (Char -> Bool) -> InputStream ByteString -> IO (Maybe ByteString)
takeBytesWhile Char -> Bool
p InputStream ByteString
input = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
input IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing) (([ByteString] -> [ByteString])
-> ByteString -> IO (Maybe ByteString)
go [ByteString] -> [ByteString]
forall a. a -> a
id)
  where
    go :: ([ByteString] -> [ByteString])
-> ByteString -> IO (Maybe ByteString)
go [ByteString] -> [ByteString]
dl !ByteString
s | ByteString -> Bool
S.null ByteString
b  = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
input IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (Maybe ByteString)
finish (([ByteString] -> [ByteString])
-> ByteString -> IO (Maybe ByteString)
go [ByteString] -> [ByteString]
dl')
             | Bool
otherwise = ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
b InputStream ByteString
input IO () -> IO (Maybe ByteString) -> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Maybe ByteString)
finish
      where
        (ByteString
a, ByteString
b) = (Char -> Bool) -> ByteString -> (ByteString, ByteString)
S.span Char -> Bool
p ByteString
s
        dl' :: [ByteString] -> [ByteString]
dl'    = [ByteString] -> [ByteString]
dl ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString
aByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:)
        finish :: IO (Maybe ByteString)
finish = Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString) -> ByteString -> Maybe ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> ByteString
S.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$! [ByteString] -> [ByteString]
dl [ByteString
a]


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new stream that will pass along at
-- most @n@ bytes to the wrapped stream, throwing any subsequent input away.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> os' <- Streams.'giveBytes' 6 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"long \", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"long \",\"s\"]
-- @
giveBytes :: Int64                        -- ^ maximum number of bytes to send
                                          -- to the wrapped stream
          -> OutputStream ByteString      -- ^ output stream to wrap
          -> IO (OutputStream ByteString)
giveBytes :: Int64 -> OutputStream ByteString -> IO (OutputStream ByteString)
giveBytes Int64
k0 OutputStream ByteString
str = do
    IORef Int64
kref <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef Int64
k0
    (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a. (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream ((Maybe ByteString -> IO ()) -> IO (OutputStream ByteString))
-> (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a b. (a -> b) -> a -> b
$ IORef Int64 -> Maybe ByteString -> IO ()
forall a. Integral a => IORef a -> Maybe ByteString -> IO ()
sink IORef Int64
kref
  where
    sink :: IORef a -> Maybe ByteString -> IO ()
sink IORef a
_ Maybe ByteString
Nothing        = Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
forall a. Maybe a
Nothing OutputStream ByteString
str
    sink IORef a
kref mb :: Maybe ByteString
mb@(Just ByteString
x) = do
        a
k <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
kref
        let l :: a
l  = Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
x
        let k' :: a
k' = a
k a -> a -> a
forall a. Num a => a -> a -> a
- a
l
        if a
k' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
< a
0
          then do let a :: ByteString
a = Int -> ByteString -> ByteString
S.take (a -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral a
k) ByteString
x
                  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString -> Bool
S.null ByteString
a) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
a) OutputStream ByteString
str
                  IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
0
          else IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
k' IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
mb OutputStream ByteString
str


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new stream that will pass along
-- exactly @n@ bytes to the wrapped stream. If the stream is sent more or fewer
-- than the given number of bytes, the resulting stream will throw an exception
-- (either 'TooFewBytesWrittenException' or 'TooManyBytesWrittenException')
-- during a call to 'write'.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 2 >=> Streams.'System.IO.Streams.connect' is)
-- [\"ok\"]
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 1 >=> Streams.'System.IO.Streams.connect' is)
-- *** Exception: Too many bytes written
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 3 >=> Streams.'System.IO.Streams.connect' is)
-- *** Exception: Too few bytes written
-- @
giveExactly :: Int64
            -> OutputStream ByteString
            -> IO (OutputStream ByteString)
giveExactly :: Int64 -> OutputStream ByteString -> IO (OutputStream ByteString)
giveExactly Int64
k0 OutputStream ByteString
os = do
    IORef Int64
ref <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef Int64
k0
    (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a. (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream ((Maybe ByteString -> IO ()) -> IO (OutputStream ByteString))
-> (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a b. (a -> b) -> a -> b
$ IORef Int64 -> Maybe ByteString -> IO ()
forall a. (Ord a, Num a) => IORef a -> Maybe ByteString -> IO ()
go IORef Int64
ref
  where
    go :: IORef a -> Maybe ByteString -> IO ()
go IORef a
ref Maybe ByteString
chunk = do
        !a
n <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref
        case Maybe ByteString
chunk of
          Maybe ByteString
Nothing -> if a
n a -> a -> Bool
forall a. Eq a => a -> a -> Bool
/= a
0
                       then TooFewBytesWrittenException -> IO ()
forall e a. Exception e => e -> IO a
throwIO TooFewBytesWrittenException
TooFewBytesWrittenException
                       else () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> IO ()) -> () -> IO ()
forall a b. (a -> b) -> a -> b
$! ()
          Just ByteString
s  -> let n' :: a
n' = a
n a -> a -> a
forall a. Num a => a -> a -> a
- Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
S.length ByteString
s)
                     in if a
n' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
< a
0
                          then TooManyBytesWrittenException -> IO ()
forall e a. Exception e => e -> IO a
throwIO TooManyBytesWrittenException
TooManyBytesWrittenException
                          else do IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
ref a
n'
        Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
chunk OutputStream ByteString
os


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new stream that will pass along at
-- most @n@ bytes to the wrapped stream. If more than @n@ bytes are sent to the
-- outer stream, a 'TooManyBytesWrittenException' will be thrown.
--
-- /Note/: if more than @n@ bytes are sent to the outer stream,
-- 'throwIfConsumesMoreThan' will not necessarily send the first @n@ bytes
-- through to the wrapped stream before throwing the exception.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> os' <- Streams.'throwIfConsumesMoreThan' 5 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"short\"]
-- ghci> os'' <- Streams.'throwIfConsumesMoreThan' 5 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"long\", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os''
-- *** Exception: Too many bytes written
-- @
throwIfConsumesMoreThan
    :: Int64                    -- ^ maximum number of bytes to send to the
                                --   wrapped stream
    -> OutputStream ByteString  -- ^ output stream to wrap
    -> IO (OutputStream ByteString)
throwIfConsumesMoreThan :: Int64 -> OutputStream ByteString -> IO (OutputStream ByteString)
throwIfConsumesMoreThan Int64
k0 OutputStream ByteString
str = do
    IORef Int64
kref   <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef Int64
k0
    (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a. (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream ((Maybe ByteString -> IO ()) -> IO (OutputStream ByteString))
-> (Maybe ByteString -> IO ()) -> IO (OutputStream ByteString)
forall a b. (a -> b) -> a -> b
$ IORef Int64 -> Maybe ByteString -> IO ()
forall a.
(Ord a, Enum a, Num a) =>
IORef a -> Maybe ByteString -> IO ()
sink IORef Int64
kref
  where
    sink :: IORef a -> Maybe ByteString -> IO ()
sink IORef a
_ Maybe ByteString
Nothing        = Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
forall a. Maybe a
Nothing OutputStream ByteString
str

    sink IORef a
kref mb :: Maybe ByteString
mb@(Just ByteString
x) = do
        a
k <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
kref
        let l :: a
l  = Int -> a
forall a. Enum a => Int -> a
toEnum (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
x
        let k' :: a
k' = a
k a -> a -> a
forall a. Num a => a -> a -> a
- a
l
        if a
k' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
< a
0
          then TooManyBytesWrittenException -> IO ()
forall e a. Exception e => e -> IO a
throwIO TooManyBytesWrittenException
TooManyBytesWrittenException
          else IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
kref a
k' IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe ByteString -> OutputStream ByteString -> IO ()
forall a. Maybe a -> OutputStream a -> IO ()
write Maybe ByteString
mb OutputStream ByteString
str


------------------------------------------------------------------------------
-- | Gets the current posix time
getTime :: IO Double
getTime :: IO Double
getTime = POSIXTime -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac (POSIXTime -> Double) -> IO POSIXTime -> IO Double
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` IO POSIXTime
getPOSIXTime


------------------------------------------------------------------------------
-- | Thrown by 'throwIfTooSlow' if input is not being produced fast enough by
-- the given 'InputStream'.
--
data RateTooSlowException = RateTooSlowException deriving (Typeable)
instance Show RateTooSlowException where
    show :: RateTooSlowException -> String
show RateTooSlowException
RateTooSlowException = String
"Input rate too slow"
instance Exception RateTooSlowException


------------------------------------------------------------------------------
-- | Rate-limits an input stream. If the input stream is not read from faster
-- than the given rate, reading from the wrapped stream will throw a
-- 'RateTooSlowException'.
--
-- Strings pushed back to the returned 'InputStream' will be propagated up to
-- the original stream.
throwIfTooSlow
    :: IO ()                   -- ^ action to bump timeout
    -> Double                  -- ^ minimum data rate, in bytes per second
    -> Int                     -- ^ amount of time in seconds to wait before
                               --   data rate calculation takes effect
    -> InputStream ByteString  -- ^ input stream
    -> IO (InputStream ByteString)
throwIfTooSlow :: IO ()
-> Double
-> Int
-> InputStream ByteString
-> IO (InputStream ByteString)
throwIfTooSlow !IO ()
bump !Double
minRate !Int
minSeconds' !InputStream ByteString
stream = do
    !()
_        <- IO ()
bump
    Double
startTime <- IO Double
getTime
    IORef Int64
bytesRead <- Int64 -> IO (IORef Int64)
forall a. a -> IO (IORef a)
newIORef (Int64
0 :: Int64)
    InputStream ByteString -> IO (InputStream ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (InputStream ByteString -> IO (InputStream ByteString))
-> InputStream ByteString -> IO (InputStream ByteString)
forall a b. (a -> b) -> a -> b
$! IO (Maybe ByteString)
-> (ByteString -> IO ()) -> InputStream ByteString
forall a. IO (Maybe a) -> (a -> IO ()) -> InputStream a
InputStream (Double -> IORef Int64 -> IO (Maybe ByteString)
forall a. Integral a => Double -> IORef a -> IO (Maybe ByteString)
prod Double
startTime IORef Int64
bytesRead) (IORef Int64 -> ByteString -> IO ()
forall a. Num a => IORef a -> ByteString -> IO ()
pb IORef Int64
bytesRead)

  where
    prod :: Double -> IORef a -> IO (Maybe ByteString)
prod Double
startTime IORef a
bytesReadRef = InputStream ByteString -> IO (Maybe ByteString)
forall a. InputStream a -> IO (Maybe a)
read InputStream ByteString
stream IO (Maybe ByteString)
-> (Maybe ByteString -> IO (Maybe ByteString))
-> IO (Maybe ByteString)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Maybe ByteString)
-> (ByteString -> IO (Maybe ByteString))
-> Maybe ByteString
-> IO (Maybe ByteString)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing) ByteString -> IO (Maybe ByteString)
chunk
      where
        chunk :: ByteString -> IO (Maybe ByteString)
chunk ByteString
s = do
            let slen :: Int
slen = ByteString -> Int
S.length ByteString
s
            Double
now <- IO Double
getTime
            let !delta :: Double
delta = Double
now Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
startTime
            a
nb <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
bytesReadRef
            let newBytes :: a
newBytes = a
nb a -> a -> a
forall a. Num a => a -> a -> a
+ Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
slen
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Double
delta Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
minSeconds Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Double
1 Bool -> Bool -> Bool
&&
                  (a -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral a
newBytes Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/
                    (Double
delta Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
minSeconds)) Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
minRate) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                RateTooSlowException -> IO ()
forall e a. Exception e => e -> IO a
throwIO RateTooSlowException
RateTooSlowException
            -- otherwise, bump the timeout and return the input
            !()
_ <- IO ()
bump
            IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
bytesReadRef a
newBytes
            Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> IO (Maybe ByteString))
-> Maybe ByteString -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$! ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
s

    pb :: IORef a -> ByteString -> IO ()
pb IORef a
bytesReadRef ByteString
s = do
        IORef a -> (a -> a) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyRef IORef a
bytesReadRef ((a -> a) -> IO ()) -> (a -> a) -> IO ()
forall a b. (a -> b) -> a -> b
$ \a
x -> a
x a -> a -> a
forall a. Num a => a -> a -> a
- (Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> a) -> Int -> a
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
S.length ByteString
s)
        ByteString -> InputStream ByteString -> IO ()
forall a. a -> InputStream a -> IO ()
unRead ByteString
s InputStream ByteString
stream

    minSeconds :: Double
minSeconds = Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
minSeconds'