{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE Trustworthy #-}
module Data.Conduit.Internal.List.Stream where
import Control.Monad (liftM)
import Data.Conduit.Internal.Fusion
import qualified Data.Foldable as F
unfoldS :: Monad m
=> (b -> Maybe (a, b))
-> b
-> StreamProducer m a
unfoldS f s0 _ =
Stream step (return s0)
where
step s = return $
case f s of
Nothing -> Stop ()
Just (x, s') -> Emit s' x
{-# INLINE unfoldS #-}
unfoldEitherS :: Monad m
=> (b -> Either r (a, b))
-> b
-> StreamConduitT i a m r
unfoldEitherS f s0 _ =
Stream step (return s0)
where
step s = return $
case f s of
Left r -> Stop r
Right (x, s') -> Emit s' x
{-# INLINE unfoldEitherS #-}
unfoldMS :: Monad m
=> (b -> m (Maybe (a, b)))
-> b
-> StreamProducer m a
unfoldMS f s0 _ =
Stream step (return s0)
where
step s = do
ms' <- f s
return $ case ms' of
Nothing -> Stop ()
Just (x, s') -> Emit s' x
{-# INLINE unfoldMS #-}
unfoldEitherMS :: Monad m
=> (b -> m (Either r (a, b)))
-> b
-> StreamConduitT i a m r
unfoldEitherMS f s0 _ =
Stream step (return s0)
where
step s = do
ms' <- f s
return $ case ms' of
Left r -> Stop r
Right (x, s') -> Emit s' x
{-# INLINE unfoldEitherMS #-}
sourceListS :: Monad m => [a] -> StreamProducer m a
sourceListS xs0 _ =
Stream (return . step) (return xs0)
where
step [] = Stop ()
step (x:xs) = Emit xs x
{-# INLINE sourceListS #-}
enumFromToS :: (Enum a, Prelude.Ord a, Monad m)
=> a
-> a
-> StreamProducer m a
enumFromToS x0 y _ =
Stream step (return x0)
where
step x = return $ if x Prelude.> y
then Stop ()
else Emit (Prelude.succ x) x
{-# INLINE [0] enumFromToS #-}
enumFromToS_int :: (Prelude.Integral a, Monad m)
=> a
-> a
-> StreamProducer m a
enumFromToS_int x0 y _ = x0 `seq` y `seq` Stream step (return x0)
where
step x | x <= y = return $ Emit (x Prelude.+ 1) x
| otherwise = return $ Stop ()
{-# INLINE enumFromToS_int #-}
{-# RULES "conduit: enumFromTo<Int>" forall f t.
enumFromToS f t = enumFromToS_int f t :: Monad m => StreamProducer m Int
#-}
iterateS :: Monad m => (a -> a) -> a -> StreamProducer m a
iterateS f x0 _ =
Stream (return . step) (return x0)
where
step x = Emit x' x
where
x' = f x
{-# INLINE iterateS #-}
replicateS :: Monad m => Int -> a -> StreamProducer m a
replicateS cnt0 a _ =
Stream step (return cnt0)
where
step cnt
| cnt <= 0 = return $ Stop ()
| otherwise = return $ Emit (cnt - 1) a
{-# INLINE replicateS #-}
replicateMS :: Monad m => Int -> m a -> StreamProducer m a
replicateMS cnt0 ma _ =
Stream step (return cnt0)
where
step cnt
| cnt <= 0 = return $ Stop ()
| otherwise = Emit (cnt - 1) `liftM` ma
{-# INLINE replicateMS #-}
foldS :: Monad m => (b -> a -> b) -> b -> StreamConsumer a m b
foldS f b0 (Stream step ms0) =
Stream step' (liftM (b0, ) ms0)
where
step' (!b, s) = do
res <- step s
return $ case res of
Stop () -> Stop b
Skip s' -> Skip (b, s')
Emit s' a -> Skip (f b a, s')
{-# INLINE foldS #-}
foldMS :: Monad m => (b -> a -> m b) -> b -> StreamConsumer a m b
foldMS f b0 (Stream step ms0) =
Stream step' (liftM (b0, ) ms0)
where
step' (!b, s) = do
res <- step s
case res of
Stop () -> return $ Stop b
Skip s' -> return $ Skip (b, s')
Emit s' a -> do
b' <- f b a
return $ Skip (b', s')
{-# INLINE foldMS #-}
mapM_S :: Monad m
=> (a -> m ())
-> StreamConsumer a m ()
mapM_S f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip s'
Emit s' x -> f x >> return (Skip s')
{-# INLINE [1] mapM_S #-}
dropS :: Monad m
=> Int
-> StreamConsumer a m ()
dropS n0 (Stream step ms0) =
Stream step' (liftM (, n0) ms0)
where
step' (_, n) | n <= 0 = return $ Stop ()
step' (s, n) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip (s', n)
Emit s' _ -> Skip (s', n - 1)
{-# INLINE dropS #-}
takeS :: Monad m
=> Int
-> StreamConsumer a m [a]
takeS n0 (Stream step s0) =
Stream step' (liftM (id, n0,) s0)
where
step' (output, n, _) | n <= 0 = return $ Stop (output [])
step' (output, n, s) = do
res <- step s
return $ case res of
Stop () -> Stop (output [])
Skip s' -> Skip (output, n, s')
Emit s' x -> Skip (output . (x:), n - 1, s')
{-# INLINE takeS #-}
headS :: Monad m => StreamConsumer a m (Maybe a)
headS (Stream step s0) =
Stream step' s0
where
step' s = do
res <- step s
return $ case res of
Stop () -> Stop Nothing
Skip s' -> Skip s'
Emit _ x -> Stop (Just x)
{-# INLINE headS #-}
mapS :: Monad m => (a -> b) -> StreamConduit a m b
mapS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
return $ case res of
Stop r -> Stop r
Emit s' a -> Emit s' (f a)
Skip s' -> Skip s'
{-# INLINE mapS #-}
mapMS :: Monad m => (a -> m b) -> StreamConduit a m b
mapMS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
case res of
Stop r -> return $ Stop r
Emit s' a -> Emit s' `liftM` f a
Skip s' -> return $ Skip s'
{-# INLINE mapMS #-}
iterMS :: Monad m => (a -> m ()) -> StreamConduit a m a
iterMS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip s'
Emit s' x -> f x >> return (Emit s' x)
{-# INLINE iterMS #-}
mapMaybeS :: Monad m => (a -> Maybe b) -> StreamConduit a m b
mapMaybeS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip s'
Emit s' x ->
case f x of
Just y -> Emit s' y
Nothing -> Skip s'
{-# INLINE mapMaybeS #-}
mapMaybeMS :: Monad m => (a -> m (Maybe b)) -> StreamConduit a m b
mapMaybeMS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip s'
Emit s' x -> do
my <- f x
case my of
Just y -> return $ Emit s' y
Nothing -> return $ Skip s'
{-# INLINE mapMaybeMS #-}
catMaybesS :: Monad m => StreamConduit (Maybe a) m a
catMaybesS (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip s'
Emit s' Nothing -> Skip s'
Emit s' (Just x) -> Emit s' x
{-# INLINE catMaybesS #-}
concatS :: (Monad m, F.Foldable f) => StreamConduit (f a) m a
concatS (Stream step ms0) =
Stream step' (liftM ([], ) ms0)
where
step' ([], s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip ([], s')
Emit s' x -> Skip (F.toList x, s')
step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatS #-}
concatMapS :: Monad m => (a -> [b]) -> StreamConduit a m b
concatMapS f (Stream step ms0) =
Stream step' (liftM ([], ) ms0)
where
step' ([], s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip ([], s')
Emit s' x -> Skip (f x, s')
step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapS #-}
concatMapMS :: Monad m => (a -> m [b]) -> StreamConduit a m b
concatMapMS f (Stream step ms0) =
Stream step' (liftM ([], ) ms0)
where
step' ([], s) = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip ([], s')
Emit s' x -> do
xs <- f x
return $ Skip (xs, s')
step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE concatMapMS #-}
concatMapAccumS :: Monad m => (a -> accum -> (accum, [b])) -> accum -> StreamConduit a m b
concatMapAccumS f initial (Stream step ms0) =
Stream step' (liftM (initial, [], ) ms0)
where
step' (accum, [], s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip (accum, [], s')
Emit s' x ->
let (accum', xs) = f x accum
in Skip (accum', xs, s')
step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
{-# INLINE concatMapAccumS #-}
mapAccumS :: Monad m => (a -> s -> (s, b)) -> s -> StreamConduitT a b m s
mapAccumS f initial (Stream step ms0) =
Stream step' (liftM (initial, ) ms0)
where
step' (accum, s) = do
res <- step s
return $ case res of
Stop () -> Stop accum
Skip s' -> Skip (accum, s')
Emit s' x ->
let (accum', r) = f x accum
in Emit (accum', s') r
{-# INLINE mapAccumS #-}
mapAccumMS :: Monad m => (a -> s -> m (s, b)) -> s -> StreamConduitT a b m s
mapAccumMS f initial (Stream step ms0) =
Stream step' (liftM (initial, ) ms0)
where
step' (accum, s) = do
res <- step s
case res of
Stop () -> return $ Stop accum
Skip s' -> return $ Skip (accum, s')
Emit s' x -> do
(accum', r) <- f x accum
return $ Emit (accum', s') r
{-# INLINE mapAccumMS #-}
concatMapAccumMS :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> StreamConduit a m b
concatMapAccumMS f initial (Stream step ms0) =
Stream step' (liftM (initial, [], ) ms0)
where
step' (accum, [], s) = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip (accum, [], s')
Emit s' x -> do
(accum', xs) <- f x accum
return $ Skip (accum', xs, s')
step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x)
{-# INLINE concatMapAccumMS #-}
mapFoldableS :: (Monad m, F.Foldable f) => (a -> f b) -> StreamConduit a m b
mapFoldableS f (Stream step ms0) =
Stream step' (liftM ([], ) ms0)
where
step' ([], s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip ([], s')
Emit s' x -> Skip (F.toList (f x), s')
step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE mapFoldableS #-}
mapFoldableMS :: (Monad m, F.Foldable f) => (a -> m (f b)) -> StreamConduit a m b
mapFoldableMS f (Stream step ms0) =
Stream step' (liftM ([], ) ms0)
where
step' ([], s) = do
res <- step s
case res of
Stop () -> return $ Stop ()
Skip s' -> return $ Skip ([], s')
Emit s' x -> do
y <- f x
return $ Skip (F.toList y, s')
step' ((x:xs), s) = return (Emit (xs, s) x)
{-# INLINE mapFoldableMS #-}
consumeS :: Monad m => StreamConsumer a m [a]
consumeS (Stream step ms0) =
Stream step' (liftM (id,) ms0)
where
step' (front, s) = do
res <- step s
return $ case res of
Stop () -> Stop (front [])
Skip s' -> Skip (front, s')
Emit s' a -> Skip (front . (a:), s')
{-# INLINE consumeS #-}
groupByS :: Monad m => (a -> a -> Bool) -> StreamConduit a m [a]
groupByS f = mapS (Prelude.uncurry (:)) . groupBy1S id f
{-# INLINE groupByS #-}
groupOn1S :: (Monad m, Eq b) => (a -> b) -> StreamConduit a m (a, [a])
groupOn1S f = groupBy1S f (==)
{-# INLINE groupOn1S #-}
data GroupByState a b s
= GBStart s
| GBLoop ([a] -> [a]) a b s
| GBDone
groupBy1S :: Monad m => (a -> b) -> (b -> b -> Bool) -> StreamConduit a m (a, [a])
groupBy1S f eq (Stream step ms0) =
Stream step' (liftM GBStart ms0)
where
step' (GBStart s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip (GBStart s')
Emit s' x0 -> Skip (GBLoop id x0 (f x0) s')
step' (GBLoop rest x0 fx0 s) = do
res <- step s
return $ case res of
Stop () -> Emit GBDone (x0, rest [])
Skip s' -> Skip (GBLoop rest x0 fx0 s')
Emit s' x
| fx0 `eq` f x -> Skip (GBLoop (rest . (x:)) x0 fx0 s')
| otherwise -> Emit (GBLoop id x (f x) s') (x0, rest [])
step' GBDone = return $ Stop ()
{-# INLINE groupBy1S #-}
isolateS :: Monad m => Int -> StreamConduit a m a
isolateS count (Stream step ms0) =
Stream step' (liftM (count,) ms0)
where
step' (n, _) | n <= 0 = return $ Stop ()
step' (n, s) = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip (n, s')
Emit s' x -> Emit (n - 1, s') x
{-# INLINE isolateS #-}
filterS :: Monad m => (a -> Bool) -> StreamConduit a m a
filterS f (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip s'
Emit s' x
| f x -> Emit s' x
| otherwise -> Skip s'
sinkNullS :: Monad m => StreamConsumer a m ()
sinkNullS (Stream step ms0) =
Stream step' ms0
where
step' s = do
res <- step s
return $ case res of
Stop () -> Stop ()
Skip s' -> Skip s'
Emit s' _ -> Skip s'
{-# INLINE sinkNullS #-}
sourceNullS :: Monad m => StreamProducer m a
sourceNullS _ = Stream (\_ -> return (Stop ())) (return ())
{-# INLINE sourceNullS #-}