{-# LANGUAGE RankNTypes #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE UndecidableInstances #-} -- | Please see the project README for more details: -- -- https://github.com/jwiegley/simple-conduit/blob/master/README.md -- -- Also see this blog article: -- -- https://www.newartisans.com/2014/06/simpler-conduit-library module Conduit.Simple ( Source(..), Conduit, Sink , sequenceSources , ZipSink(..), sequenceSinks , source, conduit, conduitWith, sink , ($=), (=$), ($$) , returnC, abort, skip, awaitForever , yieldMany, sourceList , unfoldC , enumFromToC , iterateC , repeatC , replicateC , sourceLazy , repeatMC , repeatWhileMC , replicateMC , sourceHandle , sourceFile , sourceIOHandle , stdinC , initRepeat , initReplicate , sourceRandom , sourceRandomN , sourceRandomGen , sourceRandomNGen , sourceDirectory , sourceDirectoryDeep , dropC , dropCE , dropWhileC , dropWhileCE , foldC , foldCE , foldlC , foldlCE , foldMapC , foldMapCE , allC , allCE , anyC , anyCE , andC , andCE , orC , orCE , elemC , elemCE , notElemC , notElemCE , sinkLazy , sinkList , sinkVector , sinkVectorN , sinkBuilder , sinkLazyBuilder , sinkNull , awaitNonNull , headCE , lastC , lastCE , lengthC , lengthCE , lengthIfC , lengthIfCE , maximumC , maximumCE , minimumC , minimumCE , sumC , sumCE , productC , productCE , findC , mapM_C , mapM_CE , foldMC , foldMCE , foldMapMC , foldMapMCE , sinkFile , sinkHandle , sinkIOHandle , printC , stdoutC , stderrC , mapC , mapCE , omapCE , concatMapC , concatMapCE , takeC , takeCE , takeWhileC , takeWhileCE , takeExactlyC , takeExactlyCE , concatC , filterC , filterCE , mapWhileC , conduitVector , scanlC , concatMapAccumC , intersperseC , encodeBase64C , decodeBase64C , encodeBase64URLC , decodeBase64URLC , encodeBase16C , decodeBase16C , mapMC , mapMCE , omapMCE , concatMapMC , filterMC , filterMCE , iterMC , scanlMC , concatMapAccumMC , encodeUtf8C , decodeUtf8C , lineC , lineAsciiC , unlinesC , unlinesAsciiC , linesUnboundedC_ , linesUnboundedC , linesUnboundedAsciiC , zipSinks , sourceMaybeMVar , sourceMaybeTMVar , asyncC , fromFoldM , toFoldM , sourceTChan , sourceTQueue , sourceTBQueue , untilMC , whileMC ) where import Control.Applicative (Alternative((<|>), empty), Applicative((<*>), pure), (<$>)) import Control.Concurrent (MVar, takeMVar, putMVar, newEmptyMVar) import Control.Concurrent.Async.Lifted (Async, withAsync, waitBoth, async) import Control.Concurrent.STM import Control.Exception.Lifted (bracket) import Control.Foldl (PrimMonad, Vector, FoldM(..)) import Control.Monad (liftM, MonadPlus(..), ap, (<=<)) import Control.Monad.Base (MonadBase(..)) import Control.Monad.Catch (MonadThrow(..), MonadMask, MonadCatch) import qualified Control.Monad.Catch as Catch import Control.Monad.Error.Class (MonadError(..)) import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Morph (MonadTrans(..), MMonad(..), MFunctor(..)) import Control.Monad.Primitive (PrimMonad(PrimState)) import Control.Monad.Reader.Class (MonadReader(..)) import Control.Monad.State.Class (MonadState(..)) import Control.Monad.Trans.Control (MonadBaseControl(StM)) import Control.Monad.Trans.Either (EitherT(..), left) import Control.Monad.Writer.Class (MonadWriter(..)) import Data.Bifunctor (Bifunctor(bimap)) import Data.Builder (Builder(builderToLazy), ToBuilder(..)) import Data.ByteString (ByteString) import Data.Foldable (Foldable(foldMap)) import Data.Functor.Identity (Identity(runIdentity)) import Data.IOData (IOData(hGetChunk, hPut)) import Data.List (unfoldr) import Data.MonoTraversable (MonoTraversable, MonoFunctor, Element, MonoFoldable(oall, oany, ofoldMap, ofoldl', ofoldlM, olength, onull)) import Data.NonNull as NonNull (NonNull, fromNullable) import Data.Semigroup (Any(..), All(..), Monoid(..), Semigroup((<>))) import Data.Sequences as Seq (OrdSequence, EqSequence(elem, notElem), SemiSequence(Index), singleton, IsSequence(break, drop, dropWhile, fromList, splitAt)) import Data.Sequences.Lazy (LazySequence(fromChunks, toChunks)) import qualified Data.Streaming.Filesystem as F import Data.Text (Text) import Data.Textual.Encoding (Utf8(encodeUtf8)) import Data.Traversable (Traversable(sequenceA)) import Data.Word (Word8) import System.FilePath (()) import System.IO (stdout, stdin, stderr, openFile, hClose, Handle, IOMode(ReadMode, WriteMode)) import System.Random.MWC as MWC (Gen, Variate(uniform), createSystemRandom) -- | The type of Source should recall 'foldM': -- -- @ -- Monad m => (a -> b -> m a) -> a -> [b] -> m a -- @ -- -- 'EitherT' is used to signal short-circuiting of the pipeline. And if it -- weren't for conduits like 'takeC', we wouldn't even need that most of the -- time. -- -- Sources form a Monad that behaves a lot like 'ListT'; for example: -- -- @ -- do line <- sourceFile "foo.txt" -- liftIO $ putStrLn $ "line: " ++ show line -- x <- yieldMany [1..10] -- return (x, line) -- @ -- -- The above Source yields a series of pairs, proving ten copies of each line -- from the file plus an index number. -- -- To skip to the next value in a Source, use the function 'skip' or 'mempty'; -- to abort the whole pipeline, use 'abort' or 'mzero'. For example: -- -- @ -- do x <- yieldMany [1..10] -- if x == 2 || x == 9 -- then return x -- else if x < 5 -- then skip -- else abort -- @ -- -- This outputs the list @[2]@. -- -- One difference from conduit is that monadic chaining of sources with '>>' -- results in the values from the first source being used to determine how -- many values are generated by the next source, just like 'ListT': -- -- >>> sinkList $ yieldMany [1..3] >> yieldMany [4..6] -- [4,5,6,4,5,6,4,5,6] -- -- To achieve the same behavior as conduit, use the Monoid instance for -- Sources: -- -- >>> sinkList $ yieldMany [1..3] <> yieldMany [4..6] -- [1,2,3,4,5,6] newtype Source m a = Source { runSource :: forall r. r -> (r -> a -> EitherT r m r) -> EitherT r m r } type Conduit a m b = Source m a -> Source m b type Sink a m r = Source m a -> m r instance Monad m => Semigroup (Source m a) where x <> y = Source $ \r f -> lift $ do r' <- sink r f x sink r' f y {-# INLINE (<>) #-} instance Monad m => Monoid (Source m a) where mempty = skip {-# INLINE mempty #-} mappend = (<>) {-# INLINE mappend #-} instance Monad m => Alternative (Source m) where empty = skip {-# INLINE empty #-} (<|>) = (<>) {-# INLINE (<|>) #-} instance Functor (Source m) where fmap f = conduit $ \r yield -> yield r . f {-# INLINE fmap #-} instance Applicative (Source m) where pure = return {-# INLINE pure #-} (<*>) = ap {-# INLINE (<*>) #-} instance Monad m => MonadPlus (Source m) where mzero = abort {-# INLINE mzero #-} mplus = (<|>) {-# INLINE mplus #-} instance Monad (Source m) where return x = Source $ \z yield -> yield z x {-# INLINE return #-} Source await >>= f = Source $ \z yield -> await z $ \r x -> runSource (f x) r yield {-# INLINE (>>=) #-} instance MFunctor Source where hoist nat m = Source $ \z yield -> runSource (hoist nat m) z yield {-# INLINE hoist #-} instance MMonad Source where embed f m = Source $ \z yield -> runSource (embed f m) z yield {-# INLINE embed #-} instance MonadIO m => MonadIO (Source m) where liftIO m = Source $ \z yield -> yield z =<< liftIO m {-# INLINE liftIO #-} instance MonadTrans Source where lift m = Source $ \z yield -> yield z =<< lift m {-# INLINE lift #-} instance MonadReader r m => MonadReader r (Source m) where ask = lift ask {-# INLINE ask #-} local f = conduit $ \r yield -> local f . yield r {-# INLINE local #-} reader = lift . reader {-# INLINE reader #-} instance MonadState s m => MonadState s (Source m) where get = lift get {-# INLINE get #-} put = lift . put {-# INLINE put #-} state = lift . state {-# INLINE state #-} instance MonadWriter w m => MonadWriter w (Source m) where writer = lift . writer {-# INLINE writer #-} tell = lift . tell {-# INLINE tell #-} listen = conduit $ \r yield x -> do ((), w) <- listen $ return () yield r (x, w) {-# INLINE listen #-} pass = conduit $ \r yield (x, f) -> do pass $ return ((), f) yield r x {-# INLINE pass #-} instance MonadError e m => MonadError e (Source m) where throwError = lift . throwError {-# INLINE throwError #-} catchError src f = Source $ \z yield -> EitherT $ runEitherT (runSource src z yield) `catchError` \e -> runEitherT (runSource (f e) z yield) {-# INLINE catchError #-} instance MonadThrow m => MonadThrow (Source m) where throwM e = lift $ throwM e {-# INLINE throwM #-} instance MonadCatch m => MonadCatch (Source m) where catch src f = Source $ \z yield -> EitherT $ runEitherT (runSource src z yield) `Catch.catch` \e -> runEitherT (runSource (f e) z yield) {-# INLINE catch #-} instance MonadMask m => MonadMask (Source m) where mask a = Source $ \z yield -> EitherT $ Catch.mask $ \u -> runEitherT $ runSource (a $ \b -> Source $ \r yield' -> EitherT $ liftM Right $ u $ sink r yield' b) z yield {-# INLINE mask #-} uninterruptibleMask a = Source $ \z yield -> EitherT $ Catch.uninterruptibleMask $ \u -> runEitherT $ runSource (a $ \b -> Source $ \r yield' -> EitherT $ liftM Right $ u $ sink r yield' b) z yield {-# INLINE uninterruptibleMask #-} instance Foldable (Source Identity) where foldMap f = runIdentity . sink mempty (\r x -> return $ r `mappend` f x) {-# INLINE foldMap #-} -- | Sequence a collection of sources. -- -- >>> sinkList $ sequenceSources [yieldOne 1, yieldOne 2, yieldOne 3] -- [[1,2,3]] sequenceSources :: (Traversable f, Monad m) => f (Source m a) -> Source m (f a) sequenceSources = sequenceA {-# INLINE sequenceSources #-} -- | Compose a 'Source' and a 'Conduit' into a new 'Source'. Note that this -- is just flipped function application, so ($) can be used to achieve the -- same thing. infixl 1 $= ($=) :: a -> (a -> b) -> b ($=) = flip ($) {-# INLINE ($=) #-} -- | Compose a 'Conduit' and a 'Sink' into a new 'Sink'. Note that this is -- just function composition, so (.) can be used to achieve the same thing. infixr 2 =$ (=$) :: (a -> b) -> (b -> c) -> a -> c (=$) = flip (.) {-# INLINE (=$) #-} -- | Compose a 'Source' and a 'Sink' and compute the result. Note that this -- is just flipped function application, so ($) can be used to achieve the -- same thing. infixr 0 $$ ($$) :: a -> (a -> b) -> b ($$) = flip ($) {-# INLINE ($$) #-} awaitForever :: (a -> Source m b) -> Conduit a m b awaitForever = flip (>>=) {-# INLINE awaitForever #-} -- | Promote any sink to a source. This can be used as if it were a source -- transformer (aka, a conduit): -- -- >>> sinkList $ returnC $ sumC $ mapC (+1) $ yieldMany [1..10] -- [65] -- -- Note that 'returnC' is a synonym for 'Control.Monad.Trans.Class.lift'. returnC :: Monad m => m a -> Source m a returnC = lift abort :: Monad m => Source m a abort = Source $ const . left {-# INLINE abort #-} skip :: Monad m => Source m a skip = Source $ const . return {-# INLINE skip #-} conduit :: (forall r. r -> (r -> b -> EitherT r m r) -> a -> EitherT r m r) -> Conduit a m b conduit f (Source await) = Source $ \z -> await z . flip f {-# INLINE conduit #-} sink :: forall m a r. Monad m => r -> (r -> a -> EitherT r m r) -> Sink a m r sink z f (Source await) = either id id `liftM` runEitherT (await z f) {-# INLINE sink #-} -- | Most of the time conduit will pass through the fold variable unmolested, -- but sometimes you need to ignore that variable and use your own within -- that stage of the pipeline. This is done by wrapping the fold variable -- in a tuple and then unwrapping it when the conduit is done. -- 'conduitWith' makes this transparent. conduitWith :: Monad m => s -> (forall r. (r, s) -> (r -> b -> EitherT (r, s) m (r, s)) -> a -> EitherT (r, s) m (r, s)) -> Conduit a m b conduitWith s f (Source await) = Source $ \z yield -> rewrap fst $ await (z, s) $ \(r, t) -> f (r, t) (\r' -> rewrap (, t) . yield r') {-# INLINE conduitWith #-} rewrap :: Monad m => (a -> b) -> EitherT a m a -> EitherT b m b rewrap f k = EitherT $ bimap f f `liftM` runEitherT k {-# INLINE rewrap #-} source :: Monad m => (forall r. r -> (r -> a -> EitherT r m r) -> EitherT r m r) -> Source m a source = Source {-# INLINE source #-} yieldMany :: (Monad m, MonoFoldable mono) => mono -> Source m (Element mono) yieldMany xs = Source $ \z yield -> ofoldlM yield z xs {-# INLINE yieldMany #-} sourceList :: Monad m => [a] -> Source m a sourceList = yieldMany {-# INLINE sourceList #-} unfoldC :: forall m a b. Monad m => (b -> Maybe (a, b)) -> b -> Source m a unfoldC = (yieldMany .) . Data.List.unfoldr {-# INLINE unfoldC #-} enumFromToC :: forall m a. (Monad m, Enum a, Eq a) => a -> a -> Source m a enumFromToC = (yieldMany .) . enumFromTo {-# INLINE enumFromToC #-} iterateC :: forall m a. Monad m => (a -> a) -> a -> Source m a iterateC = (yieldMany .) . iterate {-# INLINE iterateC #-} repeatC :: forall m a. Monad m => a -> Source m a repeatC = yieldMany . Prelude.repeat {-# INLINE repeatC #-} replicateC :: forall m a. Monad m => Int -> a -> Source m a replicateC = (yieldMany .) . Prelude.replicate {-# INLINE replicateC #-} sourceLazy :: (Monad m, LazySequence lazy strict) => lazy -> Source m strict sourceLazy = yieldMany . toChunks {-# INLINE sourceLazy #-} repeatMC :: forall m a. Monad m => m a -> Source m a repeatMC x = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop r = loop =<< yield r =<< lift x repeatWhileMC :: forall m a. Monad m => m a -> (a -> Bool) -> Source m a repeatWhileMC m f = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop r = do x <- lift m if f x then loop =<< yield r x else return r replicateMC :: forall m a. Monad m => Int -> m a -> Source m a replicateMC n m = Source $ go n where go :: Int -> r -> (r -> a -> EitherT r m r) -> EitherT r m r go i z yield = loop i z where loop n' r | n' > 0 = loop (n' - 1) =<< yield r =<< lift m loop _ r = return r sourceHandle :: forall m a. (MonadIO m, IOData a) => Handle -> Source m a sourceHandle h = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop y = do x <- liftIO $ hGetChunk h if onull x then return y else loop =<< yield y x sourceFile :: (MonadBaseControl IO m, MonadIO m, IOData a) => FilePath -> Source m a sourceFile path = Source $ \z yield -> bracket (liftIO $ openFile path ReadMode) (liftIO . hClose) (\h -> runSource (sourceHandle h) z yield) {-# INLINE sourceFile #-} sourceIOHandle :: (MonadBaseControl IO m, MonadIO m, IOData a) => IO Handle -> Source m a sourceIOHandle f = Source $ \z yield -> bracket (liftIO f) (liftIO . hClose) (\h -> runSource (sourceHandle h) z yield) {-# INLINE sourceIOHandle #-} stdinC :: (MonadBaseControl IO m, MonadIO m, IOData a) => Source m a stdinC = sourceHandle stdin {-# INLINE stdinC #-} initRepeat :: Monad m => m seed -> (seed -> m a) -> Source m a initRepeat mseed f = Source $ \z yield -> lift mseed >>= \seed -> runSource (repeatMC (f seed)) z yield {-# INLINE initRepeat #-} initReplicate :: Monad m => m seed -> (seed -> m a) -> Int -> Source m a initReplicate mseed f n = Source $ \z yield -> lift mseed >>= \seed -> runSource (replicateMC n (f seed)) z yield {-# INLINE initReplicate #-} sourceRandom :: (Variate a, MonadIO m) => Source m a sourceRandom = initRepeat (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform) {-# INLINE sourceRandom #-} sourceRandomN :: (Variate a, MonadIO m) => Int -> Source m a sourceRandomN = initReplicate (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform) {-# INLINE sourceRandomN #-} sourceRandomGen :: (Variate a, MonadBase base m, PrimMonad base) => Gen (PrimState base) -> Source m a sourceRandomGen gen = initRepeat (return gen) (liftBase . MWC.uniform) {-# INLINE sourceRandomGen #-} sourceRandomNGen :: (Variate a, MonadBase base m, PrimMonad base) => Gen (PrimState base) -> Int -> Source m a sourceRandomNGen gen = initReplicate (return gen) (liftBase . MWC.uniform) {-# INLINE sourceRandomNGen #-} sourceDirectory :: forall m. (MonadBaseControl IO m, MonadIO m) => FilePath -> Source m FilePath sourceDirectory dir = Source $ \z yield -> bracket (liftIO (F.openDirStream dir)) (liftIO . F.closeDirStream) (go z yield) where go :: r -> (r -> FilePath -> EitherT r m r) -> F.DirStream -> EitherT r m r go z yield ds = loop z where loop r = do mfp <- liftIO $ F.readDirStream ds case mfp of Nothing -> return r Just fp -> loop =<< yield r (dir fp) sourceDirectoryDeep :: forall m. (MonadBaseControl IO m, MonadIO m) => Bool -> FilePath -> Source m FilePath sourceDirectoryDeep followSymlinks startDir = Source go where go :: r -> (r -> FilePath -> EitherT r m r) -> EitherT r m r go z yield = start startDir z where start dir r = runSource (sourceDirectory dir) r entry entry r fp = do ft <- liftIO $ F.getFileType fp case ft of F.FTFile -> yield r fp F.FTFileSym -> yield r fp F.FTDirectory -> start fp r F.FTDirectorySym | followSymlinks -> start fp r | otherwise -> return r F.FTOther -> return r dropC :: Monad m => Int -> Conduit a m a dropC n = conduitWith n go where go (r, n') _ _ | n' > 0 = return (r, n' - 1) go (r, _) yield x = yield r x {-# INLINE dropC #-} dropCE :: (Monad m, IsSequence seq) => Index seq -> Conduit seq m seq dropCE n = conduitWith n go where go (r, n') yield s | onull y = return (r, n' - xn) | otherwise = yield r y where (x, y) = Seq.splitAt n' s xn = n' - fromIntegral (olength x) dropWhileC :: Monad m => (a -> Bool) -> Conduit a m a dropWhileC f = conduitWith f go where go (r, k) _ x | k x = return (r, k) -- Change out the predicate for one that always fails go (r, _) yield x = fmap (const (const False)) <$> yield r x dropWhileCE :: (Monad m, IsSequence seq) => (Element seq -> Bool) -> Conduit seq m seq dropWhileCE f = conduitWith f go where go (r, k) yield s | onull x = return (r, k) | otherwise = fmap (const (const False)) <$> yield r s where x = Seq.dropWhile k s foldC :: (Monad m, Monoid a) => Sink a m a foldC = foldMapC id {-# INLINE foldC #-} foldCE :: (Monad m, MonoFoldable mono, Monoid (Element mono)) => Sink mono m (Element mono) foldCE = foldlC (\acc mono -> acc `mappend` ofoldMap id mono) mempty {-# INLINE foldCE #-} foldlC :: Monad m => (a -> b -> a) -> a -> Sink b m a foldlC f z = sink z ((return .) . f) {-# INLINE foldlC #-} foldlCE :: (Monad m, MonoFoldable mono) => (a -> Element mono -> a) -> a -> Sink mono m a foldlCE f = foldlC (ofoldl' f) {-# INLINE foldlCE #-} foldMapC :: (Monad m, Monoid b) => (a -> b) -> Sink a m b foldMapC f = foldlC (\acc x -> acc `mappend` f x) mempty {-# INLINE foldMapC #-} foldMapCE :: (Monad m, MonoFoldable mono, Monoid w) => (Element mono -> w) -> Sink mono m w foldMapCE = foldMapC . ofoldMap {-# INLINE foldMapCE #-} allC :: Monad m => (a -> Bool) -> Sink a m Bool allC f = liftM getAll `liftM` foldMapC (All . f) {-# INLINE allC #-} allCE :: (Monad m, MonoFoldable mono) => (Element mono -> Bool) -> Sink mono m Bool allCE = allC . oall {-# INLINE allCE #-} anyC :: Monad m => (a -> Bool) -> Sink a m Bool anyC f = liftM getAny `liftM` foldMapC (Any . f) {-# INLINE anyC #-} anyCE :: (Monad m, MonoFoldable mono) => (Element mono -> Bool) -> Sink mono m Bool anyCE = anyC . oany {-# INLINE anyCE #-} andC :: Monad m => Sink Bool m Bool andC = allC id {-# INLINE andC #-} andCE :: (Monad m, MonoFoldable mono, Element mono ~ Bool) => Sink mono m Bool andCE = allCE id {-# INLINE andCE #-} orC :: Monad m => Sink Bool m Bool orC = anyC id {-# INLINE orC #-} orCE :: (Monad m, MonoFoldable mono, Element mono ~ Bool) => Sink mono m Bool orCE = anyCE id {-# INLINE orCE #-} elemC :: (Monad m, Eq a) => a -> Sink a m Bool elemC x = anyC (== x) {-# INLINE elemC #-} elemCE :: (Monad m, EqSequence seq) => Element seq -> Sink seq m Bool elemCE = anyC . Seq.elem {-# INLINE elemCE #-} notElemC :: (Monad m, Eq a) => a -> Sink a m Bool notElemC x = allC (/= x) {-# INLINE notElemC #-} notElemCE :: (Monad m, EqSequence seq) => Element seq -> Sink seq m Bool notElemCE = allC . Seq.notElem {-# INLINE notElemCE #-} produceList :: Monad m => ([a] -> b) -> Sink a m b produceList f = liftM (liftM (f . ($ []))) $ sink id (\front x -> return (front . (x:))) {-# INLINE produceList #-} sinkLazy :: (Monad m, LazySequence lazy strict) => Sink strict m lazy sinkLazy = produceList fromChunks -- {-# INLINE sinkLazy #-} sinkList :: Monad m => Sink a m [a] sinkList = produceList id {-# INLINE sinkList #-} sinkVector :: (MonadBase base m, Vector v a, PrimMonad base) => Sink a m (v a) sinkVector = undefined sinkVectorN :: (MonadBase base m, Vector v a, PrimMonad base) => Int -> Sink a m (v a) sinkVectorN = undefined sinkBuilder :: (Monad m, Monoid builder, ToBuilder a builder) => Sink a m builder sinkBuilder = foldMapC toBuilder {-# INLINE sinkBuilder #-} sinkLazyBuilder :: (Monad m, Monoid builder, ToBuilder a builder, Builder builder lazy) => Sink a m lazy sinkLazyBuilder = liftM builderToLazy . foldMapC toBuilder {-# INLINE sinkLazyBuilder #-} sinkNull :: Monad m => Sink a m () sinkNull _ = return () {-# INLINE sinkNull #-} awaitNonNull :: (Monad m, MonoFoldable a) => Conduit a m (Maybe (NonNull a)) awaitNonNull = conduit $ \r yield x -> maybe (return r) (yield r . Just) (NonNull.fromNullable x) {-# INLINE awaitNonNull #-} headCE :: (Monad m, IsSequence seq) => Sink seq m (Maybe (Element seq)) headCE = undefined {-# INLINE headCE #-} -- jww (2014-06-07): These two cannot be implemented without leftover support. -- peekC :: Monad m => Sink a m (Maybe a) -- peekC = undefined -- peekCE :: (Monad m, MonoFoldable mono) => Sink mono m (Maybe (Element mono)) -- peekCE = undefined lastC :: Monad m => Sink a m (Maybe a) lastC = sink Nothing (const (return . Just)) {-# INLINE lastC #-} lastCE :: (Monad m, IsSequence seq) => Sink seq m (Maybe (Element seq)) lastCE = undefined {-# INLINE lastCE #-} lengthC :: (Monad m, Num len) => Sink a m len lengthC = foldlC (\x _ -> x + 1) 0 {-# INLINE lengthC #-} lengthCE :: (Monad m, Num len, MonoFoldable mono) => Sink mono m len lengthCE = foldlC (\x y -> x + fromIntegral (olength y)) 0 {-# INLINE lengthCE #-} lengthIfC :: (Monad m, Num len) => (a -> Bool) -> Sink a m len lengthIfC f = foldlC (\cnt a -> if f a then cnt + 1 else cnt) 0 {-# INLINE lengthIfC #-} lengthIfCE :: (Monad m, Num len, MonoFoldable mono) => (Element mono -> Bool) -> Sink mono m len lengthIfCE f = foldlCE (\cnt a -> if f a then cnt + 1 else cnt) 0 {-# INLINE lengthIfCE #-} maximumC :: (Monad m, Ord a) => Sink a m (Maybe a) maximumC = sink Nothing $ \r y -> return $ Just $ maybe y (max y) r {-# INLINE maximumC #-} maximumCE :: (Monad m, OrdSequence seq) => Sink seq m (Maybe (Element seq)) maximumCE = undefined {-# INLINE maximumCE #-} minimumC :: (Monad m, Ord a) => Sink a m (Maybe a) minimumC = sink Nothing $ \r y -> return $ Just $ maybe y (min y) r {-# INLINE minimumC #-} minimumCE :: (Monad m, OrdSequence seq) => Sink seq m (Maybe (Element seq)) minimumCE = undefined {-# INLINE minimumCE #-} -- jww (2014-06-07): These two cannot be implemented without leftover support. -- nullC :: Monad m => Sink a m Bool -- nullC = undefined -- nullCE :: (Monad m, MonoFoldable mono) => Sink mono m Bool -- nullCE = undefined sumC :: (Monad m, Num a) => Sink a m a sumC = foldlC (+) 0 {-# INLINE sumC #-} sumCE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Sink mono m (Element mono) sumCE = undefined {-# INLINE sumCE #-} productC :: (Monad m, Num a) => Sink a m a productC = foldlC (*) 1 {-# INLINE productC #-} productCE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Sink mono m (Element mono) productCE = undefined {-# INLINE productCE #-} findC :: Monad m => (a -> Bool) -> Sink a m (Maybe a) findC f = sink Nothing $ \r x -> if f x then left (Just x) else return r {-# INLINE findC #-} mapM_C :: Monad m => (a -> m ()) -> Sink a m () mapM_C f = sink () (const $ lift . f) {-# INLINE mapM_C #-} mapM_CE :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> Sink mono m () mapM_CE = undefined {-# INLINE mapM_CE #-} foldMC :: Monad m => (a -> b -> m a) -> a -> Sink b m a foldMC f = flip sink ((lift .) . f) {-# INLINE foldMC #-} foldMCE :: (Monad m, MonoFoldable mono) => (a -> Element mono -> m a) -> a -> Sink mono m a foldMCE = undefined {-# INLINE foldMCE #-} foldMapMC :: (Monad m, Monoid w) => (a -> m w) -> Sink a m w foldMapMC f = foldMC (\acc x -> (acc `mappend`) `liftM` f x) mempty {-# INLINE foldMapMC #-} foldMapMCE :: (Monad m, MonoFoldable mono, Monoid w) => (Element mono -> m w) -> Sink mono m w foldMapMCE = undefined {-# INLINE foldMapMCE #-} sinkFile :: (MonadBaseControl IO m, MonadIO m, IOData a) => FilePath -> Sink a m () sinkFile fp = sinkIOHandle (liftIO $ openFile fp WriteMode) {-# INLINE sinkFile #-} sinkHandle :: (MonadIO m, IOData a) => Handle -> Sink a m () sinkHandle = mapM_C . hPut {-# INLINE sinkHandle #-} sinkIOHandle :: (MonadBaseControl IO m, MonadIO m, IOData a) => IO Handle -> Sink a m () sinkIOHandle alloc = bracket (liftIO alloc) (liftIO . hClose) . flip sinkHandle {-# INLINE sinkIOHandle #-} printC :: (Show a, MonadIO m) => Sink a m () printC = mapM_C (liftIO . print) {-# INLINE printC #-} stdoutC :: (MonadIO m, IOData a) => Sink a m () stdoutC = sinkHandle stdout {-# INLINE stdoutC #-} stderrC :: (MonadIO m, IOData a) => Sink a m () stderrC = sinkHandle stderr {-# INLINE stderrC #-} mapC :: Monad m => (a -> b) -> Conduit a m b mapC = fmap {-# INLINE mapC #-} mapCE :: (Monad m, Functor f) => (a -> b) -> Conduit (f a) m (f b) mapCE = undefined {-# INLINE mapCE #-} omapCE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> Conduit mono m mono omapCE = undefined {-# INLINE omapCE #-} concatMapC :: (Monad m, MonoFoldable mono) => (a -> mono) -> Conduit a m (Element mono) concatMapC f = conduit $ \r yield -> ofoldlM yield r . f {-# INLINE concatMapC #-} concatMapCE :: (Monad m, MonoFoldable mono, Monoid w) => (Element mono -> w) -> Conduit mono m w concatMapCE = undefined {-# INLINE concatMapCE #-} takeC :: Monad m => Int -> Conduit a m a takeC n = conduitWith n go where go (z', n') yield x | n' > 1 = next | n' > 0 = left =<< next | otherwise = left (z', 0) where next = fmap pred <$> yield z' x takeCE :: (Monad m, IsSequence seq) => Index seq -> Conduit seq m seq takeCE = undefined -- | This function reads one more element than it yields, which would be a -- problem if Sinks were monadic, as they are in conduit or pipes. There is -- no such concept as "resuming where the last conduit left off" in this -- library. takeWhileC :: Monad m => (a -> Bool) -> Conduit a m a takeWhileC f = conduitWith f go where go (z', k) yield x | k x = yield z' x go (z', _) _ _ = left (z', const False) takeWhileCE :: (Monad m, IsSequence seq) => (Element seq -> Bool) -> Conduit seq m seq takeWhileCE = undefined takeExactlyC :: Monad m => Int -> Conduit a m b -> Conduit a m b takeExactlyC = undefined takeExactlyCE :: (Monad m, IsSequence a) => Index a -> Conduit a m b -> Conduit a m b takeExactlyCE = undefined concatC :: (Monad m, MonoFoldable mono) => Conduit mono m (Element mono) concatC = undefined filterC :: Monad m => (a -> Bool) -> Conduit a m a filterC f = awaitForever $ \x -> if f x then return x else skip {-# INLINE filterC #-} filterCE :: (IsSequence seq, Monad m) => (Element seq -> Bool) -> Conduit seq m seq filterCE = undefined {-# INLINE filterCE #-} mapWhileC :: Monad m => (a -> Maybe b) -> Conduit a m b mapWhileC f = awaitForever $ \x -> case f x of Just y -> return y; _ -> abort {-# INLINE mapWhileC #-} conduitVector :: (MonadBase base m, Vector v a, PrimMonad base) => Int -> Conduit a m (v a) conduitVector = undefined scanlC :: Monad m => (a -> b -> a) -> a -> Conduit b m a scanlC = undefined concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b concatMapAccumC = undefined intersperseC :: Monad m => a -> Source m a -> Source m a intersperseC s (Source await) = Source $ \z yield -> EitherT $ do eres <- runEitherT $ await (Nothing, z) $ \(my, r) x -> case my of Nothing -> return (Just x, r) Just y -> do r' <- rewrap (Nothing,) $ yield r y rewrap (Just x,) $ yield (snd r') s case eres of Left (_, r) -> return $ Left r Right (Nothing, r) -> return $ Right r Right (Just x, r) -> runEitherT $ yield r x encodeBase64C :: Monad m => Conduit ByteString m ByteString encodeBase64C = undefined decodeBase64C :: Monad m => Conduit ByteString m ByteString decodeBase64C = undefined encodeBase64URLC :: Monad m => Conduit ByteString m ByteString encodeBase64URLC = undefined decodeBase64URLC :: Monad m => Conduit ByteString m ByteString decodeBase64URLC = undefined encodeBase16C :: Monad m => Conduit ByteString m ByteString encodeBase16C = undefined decodeBase16C :: Monad m => Conduit ByteString m ByteString decodeBase16C = undefined mapMC :: Monad m => (a -> m b) -> Conduit a m b mapMC f src = src >>= lift . f {-# INLINE mapMC #-} mapMCE :: (Monad m, Traversable f) => (a -> m b) -> Conduit (f a) m (f b) mapMCE = undefined {-# INLINE mapMCE #-} omapMCE :: (Monad m, MonoTraversable mono) => (Element mono -> m (Element mono)) -> Conduit mono m mono omapMCE = undefined concatMapMC :: (Monad m, MonoFoldable mono) => (a -> m mono) -> Conduit a m (Element mono) concatMapMC f = awaitForever $ yieldMany <=< lift . f filterMC :: Monad m => (a -> m Bool) -> Conduit a m a filterMC f = awaitForever $ \x -> do res <- lift $ f x if res then return x else skip {-# INLINE filterMC #-} filterMCE :: (Monad m, IsSequence seq) => (Element seq -> m Bool) -> Conduit seq m seq filterMCE = undefined iterMC :: Monad m => (a -> m ()) -> Conduit a m a iterMC = undefined scanlMC :: Monad m => (a -> b -> m a) -> a -> Conduit b m a scanlMC = undefined concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b concatMapAccumMC = undefined encodeUtf8C :: (Monad m, Utf8 text binary) => Conduit text m binary encodeUtf8C = mapC encodeUtf8 {-# INLINE encodeUtf8C #-} decodeUtf8C :: MonadThrow m => Conduit ByteString m Text decodeUtf8C = undefined lineC :: (Monad m, IsSequence seq, Element seq ~ Char) => Conduit seq m o -> Conduit seq m o lineC = undefined lineAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8) => Conduit seq m o -> Conduit seq m o lineAsciiC = undefined unlinesC :: (Monad m, IsSequence seq, Element seq ~ Char) => Conduit seq m seq unlinesC = concatMapC (: [Seq.singleton '\n']) {-# INLINE unlinesC #-} unlinesAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8) => Conduit seq m seq unlinesAsciiC = concatMapC (: [Seq.singleton 10]) {-# INLINE unlinesAsciiC #-} linesUnboundedC_ :: forall m seq. (Monad m, IsSequence seq, Eq (Element seq)) => Element seq -> Conduit seq m seq linesUnboundedC_ sep (Source await) = Source $ \z yield -> EitherT $ do eres <- runEitherT $ await (z, n) (go yield) case eres of Left (r, _) -> return $ Left r Right (r, t) | onull t -> return $ Right r | otherwise -> runEitherT $ yield r t where n = Seq.fromList [] go :: (r -> seq -> EitherT r m r) -> (r, seq) -> seq -> EitherT (r, seq) m (r, seq) go yield = loop where loop (r, t') t | onull y = return (r, t <> t') | otherwise = do r' <- rewrap (, n) $ yield r (t' <> x) loop r' (Seq.drop 1 y) where (x, y) = Seq.break (== sep) t linesUnboundedC :: (Monad m, IsSequence seq, Element seq ~ Char) => Conduit seq m seq linesUnboundedC = linesUnboundedC_ '\n' {-# INLINE linesUnboundedC #-} linesUnboundedAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8) => Conduit seq m seq linesUnboundedAsciiC = linesUnboundedC_ 10 {-# INLINE linesUnboundedAsciiC #-} -- | Zip sinks together. This function may be used multiple times: -- -- >>> let mySink s = sink () $ \() x -> liftIO $ print $ s <> show x -- >>> zipSinks sinkList (zipSinks (mySink "foo") (mySink "bar")) $ yieldMany [1,2,3] -- "foo: 1" -- "bar: 1" -- "foo: 2" -- "bar: 2" -- "foo: 3" -- "bar: 3" -- ([1,2,3],((),())) -- -- Note that the two sinks are run concurrently, so watch out for possible -- race conditions if they try to interact with the same resources. zipSinks :: forall a m r r'. (MonadBaseControl IO m, MonadIO m) => Sink a m r -> Sink a m r' -> Sink a m (r, r') zipSinks sink1 sink2 (Source await) = do x <- liftIO newEmptyMVar y <- liftIO newEmptyMVar withAsync (sink1 $ sourceMaybeMVar x) $ \a -> withAsync (sink2 $ sourceMaybeMVar y) $ \b -> do _ <- runEitherT $ await () $ \() val -> do liftIO $ putMVar x (Just val) liftIO $ putMVar y (Just val) liftIO $ putMVar x Nothing liftIO $ putMVar y Nothing waitBoth a b -- | Keep taking from an @MVar (Maybe a)@ until it yields 'Nothing'. sourceMaybeMVar :: forall m a. MonadIO m => MVar (Maybe a) -> Source m a sourceMaybeMVar var = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop r = do mx <- liftIO $ takeMVar var case mx of Nothing -> return r Just x -> loop =<< yield r x -- | Keep taking from an @TMVar (Maybe a)@ until it yields 'Nothing'. sourceMaybeTMVar :: forall a. TMVar (Maybe a) -> Source STM a sourceMaybeTMVar var = Source go where go :: r -> (r -> a -> EitherT r STM r) -> EitherT r STM r go z yield = loop z where loop r = do mx <- lift $ takeTMVar var case mx of Nothing -> return r Just x -> loop =<< yield r x newtype ZipSink a m r = ZipSink { getZipSink :: Source m a -> m r } instance Monad m => Functor (ZipSink a m) where fmap f (ZipSink k) = ZipSink $ liftM f . k instance Monad m => Applicative (ZipSink a m) where pure x = ZipSink $ \_ -> return x ZipSink f <*> ZipSink x = ZipSink $ \await -> f await `ap` x await -- | Send incoming values to all of the @Sink@ providing, and ultimately -- coalesce together all return values. -- -- Implemented on top of @ZipSink@, see that data type for more details. sequenceSinks :: (Traversable f, Monad m) => f (Sink a m r) -> Sink a m (f r) sequenceSinks = getZipSink . sequenceA . fmap ZipSink {-# INLINE sequenceSinks #-} asyncC :: (MonadBaseControl IO m, Monad m) => (a -> m b) -> Conduit a m (Async (StM m b)) asyncC f = awaitForever $ lift . async . f {-# INLINE asyncC #-} -- | Convert a 'Control.Foldl.FoldM' fold abstraction into a Sink. -- -- NOTE: This requires ImpredicativeTypes in the code that uses it. -- -- >>> fromFoldM (FoldM ((return .) . (+)) (return 0) return) $ yieldMany [1..10] -- 55 fromFoldM :: Monad m => FoldM m a b -> Source m a -> m b fromFoldM (FoldM step initial final) src = do r <- initial final =<< sink r ((lift .) . step) src {-# INLINE fromFoldM #-} -- | Convert a Sink into a 'Control.Foldl.FoldM', passing it into a -- continuation. -- -- >>> toFoldM sumC (\f -> Control.Foldl.foldM f [1..10]) -- 55 toFoldM :: Monad m => Sink a m r -> (forall s. FoldM (EitherT s m) a s -> EitherT s m s) -> m r toFoldM s f = s $ source $ \k yield -> f $ FoldM yield (return k) return {-# INLINE toFoldM #-} sourceSTM :: forall container a. (container a -> STM a) -> (container a -> STM Bool) -> container a -> Source STM a sourceSTM getter tester chan = Source go where go :: r -> (r -> a -> EitherT r STM r) -> EitherT r STM r go z yield = loop z where loop r = do x <- lift $ getter chan r' <- yield r x mt <- lift $ tester chan if mt then return r' else loop r' -- | A Source for exhausting a TChan, but blocks if it is initially empty. sourceTChan :: forall a. TChan a -> Source STM a sourceTChan = sourceSTM readTChan isEmptyTChan {-# INLINE sourceTChan #-} sourceTQueue :: forall a. TQueue a -> Source STM a sourceTQueue = sourceSTM readTQueue isEmptyTQueue {-# INLINE sourceTQueue #-} sourceTBQueue :: forall a. TBQueue a -> Source STM a sourceTBQueue = sourceSTM readTBQueue isEmptyTBQueue {-# INLINE sourceTBQueue #-} untilMC :: forall m a. Monad m => m a -> m Bool -> Source m a untilMC m f = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop r = do x <- lift m r' <- yield r x cont <- lift f if cont then loop r' else return r' whileMC :: forall m a. Monad m => m Bool -> m a -> Source m a whileMC f m = Source go where go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r go z yield = loop z where loop r = do cont <- lift f if cont then lift m >>= yield r >>= loop else return r