module Data.Conduit.Combinators
(
yieldMany
, unfold
, enumFromTo
, iterate
, repeat
, replicate
, sourceLazy
, repeatM
, repeatWhileM
, replicateM
, sourceFile
, sourceHandle
, sourceIOHandle
, stdin
, sourceRandom
, sourceRandomN
, sourceRandomGen
, sourceRandomNGen
, sourceDirectory
, sourceDirectoryDeep
, drop
, dropE
, dropWhile
, dropWhileE
, fold
, foldE
, foldl
, foldl1
, foldlE
, foldMap
, foldMapE
, all
, allE
, any
, anyE
, and
, andE
, or
, orE
, elem
, elemE
, notElem
, notElemE
, sinkLazy
, sinkList
, sinkVector
, sinkVectorN
, sinkBuilder
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, headE
, peek
, peekE
, last
, lastE
, length
, lengthE
, lengthIf
, lengthIfE
, maximum
, maximumE
, minimum
, minimumE
, null
, nullE
, sum
, sumE
, product
, productE
, find
, mapM_
, mapM_E
, foldM
, foldME
, foldMapM
, foldMapME
, sinkFile
, sinkHandle
, sinkIOHandle
, print
, stdout
, stderr
, map
, mapE
, omapE
, concatMap
, concatMapE
, take
, takeE
, takeWhile
, takeWhileE
, takeExactly
, takeExactlyE
, concat
, filter
, filterE
, mapWhile
, conduitVector
, scanl
, mapAccumWhile
, concatMapAccum
, intersperse
, slidingWindow
, encodeBase64
, decodeBase64
, encodeBase64URL
, decodeBase64URL
, encodeBase16
, decodeBase16
, mapM
, mapME
, omapME
, concatMapM
, filterM
, filterME
, iterM
, scanlM
, mapAccumWhileM
, concatMapAccumM
, encodeUtf8
, decodeUtf8
, decodeUtf8Lenient
, line
, lineAscii
, unlines
, unlinesAscii
, takeExactlyUntilE
, linesUnbounded
, linesUnboundedAscii
, splitOnUnboundedE
, vectorBuilder
, mapAccumS
, peekForever
) where
import Data.Builder
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Base64.URL as B64U
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Category (Category (..))
import Control.Monad (unless, when, (>=>), liftM, forever)
import Control.Monad.Base (MonadBase (liftBase))
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, MonadThrow)
import Data.Conduit
import qualified Data.Conduit.Filesystem as CF
import Data.Conduit.Internal (ConduitM (..), Pipe (..))
import qualified Data.Conduit.List as CL
import Data.IOData
import Data.Maybe (isNothing, isJust)
import Data.Monoid (Monoid (..))
import Data.MonoTraversable
import qualified Data.Sequences as Seq
import Data.Sequences.Lazy
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import Data.Void (absurd)
import Prelude (Bool (..), Eq (..), Int,
Maybe (..), Either (..), Monad (..), Num (..),
Ord (..), fromIntegral, maybe, either,
($), Functor (..), Enum, seq, Show, Char,
mod, otherwise, Either (..),
($!), succ, FilePath)
import Data.Word (Word8)
import qualified Prelude
import System.IO (Handle)
import qualified System.IO as SIO
import qualified Data.Textual.Encoding as DTE
import qualified Data.Conduit.Text as CT
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified System.Random.MWC as MWC
import Data.Conduit.Combinators.Internal
import Data.Conduit.Combinators.Stream
import Data.Conduit.Internal.Fusion
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
writeMutVar)
#include "fusion-macros.h"
-- | Yield every element contained in the given 'MonoFoldable' value.
--
-- The conduit implementation folds the container with 'ofoldMap', turning
-- each element into a single 'yield'.
yieldMany, yieldManyC :: (Monad m, MonoFoldable mono)
=> mono
-> Producer m (Element mono)
yieldManyC = ofoldMap yield
STREAMING(yieldMany, yieldManyC, yieldManyS, x)
-- | Produce a stream from a seed value and a stepping function.
--
-- Defined directly in terms of 'CL.unfold', which receives the stepping
-- function @f@ and the initial seed @x@ unchanged.
unfold :: Monad m
=> (b -> Maybe (a, b))
-> b
-> Producer m a
INLINE_RULE(unfold, f x, CL.unfold f x)
-- | Enumerate values from @f@ to @t@ as a stream.
--
-- A direct wrapper around 'CL.enumFromTo'.
enumFromTo :: (Monad m, Enum a, Ord a) => a -> a -> Producer m a
INLINE_RULE(enumFromTo, f t, CL.enumFromTo f t)
-- | Produce a stream by iterating the function @f@ starting from @t@.
--
-- A direct wrapper around 'CL.iterate'.
iterate :: Monad m => (a -> a) -> a -> Producer m a
INLINE_RULE(iterate, f t, CL.iterate f t)
-- | Produce the value @x@ repeatedly.
--
-- Expressed as 'iterate' with the identity function, so every produced value
-- is @x@ itself.
repeat :: Monad m => a -> Producer m a
INLINE_RULE(repeat, x, iterate id x)
-- | Produce the value @x@ a given number of times (@n@).
--
-- A direct wrapper around 'CL.replicate'.
replicate :: Monad m
=> Int
-> a
-> Producer m a
INLINE_RULE(replicate, n x, CL.replicate n x)
-- | Stream the strict chunks of a lazy sequence.
--
-- The lazy value is split with 'toChunks' and each resulting strict chunk is
-- yielded via 'yieldMany'.
sourceLazy :: (Monad m, LazySequence lazy strict)
=> lazy
-> Producer m strict
INLINE_RULE(sourceLazy, x, yieldMany (toChunks x))
-- | Run the monadic action over and over, yielding each result downstream.
--
-- This producer never terminates on its own.
repeatM, repeatMC :: Monad m => m a -> Producer m a
repeatMC m = forever $ do
    x <- lift m
    yield x
STREAMING(repeatM, repeatMC, repeatMS, m)
-- | Run the monadic action repeatedly, yielding each result for as long as
-- the predicate holds.
--
-- The first result that fails the predicate ends the stream and is not
-- yielded.
repeatWhileM, repeatWhileMC :: Monad m => m a -> (a -> Bool) -> Producer m a
repeatWhileMC m f =
    go
  where
    go = lift m >>= \x ->
        if f x
            then yield x >> go
            else return ()
STREAMING(repeatWhileM, repeatWhileMC, repeatWhileMS, m f)
-- | Run the monadic action @m@ a given number of times (@n@), producing its
-- results as a stream.
--
-- A direct wrapper around 'CL.replicateM'.
replicateM :: Monad m
=> Int
-> m a
-> Producer m a
INLINE_RULE(replicateM, n m, CL.replicateM n m)
-- | Stream the contents of the file at the given path.
--
-- The file is opened in read mode and handed to 'sourceIOHandle', which
-- takes care of closing the handle.
sourceFile :: (MonadResource m, IOData a) => FilePath -> Producer m a
sourceFile fp = sourceIOHandle $ SIO.openFile fp SIO.ReadMode
-- | Stream chunks read from a 'Handle'.
--
-- Chunks are fetched with 'hGetChunk' until an empty chunk is returned, at
-- which point the producer stops. The handle is not closed here.
sourceHandle, sourceHandleC :: (MonadIO m, IOData a) => Handle -> Producer m a
sourceHandleC h =
    go
  where
    go = liftIO (hGetChunk h) >>= \chunk ->
        unless (onull chunk) $ yield chunk >> go
STREAMING(sourceHandle, sourceHandleC, sourceHandleS, h)
-- | Acquire a 'Handle' with the given action and stream its contents.
--
-- Acquisition and release go through 'bracketP', with 'SIO.hClose' as the
-- release action.
sourceIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Producer m a
sourceIOHandle alloc =
    bracketP alloc release sourceHandle
  where
    release = SIO.hClose
-- | Stream data from standard input; 'sourceHandle' applied to 'SIO.stdin'.
stdin :: (MonadIO m, IOData a) => Producer m a
INLINE_RULE0(stdin, sourceHandle SIO.stdin)
-- | Stream random values.
--
-- The generator is created with 'MWC.createSystemRandom' and each value is
-- drawn with 'MWC.uniform'. The looping itself is delegated to @initRepeat@,
-- which is defined outside this module (presumably it repeats without end --
-- confirm against its definition).
sourceRandom :: (MWC.Variate a, MonadIO m) => Producer m a
INLINE_RULE0(sourceRandom, initRepeat (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform))
-- | Stream random values, with the count @cnt@ passed on to @initReplicate@.
--
-- The generator is created with 'MWC.createSystemRandom' and each value is
-- drawn with 'MWC.uniform'. @initReplicate@ is defined outside this module.
sourceRandomN :: (MWC.Variate a, MonadIO m)
=> Int
-> Producer m a
INLINE_RULE(sourceRandomN, cnt, initReplicate (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform) cnt)
-- | Stream random values drawn from the supplied generator.
--
-- Like 'sourceRandom', but the caller provides the 'MWC.Gen' and values are
-- drawn in the base monad via 'liftBase'.
sourceRandomGen :: (MWC.Variate a, MonadBase base m, PrimMonad base)
=> MWC.Gen (PrimState base)
-> Producer m a
INLINE_RULE(sourceRandomGen, gen, initRepeat (return gen) (liftBase . MWC.uniform))
-- | Stream random values drawn from the supplied generator, with the count
-- @cnt@ passed on to @initReplicate@.
--
-- Like 'sourceRandomN', but the caller provides the 'MWC.Gen'.
sourceRandomNGen :: (MWC.Variate a, MonadBase base m, PrimMonad base)
=> MWC.Gen (PrimState base)
-> Int
-> Producer m a
INLINE_RULE(sourceRandomNGen, gen cnt, initReplicate (return gen) (liftBase . MWC.uniform) cnt)
-- | Stream the entries of a directory.
--
-- Re-exposes 'CF.sourceDirectory' under this module's name.
sourceDirectory :: MonadResource m => FilePath -> Producer m FilePath
sourceDirectory dir = CF.sourceDirectory dir
-- | Stream the contents of a directory tree.
--
-- Re-exposes 'CF.sourceDirectoryDeep'; both the 'Bool' flag and the root
-- path are passed through unchanged.
sourceDirectoryDeep :: MonadResource m
                    => Bool
                    -> FilePath
                    -> Producer m FilePath
sourceDirectoryDeep flag root = CF.sourceDirectoryDeep flag root
-- | Discard a number of values (@n@) from the stream.
--
-- A direct wrapper around 'CL.drop'.
drop :: Monad m
=> Int
-> Consumer a m ()
INLINE_RULE(drop, n, CL.drop n)
-- | Discard the given number of elements from a chunked stream.
--
-- Chunks are consumed until the requested number of elements has been
-- dropped or the stream ends. When the boundary falls inside a chunk, the
-- unconsumed remainder of that chunk is pushed back with 'leftover'.
dropE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> Consumer seq m ()
dropE =
    loop
  where
    loop i
        | i <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (go i)

    go i sq = do
        unless (onull y) $ leftover y
        loop i'
      where
        -- x is the part of this chunk that gets dropped, y what survives.
        (x, y) = Seq.splitAt i sq
        -- Elements still to drop after accounting for this chunk.
        i' = i - fromIntegral (olength x)
-- | Discard values for as long as the predicate holds.
--
-- The first value that fails the predicate is pushed back with 'leftover'.
dropWhile :: Monad m
          => (a -> Bool)
          -> Consumer a m ()
dropWhile f =
    go
  where
    go = await >>= \mx ->
        case mx of
            Nothing -> return ()
            Just x
                | f x -> go
                | otherwise -> leftover x
-- | Discard elements of a chunked stream for as long as the predicate holds.
--
-- Once a chunk contains an element failing the predicate, the remainder of
-- that chunk (starting at that element) is pushed back with 'leftover'.
dropWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> Consumer seq m ()
dropWhileE f =
    go
  where
    go = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk -> do
                let rest = Seq.dropWhile f chunk
                if onull rest then go else leftover rest
-- | Monoidally combine all values in the stream.
--
-- Implemented as 'CL.foldMap' with the identity function.
fold :: (Monad m, Monoid a)
=> Consumer a m a
INLINE_RULE0(fold, CL.foldMap id)
-- | Monoidally combine all elements of all chunks in the stream.
--
-- Starting from 'mempty', each chunk is collapsed with @ofoldMap id@ and
-- appended to the accumulator with 'mappend'.
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> Consumer mono m (Element mono)
INLINE_RULE0(foldE, CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty)
-- | Left fold over the values of the stream, starting from @x@.
--
-- A direct wrapper around 'CL.fold'.
foldl :: Monad m => (a -> b -> a) -> a -> Consumer b m a
INLINE_RULE(foldl, f x, CL.fold f x)
-- | Left fold over the elements of a chunked stream, starting from @x@.
--
-- Each chunk is folded with 'ofoldlPrime' and the chunks themselves are
-- threaded through 'CL.fold'.
foldlE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a)
-> a
-> Consumer mono m a
INLINE_RULE(foldlE, f x, CL.fold (ofoldlPrime f) x)
-- | Prime-free alias for the strict left fold over a 'MonoFoldable'.
--
-- NOTE(review): presumably this exists so the primed name does not have to
-- appear inside a CPP macro argument (see 'foldlE') -- confirm.
ofoldlPrime :: MonoFoldable mono => (a -> Element mono -> a) -> a -> mono -> a
ofoldlPrime = ofoldl'
-- | Apply @f@ to every value in the stream and monoidally combine the
-- results.
--
-- A direct wrapper around 'CL.foldMap'.
foldMap :: (Monad m, Monoid b)
=> (a -> b)
-> Consumer a m b
INLINE_RULE(foldMap, f, CL.foldMap f)
-- | Apply @f@ to every element of every chunk and monoidally combine the
-- results.
--
-- Each chunk is collapsed with @ofoldMap f@; the chunks are combined by
-- 'CL.foldMap'.
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Consumer mono m w
INLINE_RULE(foldMapE, f, CL.foldMap (ofoldMap f))
-- | Left fold over the stream using its first value as the initial
-- accumulator.
--
-- Returns 'Nothing' when the stream is empty.
foldl1, foldl1C :: Monad m => (a -> a -> a) -> Consumer a m (Maybe a)
foldl1C f =
    await >>= maybe (return Nothing) go
  where
    go acc = do
        mnext <- await
        case mnext of
            Nothing -> return (Just acc)
            Just next -> go (f acc next)
STREAMING(foldl1, foldl1C, foldl1S, f)
-- | Left fold over the elements of a chunked stream with no explicit
-- starting value.
--
-- Runs 'foldl' with 'foldMaybeNull' as the step and 'Nothing' as the initial
-- accumulator, so the result is 'Nothing' when no element was seen.
foldl1E :: (Monad m, MonoFoldable mono, a ~ Element mono)
=> (a -> a -> a)
-> Consumer mono m (Maybe a)
INLINE_RULE(foldl1E, f, foldl (foldMaybeNull f) Nothing)
-- | Fold one chunk into an optional accumulator.
--
-- An empty chunk leaves the accumulator untouched. A non-empty chunk is
-- folded strictly, seeded with the existing accumulator when there is one
-- and with the first element of the chunk otherwise.
foldMaybeNull :: (MonoFoldable mono, e ~ Element mono)
              => (e -> e -> e)
              -> Maybe e
              -> mono
              -> Maybe e
foldMaybeNull f macc mono =
    case macc of
        Just acc -> step (ofoldl' f acc)
        Nothing -> step (NonNull.ofoldl1' f)
  where
    -- Apply the chosen fold only when the chunk is non-empty.
    step combine = maybe macc (Just . combine) (NonNull.fromNullable mono)
-- | Check that every value in the stream satisfies the predicate.
--
-- Implemented by searching (with 'find') for a value that fails the
-- predicate; the answer is 'True' exactly when no such value is found.
all, allC :: Monad m
          => (a -> Bool)
          -> Consumer a m Bool
allC f = isNothing <$> find (\x -> Prelude.not (f x))
STREAMING(all, allC, allS, f)
-- | Check that every element of every chunk satisfies the predicate.
--
-- Implemented as 'all' over chunks, testing each chunk with @oall f@.
allE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
INLINE_RULE(allE, f, all (oall f))
-- | Check whether any value in the stream satisfies the predicate.
--
-- Implemented with 'find': the answer is 'True' exactly when a matching
-- value is found.
any, anyC :: Monad m
          => (a -> Bool)
          -> Consumer a m Bool
anyC = (isJust <$>) . find
STREAMING(any, anyC, anyS, f)
-- | Check whether any element of any chunk satisfies the predicate.
--
-- Implemented as 'any' over chunks, testing each chunk with @oany f@.
anyE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
INLINE_RULE(anyE, f, any (oany f))
-- | Check that every 'Bool' in the stream is 'True'; defined as @all id@.
and :: Monad m => Consumer Bool m Bool
INLINE_RULE0(and, all id)
-- | Check that every 'Bool' element of every chunk is 'True'; defined as
-- @allE id@.
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(andE, allE id)
#else
andE = allE id
#endif
-- | Check whether any 'Bool' in the stream is 'True'; defined as @any id@.
or :: Monad m => Consumer Bool m Bool
INLINE_RULE0(or, any id)
-- | Check whether any 'Bool' element of any chunk is 'True'; defined as
-- @anyE id@.
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(orE, anyE id)
#else
orE = anyE id
#endif
-- | Check whether any value in the stream equals @x@; defined as
-- @any (== x)@.
elem :: (Monad m, Eq a) => a -> Consumer a m Bool
INLINE_RULE(elem, x, any (== x))
-- | Check whether any chunk in the stream contains the given element.
--
-- Implemented as 'any' over chunks. The per-chunk membership test is
-- @oelem@ when building against mono-traversable 0.8.0 or later and
-- @Seq.elem@ otherwise.
elemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
#if MIN_VERSION_mono_traversable(0,8,0)
INLINE_RULE(elemE, f, any (oelem f))
#else
INLINE_RULE(elemE, f, any (Seq.elem f))
#endif
-- | Check that no value in the stream equals @x@; defined as @all (/= x)@.
notElem :: (Monad m, Eq a) => a -> Consumer a m Bool
INLINE_RULE(notElem, x, all (/= x))
-- | Check that no chunk in the stream contains the given element.
--
-- Implemented as 'all' over chunks. The per-chunk test is @onotElem@ when
-- building against mono-traversable 0.8.0 or later and @Seq.notElem@
-- otherwise.
notElemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
#if MIN_VERSION_mono_traversable(0,8,0)
INLINE_RULE(notElemE, x, all (onotElem x))
#else
INLINE_RULE(notElemE, x, all (Seq.notElem x))
#endif
-- | Consume every strict chunk and assemble them into one lazy value.
--
-- Chunks are accumulated in arrival order in a difference list, which is
-- turned into a list and passed to 'fromChunks' once the stream ends.
sinkLazy, sinkLazyC :: (Monad m, LazySequence lazy strict)
                    => Consumer strict m lazy
sinkLazyC =
    fmap finish (CL.fold snoc id)
  where
    finish front = fromChunks (front [])
    snoc front next = front . (next:)
STREAMING0(sinkLazy, sinkLazyC, sinkLazyS)
-- | Consume all values from the stream into a list.
--
-- A direct wrapper around 'CL.consume'.
sinkList :: Monad m => Consumer a m [a]
INLINE_RULE0(sinkList, CL.consume)
-- | Consume all values from the stream into a vector.
--
-- Values are written into a mutable vector that starts with room for 10
-- elements and doubles in capacity whenever it fills up. At the end of the
-- stream the vector is frozen and sliced down to the elements written.
sinkVector, sinkVectorC :: (MonadBase base m, V.Vector v a, PrimMonad base)
                        => Consumer a m (v a)
sinkVectorC =
    liftBase (VM.new initialCapacity) >>= fill initialCapacity 0
  where
    initialCapacity = 10

    fill capacity used mv
        | used >= capacity = do
            -- Growing by the current capacity doubles the vector.
            mv' <- liftBase $ VM.grow mv capacity
            fill (capacity * 2) used mv'
        | otherwise = await >>= \mx ->
            case mx of
                Nothing -> V.slice 0 used <$> liftBase (V.unsafeFreeze mv)
                Just x -> do
                    liftBase $ VM.write mv used x
                    fill capacity (used + 1) mv
STREAMING0(sinkVector, sinkVectorC, sinkVectorS)
-- | Consume at most @maxSize@ values from the stream into a vector.
--
-- A mutable vector of exactly @maxSize@ elements is allocated up front. If
-- the stream ends early, the frozen vector is sliced down to the elements
-- actually written.
sinkVectorN, sinkVectorNC :: (MonadBase base m, V.Vector v a, PrimMonad base)
                          => Int
                          -> Consumer a m (v a)
sinkVectorNC maxSize = do
    mv <- liftBase $ VM.new maxSize
    let fill i
            | i >= maxSize = liftBase $ V.unsafeFreeze mv
            | otherwise = await >>= \mx ->
                case mx of
                    Nothing -> V.slice 0 i <$> liftBase (V.unsafeFreeze mv)
                    Just x -> liftBase (VM.write mv i x) >> fill (i + 1)
    fill 0
STREAMING(sinkVectorN, sinkVectorNC, sinkVectorNS, maxSize)
-- | Convert every incoming value to a builder and concatenate the builders.
--
-- Defined as @foldMap toBuilder@.
sinkBuilder :: (Monad m, Monoid builder, ToBuilder a builder)
=> Consumer a m builder
INLINE_RULE0(sinkBuilder, foldMap toBuilder)
-- | Like 'sinkBuilder', but the accumulated builder is converted to its lazy
-- representation with 'builderToLazy' before being returned.
sinkLazyBuilder, sinkLazyBuilderC :: (Monad m, Monoid builder, ToBuilder a builder, Builder builder lazy)
                                  => Consumer a m lazy
sinkLazyBuilderC = builderToLazy <$> sinkBuilder
STREAMING0(sinkLazyBuilder, sinkLazyBuilderC, sinkLazyBuilderS)
-- | Consume and discard the values of the stream.
--
-- A direct wrapper around 'CL.sinkNull'.
sinkNull :: Monad m => Consumer a m ()
INLINE_RULE0(sinkNull, CL.sinkNull)
-- | Wait for the next non-empty chunk.
--
-- Empty chunks are skipped. Returns 'Nothing' once the stream is exhausted
-- without producing a non-empty chunk.
awaitNonNull :: (Monad m, MonoFoldable a) => Consumer a m (Maybe (NonNull.NonNull a))
awaitNonNull =
    loop
  where
    loop = do
        mchunk <- await
        case mchunk of
            Nothing -> return Nothing
            Just chunk ->
                case NonNull.fromNullable chunk of
                    Nothing -> loop
                    Just nn -> return (Just nn)
-- | Take the first element of a chunked stream.
--
-- Empty chunks are skipped. The rest of the chunk the element came from is
-- pushed back with 'leftover' unless it is empty. Returns 'Nothing' when the
-- stream holds no elements.
headE :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
headE =
    go
  where
    go = await >>= \mchunk ->
        case mchunk of
            Nothing -> return Nothing
            Just chunk -> maybe go found (Seq.uncons chunk)

    found (y, rest) = do
        unless (onull rest) $ leftover rest
        return (Just y)
-- | Look at the next value in the stream; an alias for 'CL.peek'.
peek :: Monad m => Consumer a m (Maybe a)
peek = CL.peek
-- | Look at the first element of a chunked stream without consuming it.
--
-- Empty chunks are skipped (and dropped). The chunk containing the element
-- is pushed back whole with 'leftover'.
peekE :: (Monad m, MonoFoldable mono) => Consumer mono m (Maybe (Element mono))
peekE =
    go
  where
    go = do
        mchunk <- await
        case mchunk of
            Nothing -> return Nothing
            Just chunk -> maybe go (found chunk) (headMay chunk)

    found chunk y = leftover chunk >> return (Just y)
-- | Consume the whole stream and return its final value, or 'Nothing' when
-- the stream is empty.
last, lastC :: Monad m => Consumer a m (Maybe a)
lastC =
    await >>= maybe (return Nothing) go
  where
    go prev = do
        mnext <- await
        case mnext of
            Nothing -> return (Just prev)
            Just next -> go next
STREAMING0(last, lastC, lastS)
-- | Consume the whole chunked stream and return its final element, or
-- 'Nothing' when no non-empty chunk was seen.
lastE, lastEC :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
lastEC =
    awaitNonNull >>= maybe (return Nothing) (go . NonNull.last)
  where
    go prev = do
        mnn <- awaitNonNull
        case mnn of
            Nothing -> return (Just prev)
            Just nn -> go (NonNull.last nn)
STREAMING0(lastE, lastEC, lastES)
-- | Count the values in the stream.
--
-- A 'foldl' that starts at 0 and adds 1 for every value.
length :: (Monad m, Num len) => Consumer a m len
INLINE_RULE0(length, foldl (\x _ -> x + 1) 0)
-- | Count the elements across all chunks in the stream.
--
-- A 'foldl' that starts at 0 and adds the 'olength' of every chunk.
lengthE :: (Monad m, Num len, MonoFoldable mono) => Consumer mono m len
INLINE_RULE0(lengthE, foldl (\x y -> x + fromIntegral (olength y)) 0)
-- | Count the values in the stream that satisfy the predicate.
lengthIf :: (Monad m, Num len) => (a -> Bool) -> Consumer a m len
INLINE_RULE(lengthIf, f, foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Count the elements across all chunks that satisfy the predicate.
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
=> (Element mono -> Bool) -> Consumer mono m len
INLINE_RULE(lengthIfE, f, foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Largest value in the stream, or 'Nothing' when it is empty; defined as
-- @foldl1 max@.
maximum :: (Monad m, Ord a) => Consumer a m (Maybe a)
INLINE_RULE0(maximum, foldl1 max)
-- | Largest element across all chunks, or 'Nothing' when there are no
-- elements; defined as @foldl1E max@.
maximumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
INLINE_RULE0(maximumE, foldl1E max)
-- | Smallest value in the stream, or 'Nothing' when it is empty; defined as
-- @foldl1 min@.
minimum :: (Monad m, Ord a) => Consumer a m (Maybe a)
INLINE_RULE0(minimum, foldl1 min)
-- | Smallest element across all chunks, or 'Nothing' when there are no
-- elements; defined as @foldl1E min@.
minimumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
INLINE_RULE0(minimumE, foldl1E min)
-- | Report whether the stream has no more values.
--
-- Uses 'peek', so the next value (if any) is examined without being
-- consumed.
null :: Monad m => Consumer a m Bool
null = isNothing <$> peek
-- | Report whether a chunked stream has no more elements.
--
-- Empty chunks are consumed and skipped. The first non-empty chunk is pushed
-- back with 'leftover' before 'False' is returned.
nullE :: (Monad m, MonoFoldable mono)
      => Consumer mono m Bool
nullE =
    loop
  where
    loop = do
        mchunk <- await
        case mchunk of
            Nothing -> return True
            Just chunk
                | onull chunk -> loop
                | otherwise -> leftover chunk >> return False
-- | Sum of all values in the stream; defined as @foldl (+) 0@.
sum :: (Monad m, Num a) => Consumer a m a
INLINE_RULE0(sum, foldl (+) 0)
-- | Sum of all elements across all chunks; defined as @foldlE (+) 0@.
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
INLINE_RULE0(sumE, foldlE (+) 0)
-- | Product of all values in the stream; defined as @foldl (*) 1@.
product :: (Monad m, Num a) => Consumer a m a
INLINE_RULE0(product, foldl (*) 1)
-- | Product of all elements across all chunks; defined as @foldlE (*) 1@.
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
INLINE_RULE0(productE, foldlE (*) 1)
-- | Return the first value in the stream that satisfies the predicate.
--
-- Values that fail the predicate are consumed and discarded. Consumption
-- stops at the first match. Returns 'Nothing' when the stream ends without
-- a match.
find, findC :: Monad m => (a -> Bool) -> Consumer a m (Maybe a)
findC f =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return Nothing
            Just x
                | f x -> return (Just x)
                | otherwise -> go
STREAMING(find, findC, findS, f)
-- | Run the monadic action @f@ on every value in the stream.
--
-- A direct wrapper around 'CL.mapM_'.
mapM_ :: Monad m => (a -> m ()) -> Consumer a m ()
INLINE_RULE(mapM_, f, CL.mapM_ f)
-- | Run the monadic action @f@ on every element of every chunk.
--
-- Each chunk is traversed with @omapM_ f@; the chunks are fed through
-- 'CL.mapM_'.
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> Consumer mono m ()
INLINE_RULE(mapM_E, f, CL.mapM_ (omapM_ f))
-- | Monadic left fold over the values of the stream, starting from @x@.
--
-- A direct wrapper around 'CL.foldM'.
foldM :: Monad m => (a -> b -> m a) -> a -> Consumer b m a
INLINE_RULE(foldM, f x, CL.foldM f x)
-- | Monadic left fold over the elements of a chunked stream, starting from
-- @x@.
--
-- Each chunk is folded with @ofoldlM f@ and the chunks are threaded through
-- 'foldM'.
foldME :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a)
-> a
-> Consumer mono m a
INLINE_RULE(foldME, f x, foldM (ofoldlM f) x)
-- | Apply the monadic function @f@ to every value and monoidally combine the
-- results.
--
-- A direct wrapper around 'CL.foldMapM'.
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> Consumer a m w
INLINE_RULE(foldMapM, f, CL.foldMapM f)
-- | Apply the monadic function @f@ to every element of every chunk and
-- monoidally combine the results.
--
-- Starting from 'mempty', each result of @f@ is appended on the right of the
-- accumulator with 'mappend'.
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w)
-> Consumer mono m w
INLINE_RULE(foldMapME, f, CL.foldM (ofoldlM (\accum e -> mappend accum `liftM` f e)) mempty)
-- | Write all incoming data to the file at the given path.
--
-- The file is opened in write mode and handed to 'sinkIOHandle', which takes
-- care of closing the handle.
sinkFile :: (MonadResource m, IOData a) => FilePath -> Consumer a m ()
sinkFile fp = sinkIOHandle $ SIO.openFile fp SIO.WriteMode
-- | Print every value in the stream using 'Prelude.print'.
print :: (Show a, MonadIO m) => Consumer a m ()
INLINE_RULE0(print, mapM_ (liftIO . Prelude.print))
-- | Write all incoming data to standard output; 'sinkHandle' applied to
-- 'SIO.stdout'.
stdout :: (MonadIO m, IOData a) => Consumer a m ()
INLINE_RULE0(stdout, sinkHandle SIO.stdout)
-- | Write all incoming data to standard error; 'sinkHandle' applied to
-- 'SIO.stderr'.
stderr :: (MonadIO m, IOData a) => Consumer a m ()
INLINE_RULE0(stderr, sinkHandle SIO.stderr)
-- | Write every incoming chunk to the given 'Handle' with 'hPut'.
--
-- The handle is not closed by this consumer.
sinkHandle :: (MonadIO m, IOData a) => Handle -> Consumer a m ()
INLINE_RULE(sinkHandle, h, CL.mapM_ (hPut h))
-- | Acquire a 'Handle' with the given action and write all incoming data to
-- it.
--
-- Acquisition and release go through 'bracketP', with 'SIO.hClose' as the
-- release action.
sinkIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Consumer a m ()
sinkIOHandle alloc =
    bracketP alloc release sinkHandle
  where
    release = SIO.hClose
-- | Apply @f@ to every value in the stream.
--
-- A direct wrapper around 'CL.map'.
map :: Monad m => (a -> b) -> Conduit a m b
INLINE_RULE(map, f, CL.map f)
-- | Apply @f@ to every element inside each chunk, using the chunk's
-- 'Functor' instance.
mapE :: (Monad m, Functor f) => (a -> b) -> Conduit (f a) m (f b)
INLINE_RULE(mapE, f, CL.map (fmap f))
-- | Apply @f@ to every element inside each chunk, using the chunk's
-- 'MonoFunctor' instance ('omap').
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> Conduit mono m mono
INLINE_RULE(omapE, f, CL.map (omap f))
-- | Apply the function to every value in the stream and yield each element
-- of the resulting container.
concatMap, concatMapC :: (Monad m, MonoFoldable mono)
                      => (a -> mono)
                      -> Conduit a m (Element mono)
concatMapC f = awaitForever $ \x -> yieldMany (f x)
STREAMING(concatMap, concatMapC, concatMapS, f)
-- | Replace each chunk by the monoidal combination of @f@ applied to its
-- elements.
--
-- Defined as @CL.map (ofoldMap f)@, so one output is produced per input
-- chunk.
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Conduit mono m w
INLINE_RULE(concatMapE, f, CL.map (ofoldMap f))
-- | Pass through a limited number of values (@n@) from the stream.
--
-- A direct wrapper around 'CL.isolate'.
take :: Monad m => Int -> Conduit a m a
INLINE_RULE(take, n, CL.isolate n)
-- | Pass through at most the given number of elements of a chunked stream.
--
-- Chunks are forwarded until the requested number of elements has been
-- yielded or the stream ends. When the boundary falls inside a chunk, the
-- leading part is yielded and the unconsumed remainder is pushed back with
-- 'leftover'.
takeE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> Conduit seq m seq
takeE =
    loop
  where
    loop i
        | i <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (go i)

    go i sq = do
        unless (onull x) $ yield x
        unless (onull y) $ leftover y
        loop i'
      where
        -- x is the part of this chunk that is passed on, y what stays behind.
        (x, y) = Seq.splitAt i sq
        -- Elements still to take after accounting for this chunk.
        i' = i - fromIntegral (olength x)
-- | Pass through values for as long as the predicate holds.
--
-- The first value that fails the predicate is pushed back with 'leftover'
-- and the conduit stops.
takeWhile :: Monad m
          => (a -> Bool)
          -> Conduit a m a
takeWhile f =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x
                | f x -> yield x >> go
                | otherwise -> leftover x
-- | Pass through elements of a chunked stream for as long as the predicate
-- holds.
--
-- Each chunk is split with 'Seq.span'. The matching prefix is yielded when
-- non-empty. A non-empty remainder is pushed back with 'leftover' and ends
-- the conduit.
takeWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> Conduit seq m seq
takeWhileE f =
    go
  where
    go = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk -> step (Seq.span f chunk)

    step (matching, rest) = do
        unless (onull matching) $ yield matching
        if onull rest then go else leftover rest
-- | Run the inner conduit on the output of @take count@.
--
-- Whatever the inner conduit leaves unconsumed of that limited input is
-- drained with 'CL.sinkNull' before the inner result is returned.
takeExactly :: Monad m
            => Int
            -> ConduitM a b m r
            -> ConduitM a b m r
takeExactly count inner =
    take count =$= drained
  where
    drained = inner >>= \r -> CL.sinkNull >> return r
-- | Run the inner conduit on the output of @takeE count@.
--
-- Whatever the inner conduit leaves unconsumed of that limited input is
-- drained with 'CL.sinkNull' before the inner result is returned.
takeExactlyE :: (Monad m, Seq.IsSequence a)
             => Seq.Index a
             -> ConduitM a b m r
             -> ConduitM a b m r
takeExactlyE count inner =
    takeE count =$= drained
  where
    drained = inner >>= \r -> CL.sinkNull >> return r
-- | Flatten a stream of containers into a stream of their elements.
--
-- Every incoming container is passed to 'yieldMany'.
concat, concatC :: (Monad m, MonoFoldable mono)
=> Conduit mono m (Element mono)
concatC = awaitForever yieldMany
STREAMING0(concat, concatC, concatS)
-- | Filter the stream's values with the predicate @f@.
--
-- A direct wrapper around 'CL.filter'.
filter :: Monad m => (a -> Bool) -> Conduit a m a
INLINE_RULE(filter, f, CL.filter f)
-- | Filter the elements inside each chunk with the predicate @f@.
--
-- Defined as @CL.map (Seq.filter f)@, so one (possibly empty) chunk is
-- produced per input chunk.
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> Conduit seq m seq
INLINE_RULE(filterE, f, CL.map (Seq.filter f))
-- | Map values for as long as the function returns 'Just'.
--
-- The first input for which the function returns 'Nothing' is pushed back
-- with 'leftover' and the conduit stops.
mapWhile :: Monad m => (a -> Maybe b) -> Conduit a m b
mapWhile f =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> maybe (leftover x) (\y -> yield y >> go) (f x)
-- | Group the stream into vectors of at most @size@ elements.
--
-- 'sinkVectorN' is run repeatedly and each non-empty vector is yielded. The
-- first empty vector ends the conduit.
conduitVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
              => Int
              -> Conduit a m (v a)
conduitVector size =
    go
  where
    go = sinkVectorN size >>= \v ->
        if V.null v
            then return ()
            else yield v >> go
-- | Left scan over the stream.
--
-- For every input, the accumulator held before that input is yielded and the
-- new accumulator is forced. When the input ends, the final accumulator is
-- yielded as well.
scanl, scanlC :: Monad m => (a -> b -> a) -> a -> Conduit b m a
scanlC f =
    go
  where
    go acc = await >>= \mb ->
        case mb of
            Nothing -> yield acc
            Just b ->
                let acc' = f acc b
                 in (acc' `seq` yield acc) >> go acc'
STREAMING(scanl, scanlC, scanlS, f x)
-- | Map with an accumulator until the step function asks to stop.
--
-- A 'Right' result yields the output and continues with the new state. A
-- 'Left' result stops and returns its state. When the input ends, the
-- current state is returned.
mapAccumWhile, mapAccumWhileC :: Monad m =>
    (a -> s -> Either s (s, b)) -> s -> ConduitM a b m s
mapAccumWhileC f =
    loop
  where
    loop !s = do
        ma <- await
        case ma of
            Nothing -> return s
            Just a ->
                case f a s of
                    Left s' -> return $! s'
                    Right (s', b) -> yield b >> loop s'
STREAMING(mapAccumWhile, mapAccumWhileC, mapAccumWhileS, f s)
-- | Map with an accumulator, producing a list of outputs per input.
--
-- An alias for 'CL.concatMapAccum'.
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b
INLINE_RULE0(concatMapAccum, CL.concatMapAccum)
-- | Insert the given value between consecutive values of the stream.
--
-- The first value is passed through unchanged. Every later value is preceded
-- by the separator.
intersperse, intersperseC :: Monad m => a -> Conduit a m a
intersperseC x = do
    mfirst <- await
    case mfirst of
        Nothing -> return ()
        Just y -> yield y >> concatMap (\z -> [x, z])
STREAMING(intersperse, intersperseC, intersperseS, x)
-- | Yield sliding windows over the stream.
--
-- The window size is @max 1 sz@. The first window is filled one value at a
-- time. If the stream ends before it is full, the partial window is yielded
-- and the conduit stops. After that, each new value is appended, the window
-- is yielded, and its oldest element is dropped.
slidingWindow, slidingWindowC :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> Conduit a m seq
slidingWindowC sz = go (max 1 sz) mempty
  where
    -- Steady state: st holds the previous window minus its oldest element.
    goContinue st = await >>=
        maybe (return ())
              (\x -> do
                  let st' = Seq.snoc st x
                  yield st' >> goContinue (Seq.unsafeTail st'))

    -- Filling phase: n counts how many values the first window still needs.
    go 0 st = yield st >> goContinue (Seq.unsafeTail st)
    go !n st = CL.head >>= \m ->
        case m of
            Nothing -> yield st
            Just x -> go (n - 1) (Seq.snoc st x)
STREAMING(slidingWindow, slidingWindowC, slidingWindowS, sz)
-- | Shared driver for the block-oriented encoders and decoders.
--
-- @size@ is the block size in bytes and @f@ the coding function. Input is
-- regrouped so that @f@ is only applied to byte strings whose length is a
-- multiple of @size@. The one exception is the final call at end of input,
-- which receives whatever bytes (fewer than @size@) are still buffered.
--
-- When @f@ returns 'Left', the input that could not be processed is pushed
-- back with 'leftover' and the conduit stops.
codeWith :: Monad m
         => Int
         -> (ByteString -> Either e ByteString)
         -> Conduit ByteString m ByteString
codeWith size f =
    loop
  where
    -- No bytes buffered: wait for the next chunk.
    loop = await >>= maybe (return ()) push

    -- bs holds buffered bytes (fewer than size) from earlier chunks.
    loopWith bs
        | S.null bs = loop
        | otherwise = await >>= maybe (finish bs) (pushWith bs)

    -- End of input with a partial block still buffered.
    finish bs =
        case f bs of
            Left _ -> leftover bs
            Right x -> yield x

    -- Process a fresh chunk: code the largest prefix made of whole blocks
    -- and buffer the rest.
    push bs = do
        let (x, y) = S.splitAt (len - (len `mod` size)) bs
        if S.null x
            then loopWith y
            else do
                case f x of
                    Left _ -> leftover bs
                    Right x' -> yield x' >> loopWith y
      where
        len = olength bs

    -- Combine the buffered bytes bs1 with the new chunk bs2. If they still do
    -- not make up a whole block, keep buffering.
    pushWith bs1 bs2 | S.length bs1 + S.length bs2 < size = loopWith (S.append bs1 bs2)
    pushWith bs1 bs2 = assertion1 $ assertion2 $ assertion3 $
        case f bs1' of
            -- Push back in reverse order so that bs1 is read again first.
            Left _ -> leftover bs2 >> leftover bs1
            Right toYield -> yield toYield >> push y
      where
        m = S.length bs1 `mod` size
        -- Take just enough of bs2 to complete the buffered block.
        (x, y) = S.splitAt (size - m) bs2
        bs1' = mappend bs1 x

        assertion1 = assert $ olength bs1 < size
        assertion2 = assert $ olength bs1' `mod` size == 0
        assertion3 = assert $ olength bs1' > 0
-- | Base64-encode the stream.
--
-- Input is regrouped by 'codeWith' into blocks of 3 bytes before being
-- passed to 'B64.encode', which cannot fail here.
encodeBase64 :: Monad m => Conduit ByteString m ByteString
encodeBase64 = codeWith 3 (\bs -> Right (B64.encode bs))
-- | Base64-decode the stream.
--
-- Input is regrouped by 'codeWith' into blocks of 4 bytes before being
-- passed to 'B64.decode'. On a decoding failure 'codeWith' pushes the
-- offending input back as leftovers and stops.
decodeBase64 :: Monad m => Conduit ByteString m ByteString
decodeBase64 = codeWith 4 B64.decode
-- | Encode the stream with the URL variant of base64.
--
-- Input is regrouped by 'codeWith' into blocks of 3 bytes before being
-- passed to 'B64U.encode', which cannot fail here.
encodeBase64URL :: Monad m => Conduit ByteString m ByteString
encodeBase64URL = codeWith 3 (\bs -> Right (B64U.encode bs))
-- | Decode a stream encoded with the URL variant of base64.
--
-- Input is regrouped by 'codeWith' into blocks of 4 bytes before being
-- passed to 'B64U.decode'. On a decoding failure 'codeWith' pushes the
-- offending input back as leftovers and stops.
decodeBase64URL :: Monad m => Conduit ByteString m ByteString
decodeBase64URL = codeWith 4 B64U.decode
-- | Base16-encode the stream by applying 'B16.encode' to every chunk
-- independently.
encodeBase16 :: Monad m => Conduit ByteString m ByteString
INLINE_RULE0(encodeBase16, map B16.encode)
-- | Base16-decode the stream.
--
-- Input is regrouped by 'codeWith' into blocks of 2 bytes. A block is
-- accepted only when 'B16.decode' consumes it entirely. Otherwise the
-- decoding step reports failure and 'codeWith' stops.
decodeBase16 :: Monad m => Conduit ByteString m ByteString
decodeBase16 =
    codeWith 2 strictDecode
  where
    strictDecode input =
        case B16.decode input of
            (decoded, undecoded)
                | onull undecoded -> Right decoded
                | otherwise -> Left ()
-- | Apply the monadic function @f@ to every value in the stream.
--
-- A direct wrapper around 'CL.mapM'.
mapM :: Monad m => (a -> m b) -> Conduit a m b
INLINE_RULE(mapM, f, CL.mapM f)
-- | Apply the monadic function @f@ to every element inside each chunk, using
-- the chunk's 'Data.Traversable.Traversable' instance.
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> Conduit (f a) m (f b)
INLINE_RULE(mapME, f, CL.mapM (Data.Traversable.mapM f))
-- | Apply the monadic function @f@ to every element inside each chunk, using
-- the chunk's 'MonoTraversable' instance ('omapM').
omapME :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono))
-> Conduit mono m mono
INLINE_RULE(omapME, f, CL.mapM (omapM f))
-- | Apply the monadic function to every value in the stream and yield each
-- element of the resulting container.
concatMapM, concatMapMC :: (Monad m, MonoFoldable mono)
                        => (a -> m mono)
                        -> Conduit a m (Element mono)
concatMapMC f = awaitForever $ \x -> lift (f x) >>= yieldMany
STREAMING(concatMapM, concatMapMC, concatMapMS, f)
-- | Keep only the values for which the monadic predicate returns 'True'.
filterM, filterMC :: Monad m
                  => (a -> m Bool)
                  -> Conduit a m a
filterMC f =
    awaitForever $ \x ->
        lift (f x) >>= \keep -> when keep (yield x)
STREAMING(filterM, filterMC, filterMS, f)
-- | Filter the elements inside each chunk with the monadic predicate @f@.
--
-- Defined as @CL.mapM (Seq.filterM f)@, so one (possibly empty) chunk is
-- produced per input chunk.
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> Conduit seq m seq
INLINE_RULE(filterME, f, CL.mapM (Seq.filterM f))
-- | Run the monadic action @f@ on every value while passing the values
-- through.
--
-- A direct wrapper around 'CL.iterM'.
iterM :: Monad m => (a -> m ()) -> Conduit a m a
INLINE_RULE(iterM, f, CL.iterM f)
-- | Monadic left scan over the stream.
--
-- For every input, the new accumulator is computed in the base monad and
-- forced, and the accumulator held before that input is yielded. When the
-- input ends, the final accumulator is yielded as well.
scanlM, scanlMC :: Monad m => (a -> b -> m a) -> a -> Conduit b m a
scanlMC f =
    go
  where
    go acc = await >>= \mb ->
        case mb of
            Nothing -> yield acc
            Just b ->
                lift (f acc b) >>= \acc' ->
                    (acc' `seq` yield acc) >> go acc'
STREAMING(scanlM, scanlMC, scanlMS, f x)
-- | Monadic map with an accumulator until the step function asks to stop.
--
-- A 'Right' result yields the output and continues with the new state. A
-- 'Left' result stops and returns its state. When the input ends, the
-- current state is returned.
mapAccumWhileM, mapAccumWhileMC :: Monad m =>
    (a -> s -> m (Either s (s, b))) -> s -> ConduitM a b m s
mapAccumWhileMC f =
    loop
  where
    loop !s = do
        ma <- await
        case ma of
            Nothing -> return s
            Just a -> do
                step <- lift (f a s)
                case step of
                    Left s' -> return $! s'
                    Right (s', b) -> yield b >> loop s'
STREAMING(mapAccumWhileM, mapAccumWhileMC, mapAccumWhileMS, f s)
-- | Monadic map with an accumulator, producing a list of outputs per input.
--
-- A direct wrapper around 'CL.concatMapAccumM'.
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b
INLINE_RULE(concatMapAccumM, f x, CL.concatMapAccumM f x)
-- | Encode every textual chunk in the stream with 'DTE.encodeUtf8'.
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => Conduit text m binary
INLINE_RULE0(encodeUtf8, map DTE.encodeUtf8)
-- | Decode a stream of bytes as UTF-8 text, using 'CT.decode' with the
-- 'CT.utf8' codec.
--
-- NOTE(review): the 'MonadThrow' constraint suggests that invalid input is
-- reported as an exception -- confirm against "Data.Conduit.Text".
decodeUtf8 :: MonadThrow m => Conduit ByteString m Text
decodeUtf8 = CT.decode CT.utf8
-- | Lenient UTF-8 decoding of a stream of bytes; an alias for
-- 'CT.decodeUtf8Lenient'.
decodeUtf8Lenient :: MonadThrow m => Conduit ByteString m Text
decodeUtf8Lenient = CT.decodeUtf8Lenient
-- | Run the inner conduit on exactly one line of a character stream.
--
-- Defined as 'takeExactlyUntilE' with a predicate matching the newline
-- character.
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
     => ConduitM seq o m r
     -> ConduitM seq o m r
line =
    takeExactlyUntilE isNewline
  where
    isNewline c = c == '\n'
-- | Run the inner conduit on exactly one line of a byte stream.
--
-- Defined as 'takeExactlyUntilE' with a predicate matching byte 10 (the
-- ASCII line feed).
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
          => ConduitM seq o m r
          -> ConduitM seq o m r
lineAscii =
    takeExactlyUntilE isLineFeed
  where
    isLineFeed w = w == 10
-- | Run the inner conduit on the input up to the first element that matches
-- the predicate.
--
-- The matching element itself is dropped and anything after it in the same
-- chunk is pushed back with 'leftover'. Input before the match that the
-- inner conduit does not consume is drained with 'sinkNull'.
takeExactlyUntilE :: (Monad m, Seq.IsSequence seq)
                  => (Element seq -> Bool)
                  -> ConduitM seq o m r
                  -> ConduitM seq o m r
takeExactlyUntilE f inner =
    splitter =$= (inner >>= \r -> sinkNull >> return r)
  where
    splitter = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk -> step chunk

    step chunk
        -- No match in this chunk: forward all of it and keep going.
        | onull rest = yield before >> splitter
        | otherwise = do
            unless (onull before) $ yield before
            -- Skip the matching element itself.
            let rest' = Seq.drop 1 rest
            unless (onull rest') $ leftover rest'
      where
        (before, rest) = Seq.break f chunk
-- | Follow every incoming chunk with a chunk holding a single newline
-- character.
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => Conduit seq m seq
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(unlines, concatMap (:[Seq.singleton '\n']))
#else
unlines = concatMap (:[Seq.singleton '\n'])
#endif
-- | Follow every incoming chunk with a chunk holding the single byte 10 (the
-- ASCII line feed).
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => Conduit seq m seq
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(unlinesAscii, concatMap (:[Seq.singleton 10]))
#else
unlinesAscii = concatMap (:[Seq.singleton 10])
#endif
-- | Split a chunked stream on every element that matches the predicate.
--
-- Matching elements are dropped and the pieces between them are yielded. A
-- piece that spans several chunks is buffered by appending the chunks, so
-- memory use is unbounded when no separator arrives. At end of input the
-- buffered remainder is yielded when non-empty.
splitOnUnboundedE, splitOnUnboundedEC
    :: (Monad m, Seq.IsSequence seq)
    => (Element seq -> Bool) -> Conduit seq m seq
splitOnUnboundedEC f =
    start
  where
    start = await >>= maybe (return ()) go

    go buf
        -- No separator buffered yet: pull more input or flush at the end.
        | onull rest = await >>= \mnext ->
            case mnext of
                Nothing -> unless (onull buf) $ yield buf
                Just next -> go (buf `mappend` next)
        | otherwise = yield piece >> go (Seq.drop 1 rest)
      where
        (piece, rest) = Seq.break f buf
STREAMING(splitOnUnboundedE, splitOnUnboundedEC, splitOnUnboundedES, f)
-- | Split a character stream into lines; 'splitOnUnboundedE' with a
-- predicate matching the newline character.
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> Conduit seq m seq
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(linesUnbounded, splitOnUnboundedE (== '\n'))
#else
linesUnbounded = splitOnUnboundedE (== '\n')
#endif
-- | Split a byte stream into lines; 'splitOnUnboundedE' with a predicate
-- matching byte 10 (the ASCII line feed).
--
-- The INLINE/RULES macro form is only used on GHC 7.6 and later; older
-- compilers get a plain definition.
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> Conduit seq m seq
#if __GLASGOW_HASKELL__ >= 706
INLINE_RULE0(linesUnboundedAscii, splitOnUnboundedE (== 10))
#else
linesUnboundedAscii = splitOnUnboundedE (== 10)
#endif
-- | Build vectors of @size@ elements from values handed to a callback.
--
-- @inner@ receives a function that stores one element (via 'addE') in shared
-- mutable state. Each time @inner@ asks for input, 'onAwait' first runs
-- 'yieldS', which yields the vectors completed so far. When @inner@
-- finishes, the remaining completed vectors and the partially filled one (if
-- it holds any elements) are yielded, and the result of @inner@ is returned.
vectorBuilder :: (PrimMonad base, MonadBase base m, V.Vector v e, MonadBase base n)
=> Int
-> ((e -> n ()) -> Sink i m r)
-> ConduitM i (v e) m r
vectorBuilder size inner = do
-- Initial state: write index 0, a fresh mutable vector, nothing pending.
ref <- liftBase $ do
mv <- VM.new size
newMutVar $! S 0 mv id
res <- onAwait (yieldS ref) (inner (liftBase . addE ref))
-- Collect what is still pending once the inner sink is done.
vs <- liftBase $ do
S idx mv front <- readMutVar ref
end <-
if idx == 0
then return []
else do
-- Only the first idx slots of the current vector were written.
v <- V.unsafeFreeze mv
return [V.unsafeTake idx v]
return $ front end
Prelude.mapM_ yield vs
return res
-- | Mutable-state record shared between 'vectorBuilder', 'yieldS' and
-- 'addE'.
data S s v e = S
-- Index of the next write position in the current mutable vector.
!Int
-- Mutable vector currently being filled.
!(V.Mutable v s e)
-- Difference list of vectors that are complete but not yet yielded.
([v e] -> [v e])
-- | Run a 'Sink', executing the given callback conduit immediately before
-- each of the sink's requests for input.
--
-- The sink itself cannot produce output (a 'HaveOutput' step is dismissed
-- with 'absurd'), so any output of the combined conduit comes from the
-- callback.
onAwait :: Monad m
=> ConduitM i o m ()
-> Sink i m r
-> ConduitM i o m r
onAwait (ConduitM callback) (ConduitM sink0) = ConduitM $ \rest -> let
-- Sink finished: continue with the rest of the pipeline.
go (Done r) = rest r
go (HaveOutput _ _ o) = absurd o
-- Run the callback first, then issue the actual input request.
go (NeedInput f g) = callback $ \() -> NeedInput (go . f) (go . g)
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover f i) = Leftover (go f) i
in go (sink0 Done)
-- | Yield every completed vector recorded in the shared state, then reset
-- the pending list to empty.
--
-- The write index and the vector currently being filled are left untouched.
yieldS :: (PrimMonad base, MonadBase base m)
       => MutVar (PrimState base) (S (PrimState base) v e)
       -> Producer m (v e)
yieldS ref = do
    S idx mv pending <- liftBase (readMutVar ref)
    Prelude.mapM_ yield $ pending []
    liftBase (writeMutVar ref $! S idx mv id)
-- | Store one element in the shared state.
--
-- The element is written at the current index. If that fills the mutable
-- vector, the vector is frozen and appended to the pending list, and a fresh
-- vector of the same length takes its place with the index reset to 0.
addE :: (PrimMonad m, V.Vector v e)
     => MutVar (PrimState m) (S (PrimState m) v e)
     -> e
     -> m ()
addE ref e = do
    S idx mv front <- readMutVar ref
    VM.write mv idx e
    let next = succ idx
        capacity = VM.length mv
    if next < capacity
        then writeMutVar ref $! S next mv front
        else do
            full <- V.unsafeFreeze mv
            fresh <- VM.new capacity
            writeMutVar ref $! S 0 fresh (front . (full:))
-- | Consume the input while threading a state through a second, resumable
-- source.
--
-- For every input value @a@ the sink @f a s@ is run against the resumable
-- source, producing the next state. When the input ends, the resumable
-- source is closed and the final state returned.
mapAccumS :: Monad m => (a -> s -> Sink b m s) -> s -> Source m b -> Sink a m s
mapAccumS f s xs = do
    (src, final) <- go (newResumableSource xs, s)
    lift (closeResumableSource src)
    return final
  where
    go st@(src, !acc) = do
        ma <- await
        case ma of
            Nothing -> return st
            Just a -> lift (src $$++ f a acc) >>= go
-- | Run the inner conduit repeatedly for as long as input is available.
--
-- Before each run the stream is checked with 'peek'; the loop ends once no
-- more input is available.
peekForever :: Monad m => ConduitM i o m () -> ConduitM i o m ()
peekForever inner =
    go
  where
    go = peek >>= maybe (return ()) (\_ -> inner >> go)