{-# LANGUAGE RankNTypes #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE TupleSections #-} -- | Higher-level functions to interact with the elements of a stream. Most of -- these are based on list functions. -- -- Note that these functions all deal with individual elements of a stream as a -- sort of \"black box\", where there is no introspection of the contained -- elements. Values such as @ByteString@ and @Text@ will likely need to be -- treated specially to deal with their contents properly (@Word8@ and @Char@, -- respectively). See the "Data.Conduit.Binary" and "Data.Conduit.Text" -- modules. module Data.Conduit.List ( -- * Sources sourceList , sourceNull , unfold , unfoldM , enumFromTo , iterate , replicate , replicateM -- * Sinks -- ** Pure , fold , foldMap , take , drop , head , peek , consume , sinkNull -- ** Monadic , foldMapM , foldM , mapM_ -- * Conduits -- ** Pure , map , mapMaybe , mapFoldable , catMaybes , concat , concatMap , concatMapAccum , scanl , scan , mapAccum , groupBy , groupOn1 , isolate , filter -- ** Monadic , mapM , iterM , scanlM , scanM , mapAccumM , mapMaybeM , mapFoldableM , concatMapM , concatMapAccumM -- * Misc , sequence ) where import qualified Prelude import Prelude ( ($), return, (==), (-), Int , (.), id, Maybe (..), Monad , Bool (..) , (>>) , (>>=) , seq , otherwise , Enum, Eq , maybe , (<=) ) import Data.Monoid (Monoid, mempty, mappend) import qualified Data.Foldable as F import Data.Conduit import qualified Data.Conduit.Internal as CI import Data.Conduit.Internal.Fusion import Control.Monad (when, (<=<), liftM, void) import Control.Monad.Trans.Class (lift) -- | Generate a source from a seed value. -- -- Since 0.4.2 unfold :: Monad m => (b -> Maybe (a, b)) -> b -> Producer m a unfold f = go where go seed = case f seed of Just (a, seed') -> yield a >> go seed' Nothing -> return () -- | A monadic unfold. -- -- Since 1.1.2 unfoldM :: Monad m => (b -> m (Maybe (a, b))) -> b -> Producer m a unfoldM f = go where go seed = do mres <- lift $ f seed case mres of Just (a, seed') -> yield a >> go seed' Nothing -> return () sourceList :: Monad m => [a] -> Producer m a sourceList = Prelude.mapM_ yield -- | Enumerate from a value to a final value, inclusive, via 'succ'. -- -- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and -- combining with @sourceList@ since this avoids any intermediate data -- structures. -- -- Subject to fusion -- -- Since 0.4.2 enumFromTo :: (Enum a, Prelude.Ord a, Monad m) => a -> a -> Producer m a enumFromTo x y = unstream $ streamSource $ enumFromToS x y {-# INLINE [0] enumFromTo #-} {-# RULES "unstream enumFromTo" forall x y. enumFromTo x y = unstream (streamSourcePure $ enumFromToS x y) #-} enumFromToC :: (Enum a, Prelude.Ord a, Monad m) => a -> a -> Producer m a enumFromToC x0 y = loop x0 where loop x | x Prelude.> y = return () | otherwise = yield x >> loop (Prelude.succ x) {-# INLINE [0] enumFromToC #-} enumFromToS :: (Enum a, Prelude.Ord a, Monad m) => a -> a -> Stream m a () enumFromToS x0 y = Stream step (return x0) where step x = return $ if x Prelude.> y then Stop () else Emit (Prelude.succ x) x {-# INLINE [0] enumFromToS #-} enumFromToS_int :: (Prelude.Integral a, Monad m) => a -> a -> Stream m a () enumFromToS_int x0 y = x0 `seq` y `seq` Stream step (return x0) where step x | x <= y = return $ Emit (x Prelude.+ 1) x | otherwise = return $ Stop () {-# INLINE enumFromToS_int #-} {-# RULES "enumFromTo" enumFromToS = enumFromToS_int :: Monad m => Int -> Int -> Stream m Int () #-} -- | Produces an infinite stream of repeated applications of f to x. iterate :: Monad m => (a -> a) -> a -> Producer m a iterate f = go where go a = yield a >> go (f a) -- | Replicate a single value the given number of times. -- -- Subject to fusion -- -- Since 1.2.0 replicate :: Monad m => Int -> a -> Producer m a replicate = replicateC {-# INLINE [0] replicate #-} {-# RULES "unstream replicate" forall i a. replicate i a = unstream (streamConduit (replicateC i a) (\_ -> replicateS i a)) #-} replicateC :: Monad m => Int -> a -> Producer m a replicateC cnt0 a = loop cnt0 where loop i | i <= 0 = return () | otherwise = yield a >> loop (i - 1) {-# INLINE replicateC #-} replicateS :: Monad m => Int -> a -> Stream m a () replicateS cnt0 a = Stream step (return cnt0) where step cnt | cnt <= 0 = return $ Stop () | otherwise = return $ Emit (cnt - 1) a {-# INLINE replicateS #-} -- | Replicate a monadic value the given number of times. -- -- Since 1.2.0 replicateM :: Monad m => Int -> m a -> Producer m a replicateM = replicateMC {-# INLINE [0] replicateM #-} {-# RULES "unstream replicateM" forall i a. replicateM i a = unstream (streamConduit (replicateMC i a) (\_ -> replicateMS i a)) #-} replicateMC :: Monad m => Int -> m a -> Producer m a replicateMC cnt0 ma = loop cnt0 where loop i | i <= 0 = return () | otherwise = lift ma >>= yield >> loop (i - 1) {-# INLINE replicateMC #-} replicateMS :: Monad m => Int -> m a -> Stream m a () replicateMS cnt0 ma = Stream step (return cnt0) where step cnt | cnt <= 0 = return $ Stop () | otherwise = Emit (cnt - 1) `liftM` ma {-# INLINE replicateMS #-} -- | A strict left fold. -- -- Subject to fusion -- -- Since 0.3.0 fold :: Monad m => (b -> a -> b) -> b -> Consumer a m b fold = foldC {-# INLINE [0] fold #-} {-# RULES "unstream fold" forall f b. fold f b = unstream (streamConduit (foldC f b) (foldS f b)) #-} foldC :: Monad m => (b -> a -> b) -> b -> Consumer a m b foldC f = loop where loop !accum = await >>= maybe (return accum) (loop . f accum) {-# INLINE foldC #-} foldS :: Monad m => (b -> a -> b) -> b -> Stream m a () -> Stream m o b foldS f b0 (Stream step ms0) = Stream step' (liftM (b0, ) ms0) where step' (!b, s) = do res <- step s return $ case res of Stop () -> Stop b Skip s' -> Skip (b, s') Emit s' a -> Skip (f b a, s') {-# INLINE foldS #-} -- | A monadic strict left fold. -- -- Subject to fusion -- -- Since 0.3.0 foldM :: Monad m => (b -> a -> m b) -> b -> Consumer a m b foldM = foldMC {-# INLINE [0] foldM #-} {-# RULES "unstream foldM" forall f b. foldM f b = unstream (streamConduit (foldMC f b) (foldMS f b)) #-} foldMC :: Monad m => (b -> a -> m b) -> b -> Consumer a m b foldMC f = loop where loop accum = do await >>= maybe (return accum) go where go a = do accum' <- lift $ f accum a accum' `seq` loop accum' {-# INLINE foldMC #-} foldMS :: Monad m => (b -> a -> m b) -> b -> Stream m a () -> Stream m o b foldMS f b0 (Stream step ms0) = Stream step' (liftM (b0, ) ms0) where step' (!b, s) = do res <- step s case res of Stop () -> return $ Stop b Skip s' -> return $ Skip (b, s') Emit s' a -> do b' <- f b a return $ Skip (b', s') {-# INLINE foldMS #-} ----------------------------------------------------------------- -- These are for cases where- for whatever reason- stream fusion cannot be -- applied. connectFold :: Monad m => Source m a -> (b -> a -> b) -> b -> m b connectFold (CI.ConduitM src0) f = go (src0 CI.Done) where go (CI.Done ()) b = return b go (CI.HaveOutput src _ a) b = go src Prelude.$! f b a go (CI.NeedInput _ c) b = go (c ()) b go (CI.Leftover src ()) b = go src b go (CI.PipeM msrc) b = do src <- msrc go src b {-# INLINE connectFold #-} {-# RULES "$$ fold" forall src f b. src $$ fold f b = connectFold src f b #-} connectFoldM :: Monad m => Source m a -> (b -> a -> m b) -> b -> m b connectFoldM (CI.ConduitM src0) f = go (src0 CI.Done) where go (CI.Done ()) b = return b go (CI.HaveOutput src _ a) b = do !b' <- f b a go src b' go (CI.NeedInput _ c) b = go (c ()) b go (CI.Leftover src ()) b = go src b go (CI.PipeM msrc) b = do src <- msrc go src b {-# INLINE connectFoldM #-} {-# RULES "$$ foldM" forall src f b. src $$ foldM f b = connectFoldM src f b #-} ----------------------------------------------------------------- -- | A monoidal strict left fold. -- -- Since 0.5.3 foldMap :: (Monad m, Monoid b) => (a -> b) -> Consumer a m b foldMap f = fold combiner mempty where combiner accum = mappend accum . f -- | A monoidal strict left fold in a Monad. -- -- Since 1.0.8 foldMapM :: (Monad m, Monoid b) => (a -> m b) -> Consumer a m b foldMapM f = foldM combiner mempty where combiner accum = liftM (mappend accum) . f -- | Apply the action to all values in the stream. -- -- Since 0.3.0 mapM_ :: Monad m => (a -> m ()) -> Consumer a m () mapM_ f = awaitForever $ lift . f {-# INLINE [1] mapM_ #-} srcMapM_ :: Monad m => Source m a -> (a -> m ()) -> m () srcMapM_ (CI.ConduitM src) f = go (src CI.Done) where go (CI.Done ()) = return () go (CI.PipeM mp) = mp >>= go go (CI.Leftover p ()) = go p go (CI.HaveOutput p _ o) = f o >> go p go (CI.NeedInput _ c) = go (c ()) {-# INLINE srcMapM_ #-} {-# RULES "connect to mapM_" forall f src. src $$ mapM_ f = srcMapM_ src f #-} -- | Ignore a certain number of values in the stream. This function is -- semantically equivalent to: -- -- > drop i = take i >> return () -- -- However, @drop@ is more efficient as it does not need to hold values in -- memory. -- -- Since 0.3.0 drop :: Monad m => Int -> Consumer a m () drop = loop where loop i | i <= 0 = return () loop count = await >>= maybe (return ()) (\_ -> loop (count - 1)) -- | Take some values from the stream and return as a list. If you want to -- instead create a conduit that pipes data to another sink, see 'isolate'. -- This function is semantically equivalent to: -- -- > take i = isolate i =$ consume -- -- Since 0.3.0 take :: Monad m => Int -> Consumer a m [a] take = loop id where loop front count | count <= 0 = return $ front [] loop front count = await >>= maybe (return $ front []) (\x -> loop (front .(x:)) (count - 1)) -- | Take a single value from the stream, if available. -- -- Since 0.3.0 head :: Monad m => Consumer a m (Maybe a) head = await -- | Look at the next value in the stream, if available. This function will not -- change the state of the stream. -- -- Since 0.3.0 peek :: Monad m => Consumer a m (Maybe a) peek = await >>= maybe (return Nothing) (\x -> leftover x >> return (Just x)) -- | Apply a transformation to all values in a stream. -- -- Subject to fusion -- -- Since 0.3.0 map :: Monad m => (a -> b) -> Conduit a m b map = mapC {-# INLINE [0] map #-} {-# RULES "unstream map" forall f. map f = unstream (streamConduit (mapC f) (mapS f)) #-} mapC :: Monad m => (a -> b) -> Conduit a m b mapC f = awaitForever $ yield . f {-# INLINE mapC #-} mapS :: Monad m => (a -> b) -> Stream m a r -> Stream m b r mapS f (Stream step ms0) = Stream step' ms0 where step' s = do res <- step s return $ case res of Stop r -> Stop r Emit s' a -> Emit s' (f a) Skip s' -> Skip s' {-# INLINE mapS #-} -- Since a Source never has any leftovers, fusion rules on it are safe. {- {-# RULES "source/map fusion =$=" forall f src. src =$= map f = mapFuseRight src f #-} mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b mapFuseRight src f = CIC.mapOutput f src {-# INLINE mapFuseRight #-} -} {- It might be nice to include these rewrite rules, but they may have subtle differences based on leftovers. {-# RULES "map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-} {-# RULES "map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-} {-# RULES "map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-} {-# RULES "map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-} {-# RULES "map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} {-# RULES "map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} {-# RULES "map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} {-# RULES "map-to-mapInput >+>" forall f sink. map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} {-# RULES "map-to-mapOutput =$=" forall f con. con =$= map f = mapOutput f con #-} {-# RULES "map-to-mapInput =$=" forall f con. map f =$= con = mapInput f (Prelude.const Prelude.Nothing) con #-} {-# INLINE [1] map #-} -} -- | Apply a monadic transformation to all values in a stream. -- -- If you do not need the transformed values, and instead just want the monadic -- side-effects of running the action, see 'mapM_'. -- -- Subject to fusion -- -- Since 0.3.0 mapM :: Monad m => (a -> m b) -> Conduit a m b mapM = mapMC {-# INLINE [0] mapM #-} {-# RULES "unstream mapM" forall f. mapM f = unstream (streamConduit (mapMC f) (mapMS f)) #-} mapMC :: Monad m => (a -> m b) -> Conduit a m b mapMC f = awaitForever $ \a -> lift (f a) >>= yield {-# INLINE mapMC #-} mapMS :: Monad m => (a -> m b) -> Stream m a r -> Stream m b r mapMS f (Stream step ms0) = Stream step' ms0 where step' s = do res <- step s case res of Stop r -> return $ Stop r Emit s' a -> Emit s' `liftM` f a Skip s' -> return $ Skip s' {-# INLINE mapMS #-} -- | Apply a monadic action on all values in a stream. -- -- This @Conduit@ can be used to perform a monadic side-effect for every -- value, whilst passing the value through the @Conduit@ as-is. -- -- > iterM f = mapM (\a -> f a >>= \() -> return a) -- -- Since 0.5.6 iterM :: Monad m => (a -> m ()) -> Conduit a m a iterM f = awaitForever $ \a -> lift (f a) >> yield a -- | Apply a transformation that may fail to all values in a stream, discarding -- the failures. -- -- Since 0.5.1 mapMaybe :: Monad m => (a -> Maybe b) -> Conduit a m b mapMaybe f = awaitForever $ maybe (return ()) yield . f -- | Apply a monadic transformation that may fail to all values in a stream, -- discarding the failures. -- -- Since 0.5.1 mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Conduit a m b mapMaybeM f = awaitForever $ maybe (return ()) yield <=< lift . f -- | Filter the @Just@ values from a stream, discarding the @Nothing@ values. -- -- Since 0.5.1 catMaybes :: Monad m => Conduit (Maybe a) m a catMaybes = awaitForever $ maybe (return ()) yield -- | Generalization of 'catMaybes'. It puts all values from -- 'F.Foldable' into stream. -- -- Since 1.0.6 concat :: (Monad m, F.Foldable f) => Conduit (f a) m a concat = awaitForever $ F.mapM_ yield -- | Apply a transformation to all values in a stream, concatenating the output -- values. -- -- Since 0.3.0 concatMap :: Monad m => (a -> [b]) -> Conduit a m b concatMap f = awaitForever $ sourceList . f -- | Apply a monadic transformation to all values in a stream, concatenating -- the output values. -- -- Since 0.3.0 concatMapM :: Monad m => (a -> m [b]) -> Conduit a m b concatMapM f = awaitForever $ sourceList <=< lift . f -- | 'concatMap' with an accumulator. -- -- Since 0.3.0 concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b concatMapAccum f x0 = void (mapAccum f x0) =$= concat -- | Deprecated synonym for @mapAccum@ -- -- Since 1.0.6 scanl :: Monad m => (a -> s -> (s, b)) -> s -> Conduit a m b scanl f s = void $ mapAccum f s {-# DEPRECATED scanl "Use mapAccum instead" #-} -- | Deprecated synonym for @mapAccumM@ -- -- Since 1.0.6 scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> Conduit a m b scanlM f s = void $ mapAccumM f s {-# DEPRECATED scanlM "Use mapAccumM instead" #-} -- | Analog of @mapAccumL@ for lists. -- -- Since 1.1.1 mapAccum :: Monad m => (a -> s -> (s, b)) -> s -> ConduitM a b m s mapAccum f = loop where loop s = await >>= maybe (return s) go where go a = case f a s of (s', b) -> yield b >> loop s' -- | Monadic `mapAccum`. -- -- Since 1.1.1 mapAccumM :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitM a b m s mapAccumM f = loop where loop s = await >>= maybe (return s) go where go a = do (s', b) <- lift $ f a s yield b loop s' -- | Analog of 'Prelude.scanl' for lists. -- -- Since 1.1.1 scan :: Monad m => (a -> b -> b) -> b -> ConduitM a b m b scan f = mapAccum $ \a b -> let b' = f a b in (b', b') -- | Monadic @scanl@. -- -- Since 1.1.1 scanM :: Monad m => (a -> b -> m b) -> b -> ConduitM a b m b scanM f = mapAccumM $ \a b -> do b' <- f a b return (b', b') -- | 'concatMapM' with an accumulator. -- -- Since 0.3.0 concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b concatMapAccumM f x0 = void (mapAccumM f x0) =$= concat -- | Generalization of 'mapMaybe' and 'concatMap'. It applies function -- to all values in a stream and send values inside resulting -- 'Foldable' downstream. -- -- Since 1.0.6 mapFoldable :: (Monad m, F.Foldable f) => (a -> f b) -> Conduit a m b mapFoldable f = awaitForever $ F.mapM_ yield . f -- | Monadic variant of 'mapFoldable'. -- -- Since 1.0.6 mapFoldableM :: (Monad m, F.Foldable f) => (a -> m (f b)) -> Conduit a m b mapFoldableM f = awaitForever $ F.mapM_ yield <=< lift . f -- | Consume all values from the stream and return as a list. Note that this -- will pull all values into memory. For a lazy variant, see -- "Data.Conduit.Lazy". -- -- Subject to fusion -- -- Since 0.3.0 consume :: Monad m => Consumer a m [a] consume = consumeC {-# INLINE [0] consume #-} {-# RULES "unstream consume" consume = unstream (streamConduit consumeC consumeS) #-} consumeC :: Monad m => Consumer a m [a] consumeC = loop id where loop front = await >>= maybe (return $ front []) (\x -> loop $ front . (x:)) {-# INLINE consumeC #-} consumeS :: Monad m => Stream m a () -> Stream m o [a] consumeS (Stream step ms0) = Stream step' (liftM (id,) ms0) where step' (front, s) = do res <- step s return $ case res of Stop () -> Stop (front []) Skip s' -> Skip (front, s') Emit s' a -> Skip (front . (a:), s') {-# INLINE consumeS #-} -- | Grouping input according to an equality function. -- -- Since 0.3.0 groupBy :: Monad m => (a -> a -> Bool) -> Conduit a m [a] groupBy f = start where start = await >>= maybe (return ()) (loop id) loop rest x = await >>= maybe (yield (x : rest [])) go where go y | f x y = loop (rest . (y:)) x | otherwise = yield (x : rest []) >> loop id y -- | 'groupOn1' is similar to @groupBy id@ -- -- returns a pair, indicating there are always 1 or more items in the grouping. -- This is designed to be converted into a NonEmpty structure -- but it avoids a dependency on another package -- -- > import Data.List.NonEmpty -- > -- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> Conduit a m (NonEmpty a) -- > groupOn1 f = CL.groupOn1 f =$= CL.map (uncurry (:|)) -- -- Since 1.1.7 groupOn1 :: (Monad m, Eq b) => (a -> b) -> Conduit a m (a, [a]) groupOn1 f = start where start = await >>= maybe (return ()) (loop id) loop rest x = await >>= maybe (yield (x, rest [])) go where go y | f x == f y = loop (rest . (y:)) x | otherwise = yield (x, rest []) >> loop id y -- | Ensure that the inner sink consumes no more than the given number of -- values. Note this this does /not/ ensure that the sink consumes all of those -- values. To get the latter behavior, combine with 'sinkNull', e.g.: -- -- > src $$ do -- > x <- isolate count =$ do -- > x <- someSink -- > sinkNull -- > return x -- > someOtherSink -- > ... -- -- Since 0.3.0 isolate :: Monad m => Int -> Conduit a m a isolate = loop where loop count | count <= 0 = return () loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1)) -- | Keep only values in the stream passing a given predicate. -- -- Since 0.3.0 filter :: Monad m => (a -> Bool) -> Conduit a m a filter f = awaitForever $ \i -> when (f i) (yield i) filterFuseRight :: Monad m => Source m a -> (a -> Bool) -> Source m a filterFuseRight (CI.ConduitM src) f = CI.ConduitM $ \rest -> let go (CI.Done ()) = rest () go (CI.PipeM mp) = CI.PipeM (liftM go mp) go (CI.Leftover p i) = CI.Leftover (go p) i go (CI.HaveOutput p c o) | f o = CI.HaveOutput (go p) c o | otherwise = go p go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c) in go (src CI.Done) -- Intermediate finalizers are dropped, but this is acceptable: the next -- yielded value would be demanded by downstream in any event, and that new -- finalizer will always override the existing finalizer. {-# RULES "source/filter fusion =$=" forall f src. src =$= filter f = filterFuseRight src f #-} {-# INLINE filterFuseRight #-} -- | Ignore the remainder of values in the source. Particularly useful when -- combined with 'isolate'. -- -- Since 0.3.0 sinkNull :: Monad m => Consumer a m () sinkNull = awaitForever $ \_ -> return () {-# RULES "connect to sinkNull" forall src. src $$ sinkNull = srcSinkNull src #-} srcSinkNull :: Monad m => Source m a -> m () srcSinkNull (CI.ConduitM src) = go (src CI.Done) where go (CI.Done ()) = return () go (CI.PipeM mp) = mp >>= go go (CI.Leftover p ()) = go p go (CI.HaveOutput p _ _) = go p go (CI.NeedInput _ c) = go (c ()) {-# INLINE srcSinkNull #-} -- | A source that outputs no values. Note that this is just a type-restricted -- synonym for 'mempty'. -- -- Since 0.3.0 sourceNull :: Monad m => Producer m a sourceNull = return () -- | Run a @Pipe@ repeatedly, and output its result value downstream. Stops -- when no more input is available from upstream. -- -- Since 0.5.0 sequence :: Monad m => Consumer i m o -- ^ @Pipe@ to run repeatedly -> Conduit i m o sequence sink = self where self = awaitForever $ \i -> leftover i >> sink >>= yield