-- |Monadic Iteratees:
-- incremental input parsers, processors and transformers
--
-- This module provides many basic iteratees from which more complicated
-- iteratees can be built.  In general these iteratees parallel those in
-- @Data.List@, with some additions.

module Bio.Iteratee.List (
  -- * Iteratees
  -- ** Iteratee Utilities
  isFinished
  ,stream2list
  ,stream2stream
  -- ** Basic Iteratees
  ,dropWhileStream
  ,dropStream
  ,headStream
  ,tryHead
  ,lastStream
  ,heads
  ,peekStream
  ,roll
  ,lengthStream
  ,chunkLength
  ,takeFromChunk
  -- ** Nested iteratee combinators
  ,breakStream
  ,breakE
  ,takeStream
  ,takeUpTo
  ,takeWhileE
  ,mapStream
  ,concatMapStream
  ,concatMapStreamM
  ,mapMaybeStream
  ,filterStream
  ,filterStreamM
  ,groupStreamBy
  ,groupStreamOn
  ,mergeStreams
  ,mergeByChunks
  -- ** Folds
  ,foldStream
  -- * Enumerators
  -- ** Basic enumerators
  ,enumPureNChunk
  -- ** Enumerator Combinators
  ,enumWith
  ,zipStreams
  ,zipStreams3
  ,zipStreams4
  ,zipStreams5
  ,sequenceStreams_
  ,countConsumed
  -- ** Monadic functions
  ,mapStreamM
  ,mapStreamM_
  ,foldStreamM
  -- * Re-exported modules
  ,module Bio.Iteratee.Iteratee
)
where

import Bio.Iteratee.Iteratee
import Bio.Prelude
import Control.Monad.Trans.Class

-- import qualified Data.ByteString          as B


-- Useful combinators for implementing iteratees and enumerators

-- | Check if a stream has received 'EOF'.
isFinished :: Nullable s => Iteratee s m Bool
isFinished = liftI check
  where
  check c@(Chunk xs)
    | nullC xs    = liftI check
    | otherwise   = idone False c
  check s@(EOF _) = idone True s
{-# INLINE isFinished #-}

-- ------------------------------------------------------------------------
-- Primitive iteratees

-- |Read a stream to the end and return all of its elements as a list.
-- This iteratee returns all data from the stream *strictly*.
stream2list :: Monad m => Iteratee [el] m [el]
stream2list = liftM concat getChunks
{-# INLINE stream2list #-}

-- |Read a stream to the end and return all of its elements as a stream.
-- This iteratee returns all data from the stream *strictly*.
stream2stream :: (Monad m, Nullable s, Monoid s) => Iteratee s m s
stream2stream = liftM mconcat getChunks
{-# INLINE stream2stream #-}


-- ------------------------------------------------------------------------
-- Parser combinators

-- |Attempt to read the next element of the stream and return it
-- Raise a (recoverable) error if the stream is terminated.
--
-- The analogue of @List.head@
--
-- Because @head@ can raise an error, it shouldn't be used when constructing
-- iteratees for @convStream@.  Use @tryHead@ instead.
headStream :: Iteratee [el] m el
headStream = liftI step
  where
  step (Chunk [     ]) = icont step Nothing
  step (Chunk (hd:tl)) = idone hd (Chunk tl)
  step stream          = icont step (Just (setEOF stream))
{-# INLINE headStream #-}

-- | Similar to @headStream@, except it returns @Nothing@ if the stream
-- is terminated.
tryHead :: Iteratee [el] m (Maybe el)
tryHead = liftI step
  where
  step (Chunk [     ]) = liftI step
  step (Chunk (hd:tl)) = idone (Just hd) (Chunk tl)
  step stream          = idone Nothing stream
{-# INLINE tryHead #-}

-- |Attempt to read the last element of the stream and return it
-- Raise a (recoverable) error if the stream is terminated
--
-- The analogue of @List.last@
lastStream :: Iteratee [el] m el
lastStream = liftI (step Nothing)
  where
  step l (Chunk xs)
    | nullC xs     = liftI (step l)
    | otherwise    = liftI $ step (Just $ last xs)
  step l s@(EOF _) = case l of
    Nothing -> icont (step l) . Just . setEOF $ s
    Just x  -> idone x s
{-# INLINE lastStream #-}


-- |Given a sequence of characters, attempt to match them against
-- the characters on the stream.  Return the count of how many
-- characters matched.  The matched characters are removed from the
-- stream.
-- For example, if the stream contains 'abd', then (heads 'abc')
-- will remove the characters 'ab' and return 2.
heads :: (Monad m, Eq el) => [el] -> Iteratee [el] m Int
heads st | nullC st = return 0
heads st = loopE 0 st
  where
  loopE cnt xs
    | nullC xs  = return cnt
    | otherwise = liftI (step cnt xs)
  step cnt str (Chunk [])          = liftI (step cnt str)
  step cnt [ ] stream              = idone cnt stream
  step cnt (y:ys) s@(Chunk (x:xs))
    | y == x    = step (succ cnt) ys (Chunk xs)
    | otherwise = idone cnt s
  step cnt _ stream         = idone cnt stream
{-# INLINE heads #-}


-- |Look ahead at the next element of the stream, without removing
-- it from the stream.
-- Return @Just c@ if successful, return @Nothing@ if the stream is
-- terminated by 'EOF'.
peekStream :: Iteratee [el] m (Maybe el)
peekStream = liftI step
  where
    step   (Chunk [   ]) = liftI step
    step s@(Chunk (x:_)) = idone (Just x) s
    step stream          = idone Nothing stream
{-# INLINE peekStream #-}

-- | Return a chunk of @t@ elements length while consuming @d@ elements
--   from the stream.  Useful for creating a 'rolling average' with
--  'convStream'.
roll
  :: Monad m
  => Int  -- ^ length of chunk (t)
  -> Int  -- ^ amount to consume (d)
  -> Iteratee [el] m [[el]]
roll t d | t > d  = liftI step
  where
    step (Chunk vec)
      | length vec >= t =
          idone [take t vec] (Chunk $ drop d vec)
      | null vec        = liftI step
      | otherwise          = liftI (step' vec)
    step stream            = idone empty stream
    step' v1 (Chunk vec)   = step . Chunk $ v1 `mappend` vec
    step' v1 stream        = idone [v1] stream
roll t d = do r <- joinI (takeStream t stream2stream)
              dropStream (d-t)
              return [r]
  -- d is >= t, so this version works
{-# INLINE roll #-}


-- |Drop n elements of the stream, if there are that many.
--
-- The analogue of @List.drop@
dropStream :: Int -> Iteratee [el] m ()
dropStream 0  = idone () (Chunk emptyP)
dropStream n' = liftI (step n')
  where
    step n (Chunk str)
      | length str < n = liftI (step (n - length str))
      | otherwise         = idone () (Chunk (drop n str))
    step _ stream         = idone () stream
{-# INLINE dropStream #-}

-- |Skip all elements while the predicate is true.
--
-- The analogue of @List.dropWhile@
dropWhileStream :: (el -> Bool) -> Iteratee [el] m ()
dropWhileStream p = liftI step
  where
    step (Chunk str)
      | null rest = liftI step
      | otherwise    = idone () (Chunk rest)
      where
        rest = dropWhile p str
    step stream      = idone () stream
{-# INLINE dropWhileStream #-}


-- | Return the total length of the remaining part of the stream.
--
-- This forces evaluation of the entire stream.
--
-- The analogue of @List.length@
lengthStream :: Num a => Iteratee [el] m a
lengthStream = liftI (step 0)
  where
    step !i (Chunk xs) = liftI (step $ i + fromIntegral (length xs))
    step !i stream     = idone i stream
{-# INLINE lengthStream #-}

-- | Get the length of the current chunk, or @Nothing@ if 'EOF'.
--
-- This function consumes no input.
chunkLength :: Iteratee [el] m (Maybe Int)
chunkLength = liftI step
 where
  step s@(Chunk xs) = idone (Just $ length xs) s
  step stream       = idone Nothing stream
{-# INLINE chunkLength #-}

-- | Take @n@ elements from the current chunk, or the whole chunk if
-- @n@ is greater.
takeFromChunk :: Int -> Iteratee [el] m [el]
takeFromChunk n | n <= 0 = idone emptyP (Chunk emptyP)
takeFromChunk n = liftI step
 where
  step (Chunk xs) = let (h,t) = splitAt n xs in idone h $ Chunk t
  step stream     = idone emptyP stream
{-# INLINE takeFromChunk #-}

-- |Takes an element predicate and returns the (possibly empty) prefix of
-- the stream.  None of the characters in the string satisfy the character
-- predicate.
-- If the stream is not terminated, the first character of the remaining stream
-- satisfies the predicate.
--
-- N.B. 'breakE' should be used in preference to @breakStream@.
-- @breakStream@ will retain all data until the predicate is met, which may
-- result in a space leak.
--
-- The analogue of @List.break@

breakStream :: (el -> Bool) -> Iteratee [el] m [el]
breakStream cpred = icont (step mempty) Nothing
  where
    step bfr (Chunk str)
      | null str          =  icont (step bfr) Nothing
      | otherwise         =  case break cpred str of
        (str', tail')
          | null tail'    -> icont (step (bfr `mappend` str)) Nothing
          | otherwise     -> idone (bfr `mappend` str') (Chunk tail')
    step bfr stream       =  idone bfr stream
{-# INLINE breakStream #-}

-- ---------------------------------------------------
-- The converters show a different way of composing two iteratees:
-- `vertical' rather than `horizontal'

-- |Takes an element predicate and an iteratee, running the iteratee
-- on all elements of the stream until the predicate is met.
--
-- the following rule relates @break@ to @breakE@
-- @break@ pred === @joinI@ (@breakE@ pred stream2stream)
--
-- @breakE@ should be used in preference to @break@ whenever possible.
breakE :: (el -> Bool) -> Enumeratee [el] [el] m a
breakE cpred = eneeCheckIfDonePass (icont . step)
 where
  step k (Chunk s)
      | null s  = liftI (step k)
      | otherwise  = case break cpred s of
        (str', tail')
          | null tail'    -> eneeCheckIfDonePass (icont . step) . k $ Chunk str'
          | otherwise     -> idone (k $ Chunk str') (Chunk tail')
  step k stream           =  idone (liftI k) stream
{-# INLINE breakE #-}

-- |Read n elements from a stream and apply the given iteratee to the
-- stream of the read elements. Unless the stream is terminated early, we
-- read exactly n elements, even if the iteratee has accepted fewer.
--
-- The analogue of @List.take@
takeStream ::
  Monad m
  => Int   -- ^ number of elements to consume
  -> Enumeratee [el] [el] m a
takeStream n' iter
 | n' <= 0   = return iter
 | otherwise = Iteratee $ \od oc -> runIter iter (on_done od oc) (on_cont od oc)
  where
    on_done od oc x _ = runIter (dropStream n' >> return (return x)) od oc
    on_cont od oc k Nothing = if n' == 0 then od (liftI k) (Chunk mempty)
                                 else runIter (liftI (step n' k)) od oc
    on_cont od oc _ (Just e) = runIter (dropStream n' >> throwErr e) od oc
    step n k (Chunk str)
      | null str           = liftI (step n k)
      | length str <= n    = takeStream (n - length str) $ k (Chunk str)
      | otherwise          = idone (k (Chunk s1)) (Chunk s2)
      where (s1, s2) = splitAt n str
    step _n k stream       = idone (liftI k) stream
{-# INLINE takeStream #-}

-- |Read n elements from a stream and apply the given iteratee to the
-- stream of the read elements. If the given iteratee accepted fewer
-- elements, we stop.
-- This is the variation of 'takeStream' with the early termination
-- of processing of the outer stream once the processing of the inner stream
-- finished early.
--
-- Iteratees composed with 'takeUpTo' will consume only enough elements to
-- reach a done state.  Any remaining data will be available in the outer
-- stream.
--
-- > > let iter = do
-- > h <- joinI $ takeUpTo 5 I.head
-- > t <- stream2list
-- > return (h,t)
-- >
-- > > enumPureNChunk [1..10::Int] 3 iter >>= run >>= print
-- > (1,[2,3,4,5,6,7,8,9,10])
-- >
-- > > enumPureNChunk [1..10::Int] 7 iter >>= run >>= print
-- > (1,[2,3,4,5,6,7,8,9,10])
--
-- in each case, @I.head@ consumes only one element, returning the remaining
-- 4 elements to the outer stream
takeUpTo :: Monad m => Int -> Enumeratee [el] [el] m a
takeUpTo i iter
 | i <= 0    = idone iter (Chunk emptyP)
 | otherwise = Iteratee $ \od oc ->
    runIter iter (onDone od oc) (onCont od oc)
  where
    onDone od oc x str      = runIter (idone (return x) str) od oc
    onCont od oc k Nothing  = if i == 0 then od (liftI k) (Chunk mempty)
                                 else runIter (liftI (step i k)) od oc
    onCont od oc _ (Just e) = runIter (throwErr e) od oc
    step n k (Chunk str)
      | null str       = liftI (step n k)
      | length str < n = takeUpTo (n - length str) $ k (Chunk str)
      | otherwise      =
         -- check to see if the inner iteratee has completed, and if so,
         -- grab any remaining stream to put it in the outer iteratee.
         -- the outer iteratee is always complete at this stage, although
         -- the inner may not be.
         let (s1, s2) = splitAt n str
         in Iteratee $ \od' _ -> do
              res <- runIter (k (Chunk s1)) (\a s  -> return $ Left  (a, s))
                                            (\k' e -> return $ Right (k',e))
              case res of
                Left (a,Chunk s1') -> od' (return a)
                                          (Chunk $ s1' ++ s2)
                Left  (a,s')       -> od' (idone a s') (Chunk s2)
                Right (k',e)       -> od' (icont k' e) (Chunk s2)
    step _ k stream       = idone (liftI k) stream
{-# INLINE takeUpTo #-}


-- |Takes an element predicate and an iteratee, running the iteratee
-- on all elements of the stream while the predicate is met.
--
-- This is preferred to @takeWhile@.
takeWhileE :: (el -> Bool) -> Enumeratee [el] [el] m a
takeWhileE = breakE . (not .)
{-# INLINEABLE takeWhileE #-}

-- | Map a function over an 'Iteratee'.
-- This one is reimplemented and differs from the the one in
-- "Data.Iteratee.ListLike" in so far that it doesn't pass on an 'EOF'
-- received in the input, which is the expected behavior.
mapStream :: (el -> el') -> Enumeratee [el] [el'] m a
mapStream = mapChunks . map
{-# INLINE mapStream #-}

-- | Apply a function to the elements of a stream, concatenate the
-- results into a stream.  No giant intermediate list is produced.
concatMapStream :: Monoid t => (a -> t) -> Enumeratee [a] t m r
concatMapStream = mapChunks . foldMap
{-# INLINE concatMapStream #-}

-- | Apply a monadic function to the elements of a stream, concatenate
-- the results into a stream.  No giant intermediate list is produced.
concatMapStreamM :: Monad m => (a -> m t) -> Enumeratee [a] t m r
concatMapStreamM f = eneeCheckIfDone (liftI . go)
  where
    go k (EOF   mx)              = idone (liftI k) (EOF mx)
    go k (Chunk xs) | null xs    = liftI (go k)
                    | otherwise  = f (head xs) `mBind`
                                   eneeCheckIfDone (flip go (Chunk (tail xs))) . k . Chunk
{-# INLINE concatMapStreamM #-}

mapMaybeStream :: (a -> Maybe b) -> Enumeratee [a] [b] m r
mapMaybeStream = mapChunks . mapMaybe
{-# INLINE mapMaybeStream #-}

-- |Creates an 'enumeratee' with only elements from the stream that
-- satisfy the predicate function.  The outer stream is completely consumed.
--
-- The analogue of @List.filter@
filterStream :: (el -> Bool) -> Enumeratee [el] [el] m a
filterStream p = mapChunks (filter p)
{-# INLINE filterStream #-}

-- | Apply a monadic filter predicate to an 'Iteratee'.
filterStreamM :: Monad m => (a -> m Bool) -> Enumeratee [a] [a] m r
filterStreamM k = mapChunksM (go id)
  where
    go acc [   ] = return $! acc empty
    go acc (h:t) = do p <- k h
                      let acc' = if p then (:) h . acc else acc
                      go acc' t
{-# INLINE filterStreamM #-}

-- | Grouping on 'Iteratee's.  @groupStreamOn proj inner outer@ executes
-- @inner (proj e)@, where @e@ is the first input element, to obtain an
-- 'Iteratee' @i@, then passes elements @e@ to @i@ as long as @proj e@
-- produces the same result.  If @proj e@ changes or the input ends, the
-- pair of @proj e@ and the result of @run i@ is passed to @outer@.  At
-- end of input, the resulting @outer@ is returned.
groupStreamOn :: (Monad m, Eq t1)
              => (e -> t1)
              -> (t1 -> m (Iteratee [e] m t2))
              -> Enumeratee [e] [(t1, t2)] m a
groupStreamOn proj inner = eneeCheckIfDonePass (icont . step)
  where
    step outer   (EOF      mx) = idone (liftI outer) $ EOF mx
    step outer   (Chunk [   ]) = liftI $ step outer
    step outer c@(Chunk (h:_)) = let x = proj h
                                 in lift (inner x) >>= \i -> step' x i outer c

    -- We want to feed a 'Chunk' to the inner 'Iteratee', which might be
    -- finished.  In that case, we would want to abort, but we cannot,
    -- since the outer iteration is still going on.  So instead we
    -- discard data we would have fed to the inner 'Iteratee'.  (Use of
    -- 'enumPure1Chunk' is not appropriate, it would accumulate the
    -- data, just to have it discarded by the 'run' that eventually
    -- happens.

    step' c it outer (Chunk as)
        | null as = liftI $ step' c it outer
        | (l,r) <- span ((==) c . proj) as, not (null l) =
            let od a    _str = idoneM a $ EOF Nothing
                oc k Nothing = return $ k (Chunk l)
                oc k       m = icontM k m
            in lift (runIter it od oc) >>= \it' -> step' c it' outer (Chunk r)

    step' c it outer str =
        lift (run it) >>= \b -> eneeCheckIfDone (`step` str) . outer $ Chunk [(c,b)]


-- | Grouping on 'Iteratee's.  @groupStreamBy cmp inner outer@ executes
-- @inner@ to obtain an 'Iteratee' @i@, then passes elements @e@ to @i@
-- as long as @cmp e0 e@, where @e0@ is some preceeding element, is
-- true.  Else, the result of @run i@ is passed to @outer@ and
-- 'groupStreamBy' restarts.  At end of input, the resulting @outer@ is
-- returned.
groupStreamBy :: Monad m
              => (t -> t -> Bool)
              -> m (Iteratee [t] m t2)
              -> Enumeratee [t] [t2] m a
groupStreamBy cmp inner = eneeCheckIfDonePass (icont . step)
  where
    step outer   (EOF      mx) = idone (liftI outer) $ EOF mx
    step outer   (Chunk [   ]) = liftI $ step outer
    step outer c@(Chunk (h:_)) = lift inner >>= \i -> step' h i outer c

    step' c it outer (Chunk as)
        | null as = liftI $ step' c it outer
        | (l,r) <- span (cmp c) as, not (null l) =
            let od a    _str = idoneM a $ EOF Nothing
                oc k Nothing = return $ k (Chunk l)
                oc k       m = icontM k m
            in lift (runIter it od oc) >>= \it' -> step' (head l) it' outer (Chunk r)

    step' _ it outer str =
        lift (run it) >>= \b -> eneeCheckIfDone (`step` str) . outer $ Chunk [b]


-- | @mergeStreams@ offers another way to nest iteratees: as a monad stack.
-- This allows for the possibility of interleaving data from multiple
-- streams.
--
-- > -- print each element from a stream of lines.
-- > logger :: (MonadIO m) => Iteratee [ByteString] m ()
-- > logger = mapStreamM_ (liftIO . putStrLn . B.unpack)
-- >
-- > -- combine alternating lines from two sources
-- > -- To see how this was derived, follow the types from
-- > -- 'ileaveLines logger' and work outwards.
-- > run =<< enumFile 10 "file1" (joinI $ enumLinesBS $
-- >           ( enumFile 10 "file2" . joinI . enumLinesBS $ joinI
-- >                 (ileaveLines logger)) >>= run)
-- >
-- > ileaveLines :: (Functor m, Monad m)
-- >   => Enumeratee [ByteString] [ByteString] (Iteratee [ByteString] m)
-- >        [ByteString]
-- > ileaveLines = mergeStreams (\l1 l2 ->
-- >    [B.pack "f1:\n\t" ,l1 ,B.pack "f2:\n\t" ,l2 ]
-- >
-- >
--
mergeStreams :: Monad m => (el1 -> el2 -> b) -> Enumeratee [el2] b (Iteratee [el1] m) a
mergeStreams f = convStream $ liftM2 f (lift headStream) headStream
{-# INLINE mergeStreams #-}

-- | A version of mergeStreams which operates on chunks instead of
-- elements.
--
-- mergeByChunks offers more control than 'mergeStreams'.
-- 'mergeStreams' terminates when the first stream terminates, however
-- mergeByChunks will continue until both streams are exhausted.
--
-- 'mergeByChunks' guarantees that both chunks passed to the merge
-- function will have the same number of elements, although that number
-- may vary between calls.
mergeByChunks ::
  Monad m
  => ([el1] -> [el2] -> c3)  -- ^ merge function
  -> ([el1] -> c3)
  -> ([el2] -> c3)
  -> Enumeratee [el2] c3 (Iteratee [el1] m) a
mergeByChunks f f1 f2 = unfoldConvStream iter (0 :: Int)
 where
  iter 1 = (\x -> (1,f1 x)) `liftM` lift getChunk
  iter 2 = (\x -> (2,f2 x)) `liftM` getChunk
  iter _ = do
    ml1 <- lift chunkLength
    ml2 <- chunkLength
    case (ml1, ml2) of
      (Just l1, Just l2) -> do
        let tval = min l1 l2
        c1 <- lift $ takeFromChunk tval
        c2 <- takeFromChunk tval
        return (0, f c1 c2)
      (Just _, Nothing) -> iter 1
      (Nothing, _)      -> iter 2
{-# INLINE mergeByChunks #-}

-- ------------------------------------------------------------------------
-- Folds

-- | Left-associative fold that is strict in the accumulator.
-- This function should be used in preference to 'foldl' whenever possible.
--
-- The analogue of @List.foldl'@.
foldStream :: (a -> el -> a) -> a -> Iteratee [el] m a
foldStream f i = liftI (step i)
  where
    step acc (Chunk xs)
      | null xs = liftI (step acc)
      | otherwise  = liftI (step $! foldl' f acc xs)
    step acc stream = idone acc stream
{-# INLINE foldStream #-}

-- ------------------------------------------------------------------------
-- Zips

-- |Enumerate two iteratees over a single stream simultaneously.
--
-- Compare to @List.zip@.
zipStreams
  :: Monad m
  => Iteratee [el] m a
  -> Iteratee [el] m b
  -> Iteratee [el] m (a, b)
zipStreams x0 y0 = do
    -- need to check if both iteratees are initially finished.  If so,
    -- we don't want to push a chunk which will be dropped
    (a', x') <- lift $ runIter x0 od oc
    (b', y') <- lift $ runIter y0 od oc
    case checkDone a' b' of
      Just (Right (a,b,s))  -> idone (a,b) s  -- 's' may be EOF, needs to stay
      Just (Left (Left a))  -> liftM ((,) a) y'
      Just (Left (Right b)) -> liftM (flip (,) b) x'
      Nothing               -> liftI (step x' y')
  where
    step x y (Chunk xs) | nullC xs = liftI (step x y)
    step x y (Chunk xs) = do
      (a', x') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs x
      (b', y') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs y
      case checkDone a' b' of
        Just (Right (a,b,s))  -> idone (a,b) s
        Just (Left (Left a))  -> liftM ((,) a) y'
        Just (Left (Right b)) -> liftM (flip (,) b) x'
        Nothing               -> liftI (step x' y')
    step x y (EOF err) = joinIM $ case err of
      Nothing -> (liftM2.liftM2) (,) (enumEof   x) (enumEof   y)
      Just e  -> (liftM2.liftM2) (,) (enumErr e x) (enumErr e y)

    od a s = return (Just (a, s), idone a s)
    oc k e = return (Nothing    , icont k e)

    checkDone r1 r2 = case (r1, r2) of
      (Just (a, s1), Just (b,s2)) -> Just $ Right (a, b, shorter s1 s2)
      (Just (a, _), Nothing)      -> Just . Left $ Left a
      (Nothing, Just (b, _))      -> Just . Left $ Right b
      (Nothing, Nothing)          -> Nothing

    shorter c1@(Chunk xs) c2@(Chunk ys)
      | length xs < length ys = c1
      | otherwise                   = c2
    shorter e@(EOF _)  _         = e
    shorter _          e@(EOF _) = e
{-# INLINE zipStreams #-}

zipStreams3
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m (a, b, c)
zipStreams3 a b c = zipStreams a (zipStreams b c) >>=
  \(r1, (r2, r3)) -> return (r1, r2, r3)
{-# INLINE zipStreams3 #-}

zipStreams4
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m d
  -> Iteratee [el] m (a, b, c, d)
zipStreams4 a b c d = zipStreams a (zipStreams3 b c d) >>=
  \(r1, (r2, r3, r4)) -> return (r1, r2, r3, r4)
{-# INLINE zipStreams4 #-}

zipStreams5
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m d
  -> Iteratee [el] m e -> Iteratee [el] m (a, b, c, d, e)
zipStreams5 a b c d e = zipStreams a (zipStreams4 b c d e) >>=
  \(r1, (r2, r3, r4, r5)) -> return (r1, r2, r3, r4, r5)
{-# INLINE zipStreams5 #-}

-- | Enumerate over two iteratees in parallel as long as the first iteratee
-- is still consuming input.  The second iteratee will be terminated with EOF
-- when the first iteratee has completed.  An example use is to determine
-- how many elements an iteratee has consumed:
--
-- > snd <$> enumWith (dropWhile (<5)) length
--
-- Compare to @zipStreams@
enumWith
  :: Monad m
  => Iteratee [el] m a
  -> Iteratee [el] m b
  -> Iteratee [el] m (a, b)
enumWith i1 i2 = do
    -- as with zipStreams, first check to see if the initial iteratee is complete,
    -- otherwise data would be dropped.
    -- running the second iteratee as well to prevent a monadic effect mismatch
    -- although I think that would be highly unlikely to happen in common
    -- code
    (a', x') <- lift $ runIter i1 od oc
    (_,  y') <- lift $ runIter i2 od oc
    case a' of
      Just (a, s) -> flip idone s =<< lift (liftM ((,) a) $ run i2)
      Nothing     -> go x' y'
  where
    od a s = return (Just (a, s), idone a s)
    oc k e = return (Nothing    , icont k e)

    getUsed xs (Chunk ys) = take (length xs - length ys) xs
    getUsed xs (EOF _)    = xs

    go x y = liftI step
      where
        step (Chunk xs) | nullC xs = liftI step
        step (Chunk xs) = do
          (a', x') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs x
          case a' of
            Just (a, s) -> do
              b <- lift $ run =<< enumPure1Chunk (getUsed xs s) y
              idone (a, b) s
            Nothing        -> lift (enumPure1Chunk xs y) >>= go x'
        step (EOF err) = joinIM $ case err of
          Nothing -> (liftM2.liftM2) (,) (enumEof   x) (enumEof   y)
          Just e  -> (liftM2.liftM2) (,) (enumErr e x) (enumErr e y)
{-# INLINE enumWith #-}

-- |Enumerate a list of iteratees over a single stream simultaneously
-- and discard the results. This is a different behavior than Prelude's
-- sequence_ which runs iteratees in the list one after the other.
--
-- Compare to @Prelude.sequence_@.
sequenceStreams_
  :: Monad m
  => [Iteratee [el] m a]
  -> Iteratee [el] m ()
sequenceStreams_ = self
  where
    self is = liftI step
      where
        step (Chunk xs) | null xs = liftI step
        step s@(Chunk _) = do
          -- give a chunk to each iteratee
          is'  <- lift $ mapM (enumChunk s) is
          -- filter done iteratees
          (done, notDone) <- lift $ partition fst `liftM` mapM enumCheckIfDone is'
          if null notDone
            then idone () <=< remainingStream $ map snd done
            else self $ map snd notDone
        step s@(EOF _) = do
          s' <- remainingStream <=< lift $ mapM (enumChunk s) is
          case s' of
            EOF (Just e) -> throwErr e
            _            -> idone () s'

    -- returns the unconsumed part of the stream; "sequenceStreams_ is" consumes as
    -- much of the stream as the iteratee in is that consumes the most; e.g.
    -- sequenceStreams_ [I.head, I.last] consumes whole stream
    remainingStream :: Monad m => [Iteratee [el] m a] -> Iteratee [el] m (Stream [el])
    remainingStream is = lift $
      return . foldl1 shorter <=< mapM (\i -> runIter i od oc) $ is
      where
        od _ s = return s
        oc _ e = return $ case e of
          Nothing -> mempty
          _       -> EOF e

    -- return the shorter one of two streams; errors are propagated with the
    -- priority given to the "left"
    shorter c1@(Chunk xs) c2@(Chunk ys)
      | length xs < length ys = c1
      | otherwise                   = c2
    shorter (EOF e1 ) (EOF e2 ) = EOF (e1 `mplus` e2)
    shorter e@(EOF _) _         = e
    shorter _         e@(EOF _) = e

-- |Transform an iteratee into one that keeps track of how much data it
-- consumes.
countConsumed :: (Monad m, Integral n) => Iteratee [el] m a -> Iteratee [el] m (a, n)
countConsumed i = go 0 (const i) (Chunk emptyP)
  where
    go !n f str@(EOF _) = flip (,) n `liftM` f str
    go !n f str@(Chunk c) = Iteratee rI
      where
        newLen = n + fromIntegral (length c)
        rI od oc = runIter (f str) onDone onCont
          where
            onDone a str'@(Chunk c') =
                od (a, newLen - fromIntegral (length c')) str'
            onDone a str'@(EOF _) = od (a, n) str'
            onCont f' = oc (go newLen f')
{-# INLINE countConsumed #-}

-- ------------------------------------------------------------------------
-- Enumerators

-- |The pure n-chunk enumerator
-- It passes a given stream of elements to the iteratee in @n@-sized chunks.
enumPureNChunk :: Monad m => [el] -> Int -> Enumerator [el] m a
enumPureNChunk str n iter
  | null str = return iter
  | n > 0       = enum' str iter
  | otherwise   = error $ "enumPureNChunk called with n==" ++ show n
  where
    enum' str' iter'
      | null str' = return iter'
      | otherwise    = let (s1, s2) = splitAt n str'
                           on_cont k Nothing = enum' s2 . k $ Chunk s1
                           on_cont k e = return $ icont k e
                       in runIter iter' idoneM on_cont
{-# INLINE enumPureNChunk #-}

-- ------------------------------------------------------------------------
-- Monadic functions

-- | Maps a monadic function over the elements of the stream and ignores
-- the result.
mapStreamM_ :: Monad m => (el -> m b) -> Iteratee [el] m ()
mapStreamM_ = mapChunksM_ . mapM_
{-# INLINE mapStreamM_ #-}

-- | Maps a monadic function over an 'Iteratee'.
mapStreamM :: Monad m => (el -> m el') -> Enumeratee [el] [el'] m a
mapStreamM = mapChunksM . mapM
{-# INLINE mapStreamM #-}

-- | Folds a monadic function over an 'Iteratee'.
foldStreamM :: Monad m => (b -> a -> m b) -> b -> Iteratee [a] m b
foldStreamM = foldChunksM . foldM
{-# INLINE foldStreamM #-}