{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE BangPatterns #-}
module Data.Conduit.Combinators
(
yieldMany
, unfold
, enumFromTo
, iterate
, repeat
, replicate
, sourceLazy
, repeatM
, repeatWhileM
, replicateM
, sourceFile
, sourceFileBS
, sourceHandle
, sourceHandleUnsafe
, sourceIOHandle
, stdin
, withSourceFile
, sourceDirectory
, sourceDirectoryDeep
, drop
, dropE
, dropWhile
, dropWhileE
, fold
, foldE
, foldl
, foldl1
, foldlE
, foldMap
, foldMapE
, all
, allE
, any
, anyE
, and
, andE
, or
, orE
, asum
, elem
, elemE
, notElem
, notElemE
, sinkLazy
, sinkList
, sinkVector
, sinkVectorN
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, head
, headDef
, headE
, peek
, peekE
, last
, lastDef
, lastE
, length
, lengthE
, lengthIf
, lengthIfE
, maximum
, maximumE
, minimum
, minimumE
, null
, nullE
, sum
, sumE
, product
, productE
, find
, mapM_
, mapM_E
, foldM
, foldME
, foldMapM
, foldMapME
, sinkFile
, sinkFileCautious
, sinkTempFile
, sinkSystemTempFile
, sinkFileBS
, sinkHandle
, sinkIOHandle
, print
, stdout
, stderr
, withSinkFile
, withSinkFileBuilder
, withSinkFileCautious
, sinkHandleBuilder
, sinkHandleFlush
, map
, mapE
, omapE
, concatMap
, concatMapE
, take
, takeE
, takeWhile
, takeWhileE
, takeExactly
, takeExactlyE
, concat
, filter
, filterE
, mapWhile
, conduitVector
, scanl
, mapAccumWhile
, concatMapAccum
, intersperse
, slidingWindow
, chunksOfE
, chunksOfExactlyE
, mapM
, mapME
, omapME
, concatMapM
, filterM
, filterME
, iterM
, scanlM
, mapAccumWhileM
, concatMapAccumM
, encodeUtf8
, decodeUtf8
, decodeUtf8Lenient
, line
, lineAscii
, unlines
, unlinesAscii
, takeExactlyUntilE
, linesUnbounded
, linesUnboundedAscii
, splitOnUnboundedE
, builderToByteString
, unsafeBuilderToByteString
, builderToByteStringWith
, builderToByteStringFlush
, builderToByteStringWithFlush
, BufferAllocStrategy
, allNewBuffersStrategy
, reuseBufferStrategy
, vectorBuilder
, mapAccumS
, peekForever
, peekForeverE
) where
import Data.ByteString.Builder (Builder, toLazyByteString, hPutBuilder)
import qualified Data.ByteString.Builder.Internal as BB (flush)
import qualified Data.ByteString.Builder.Extra as BB (runBuilder, Next(Done, More, Chunk))
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Control.Applicative (Alternative(..), (<$>))
import Control.Exception (catch, throwIO, finally, bracket, try, evaluate)
import Control.Category (Category (..))
import Control.Monad (unless, when, (>=>), liftM, forever)
import Control.Monad.IO.Unlift (MonadIO (..), MonadUnliftIO, withRunInIO)
import Control.Monad.Primitive (PrimMonad, PrimState, unsafePrimToPrim)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, MonadThrow, allocate, throwM)
import Data.Conduit
import Data.Conduit.Internal (ConduitT (..), Pipe (..))
import qualified Data.Conduit.List as CL
import Data.IORef
import Data.Maybe (fromMaybe, isNothing, isJust)
import Data.Monoid (Monoid (..))
import Data.MonoTraversable
import qualified Data.Sequences as Seq
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import Data.Void (absurd)
import Prelude (Bool (..), Eq (..), Int,
Maybe (..), Either (..), Monad (..), Num (..),
Ord (..), fromIntegral, maybe, either,
($), Functor (..), Enum, seq, Show, Char,
otherwise, Either (..), not,
($!), succ, FilePath, IO, String)
import Data.Word (Word8)
import qualified Prelude
import qualified System.IO as IO
import System.IO.Error (isDoesNotExistError)
import System.IO.Unsafe (unsafePerformIO)
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TEE
import Data.Conduit.Combinators.Stream
import Data.Conduit.Internal.Fusion
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
writeMutVar)
import qualified Data.Streaming.FileRead as FR
import qualified Data.Streaming.Filesystem as F
import GHC.ForeignPtr (mallocPlainForeignPtrBytes, unsafeForeignPtrToPtr)
import Foreign.ForeignPtr (touchForeignPtr, ForeignPtr)
import Foreign.Ptr (Ptr, plusPtr, minusPtr)
import Data.ByteString.Internal (ByteString (PS), mallocByteString)
import System.FilePath ((</>), (<.>), takeDirectory, takeFileName)
import System.Directory (renameFile, getTemporaryDirectory, removeFile)
import qualified Data.Sequences as DTE
import Data.Sequences (LazySequence (..))
#include "fusion-macros.h"
yieldMany, yieldManyC :: (Monad m, MonoFoldable mono)
=> mono
-> ConduitT i (Element mono) m ()
yieldManyC = ofoldMap yield
{-# INLINE yieldManyC #-}
STREAMING(yieldMany, yieldManyC, yieldManyS, x)
unfold :: Monad m
=> (b -> Maybe (a, b))
-> b
-> ConduitT i a m ()
INLINE_RULE(unfold, f x, CL.unfold f x)
enumFromTo :: (Monad m, Enum a, Ord a) => a -> a -> ConduitT i a m ()
INLINE_RULE(enumFromTo, f t, CL.enumFromTo f t)
iterate :: Monad m => (a -> a) -> a -> ConduitT i a m ()
INLINE_RULE(iterate, f t, CL.iterate f t)
repeat :: Monad m => a -> ConduitT i a m ()
INLINE_RULE(repeat, x, iterate id x)
replicate :: Monad m
=> Int
-> a
-> ConduitT i a m ()
INLINE_RULE(replicate, n x, CL.replicate n x)
sourceLazy :: (Monad m, LazySequence lazy strict)
=> lazy
-> ConduitT i strict m ()
INLINE_RULE(sourceLazy, x, yieldMany (toChunks x))
repeatM, repeatMC :: Monad m
=> m a
-> ConduitT i a m ()
repeatMC m = forever $ lift m >>= yield
{-# INLINE repeatMC #-}
STREAMING(repeatM, repeatMC, repeatMS, m)
repeatWhileM, repeatWhileMC :: Monad m
=> m a
-> (a -> Bool)
-> ConduitT i a m ()
repeatWhileMC m f =
loop
where
loop = do
x <- lift m
when (f x) $ yield x >> loop
STREAMING(repeatWhileM, repeatWhileMC, repeatWhileMS, m f)
replicateM :: Monad m
=> Int
-> m a
-> ConduitT i a m ()
INLINE_RULE(replicateM, n m, CL.replicateM n m)
sourceFile :: MonadResource m
=> FilePath
-> ConduitT i S.ByteString m ()
sourceFile fp =
bracketP
(FR.openFile fp)
FR.closeFile
loop
where
loop h = do
bs <- liftIO $ FR.readChunk h
unless (S.null bs) $ do
yield bs
loop h
sourceHandle :: MonadIO m
=> IO.Handle
-> ConduitT i S.ByteString m ()
sourceHandle h =
loop
where
loop = do
bs <- liftIO (S.hGetSome h defaultChunkSize)
if S.null bs
then return ()
else yield bs >> loop
sourceHandleUnsafe :: MonadIO m => IO.Handle -> ConduitT i ByteString m ()
sourceHandleUnsafe handle = do
fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
let ptr = unsafeForeignPtrToPtr fptr
loop = do
count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
when (count > 0) $ do
yield (PS fptr 0 count)
loop
loop
liftIO $ touchForeignPtr fptr
sourceIOHandle :: MonadResource m
=> IO IO.Handle
-> ConduitT i S.ByteString m ()
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFileBS = sourceFile
{-# INLINE sourceFileBS #-}
stdin :: MonadIO m => ConduitT i ByteString m ()
INLINE_RULE0(stdin, sourceHandle IO.stdin)
sinkFile :: MonadResource m
=> FilePath
-> ConduitT S.ByteString o m ()
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
sinkFileCautious
:: MonadResource m
=> FilePath
-> ConduitM S.ByteString o m ()
sinkFileCautious fp =
bracketP (cautiousAcquire fp) cautiousCleanup inner
where
inner (tmpFP, h) = do
sinkHandle h
liftIO $ do
IO.hClose h
renameFile tmpFP fp
withSinkFileCautious
:: (MonadUnliftIO m, MonadIO n)
=> FilePath
-> (ConduitM S.ByteString o n () -> m a)
-> m a
withSinkFileCautious fp inner =
withRunInIO $ \run -> bracket
(cautiousAcquire fp)
cautiousCleanup
(\(tmpFP, h) -> do
a <- run $ inner $ sinkHandle h
IO.hClose h
renameFile tmpFP fp
return a)
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp = IO.openBinaryTempFile (takeDirectory fp) (takeFileName fp <.> "tmp")
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
cautiousCleanup (tmpFP, h) = do
IO.hClose h
removeFile tmpFP `Control.Exception.catch` \e ->
if isDoesNotExistError e
then return ()
else throwIO e
sinkTempFile :: MonadResource m
=> FilePath
-> String
-> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
(_releaseKey, (fp, h)) <- allocate
(IO.openBinaryTempFile tmpdir pattern)
(\(fp, h) -> IO.hClose h `finally` (removeFile fp `Control.Exception.catch` \e ->
if isDoesNotExistError e
then return ()
else throwIO e))
sinkHandle h
liftIO $ IO.hClose h
return fp
sinkSystemTempFile
:: MonadResource m
=> String
-> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern = do
dir <- liftIO getTemporaryDirectory
sinkTempFile dir pattern
sinkHandle :: MonadIO m
=> IO.Handle
-> ConduitT S.ByteString o m ()
sinkHandle h = awaitForever (liftIO . S.hPut h)
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM Builder o m ()
sinkHandleBuilder h = awaitForever (liftIO . hPutBuilder h)
sinkHandleFlush :: MonadIO m
=> IO.Handle
-> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h =
awaitForever $ \mbs -> liftIO $
case mbs of
Chunk bs -> S.hPut h bs
Flush -> IO.hFlush h
sinkIOHandle :: MonadResource m
=> IO IO.Handle
-> ConduitT S.ByteString o m ()
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle
withSourceFile
:: (MonadUnliftIO m, MonadIO n)
=> FilePath
-> (ConduitM i ByteString n () -> m a)
-> m a
withSourceFile fp inner =
withRunInIO $ \run ->
IO.withBinaryFile fp IO.ReadMode $
run . inner . sourceHandle
withSinkFile
:: (MonadUnliftIO m, MonadIO n)
=> FilePath
-> (ConduitM ByteString o n () -> m a)
-> m a
withSinkFile fp inner =
withRunInIO $ \run ->
IO.withBinaryFile fp IO.WriteMode $
run . inner . sinkHandle
withSinkFileBuilder
:: (MonadUnliftIO m, MonadIO n)
=> FilePath
-> (ConduitM Builder o n () -> m a)
-> m a
withSinkFileBuilder fp inner =
withRunInIO $ \run ->
IO.withBinaryFile fp IO.WriteMode $ \h ->
run $ inner $ CL.mapM_ (liftIO . hPutBuilder h)
sourceDirectory :: MonadResource m => FilePath -> ConduitT i FilePath m ()
sourceDirectory dir =
bracketP (F.openDirStream dir) F.closeDirStream go
where
go ds =
loop
where
loop = do
mfp <- liftIO $ F.readDirStream ds
case mfp of
Nothing -> return ()
Just fp -> do
yield $ dir </> fp
loop
sourceDirectoryDeep :: MonadResource m
=> Bool
-> FilePath
-> ConduitT i FilePath m ()
sourceDirectoryDeep followSymlinks =
start
where
start :: MonadResource m => FilePath -> ConduitT i FilePath m ()
start dir = sourceDirectory dir .| awaitForever go
go :: MonadResource m => FilePath -> ConduitT i FilePath m ()
go fp = do
ft <- liftIO $ F.getFileType fp
case ft of
F.FTFile -> yield fp
F.FTFileSym -> yield fp
F.FTDirectory -> start fp
F.FTDirectorySym
| followSymlinks -> start fp
| otherwise -> return ()
F.FTOther -> return ()
drop :: Monad m
=> Int
-> ConduitT a o m ()
INLINE_RULE(drop, n, CL.drop n)
dropE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> ConduitT seq o m ()
dropE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i sq = do
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i sq
i' = i - fromIntegral (olength x)
{-# INLINEABLE dropE #-}
dropWhile :: Monad m
=> (a -> Bool)
-> ConduitT a o m ()
dropWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x then loop else leftover x
{-# INLINE dropWhile #-}
dropWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> ConduitT seq o m ()
dropWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go sq =
if onull x then loop else leftover x
where
x = Seq.dropWhile f sq
{-# INLINE dropWhileE #-}
fold :: (Monad m, Monoid a)
=> ConduitT a o m a
INLINE_RULE0(fold, CL.foldMap id)
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> ConduitT mono o m (Element mono)
INLINE_RULE0(foldE, CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty)
foldl :: Monad m => (a -> b -> a) -> a -> ConduitT b o m a
INLINE_RULE(foldl, f x, CL.fold f x)
foldlE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a)
-> a
-> ConduitT mono o m a
INLINE_RULE(foldlE, f x, CL.fold (ofoldlPrime f) x)
ofoldlPrime :: MonoFoldable mono => (a -> Element mono -> a) -> a -> mono -> a
ofoldlPrime = ofoldl'
foldMap :: (Monad m, Monoid b)
=> (a -> b)
-> ConduitT a o m b
INLINE_RULE(foldMap, f, CL.foldMap f)
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> ConduitT mono o m w
INLINE_RULE(foldMapE, f, CL.foldMap (ofoldMap f))
foldl1, foldl1C :: Monad m => (a -> a -> a) -> ConduitT a o m (Maybe a)
foldl1C f =
await >>= maybe (return Nothing) loop
where
loop !prev = await >>= maybe (return $ Just prev) (loop . f prev)
STREAMING(foldl1, foldl1C, foldl1S, f)
foldl1E :: (Monad m, MonoFoldable mono, a ~ Element mono)
=> (a -> a -> a)
-> ConduitT mono o m (Maybe a)
INLINE_RULE(foldl1E, f, foldl (foldMaybeNull f) Nothing)
foldMaybeNull :: (MonoFoldable mono, e ~ Element mono)
=> (e -> e -> e)
-> Maybe e
-> mono
-> Maybe e
foldMaybeNull f macc mono =
case (macc, NonNull.fromNullable mono) of
(Just acc, Just nn) -> Just $ ofoldl' f acc nn
(Nothing, Just nn) -> Just $ NonNull.ofoldl1' f nn
_ -> macc
{-# INLINE foldMaybeNull #-}
all, allC :: Monad m
=> (a -> Bool)
-> ConduitT a o m Bool
allC f = fmap isNothing $ find (Prelude.not . f)
{-# INLINE allC #-}
STREAMING(all, allC, allS, f)
allE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> ConduitT mono o m Bool
INLINE_RULE(allE, f, all (oall f))
any, anyC :: Monad m
=> (a -> Bool)
-> ConduitT a o m Bool
anyC = fmap isJust . find
{-# INLINE anyC #-}
STREAMING(any, anyC, anyS, f)
anyE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> ConduitT mono o m Bool
INLINE_RULE(anyE, f, any (oany f))
and :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(and, all id)
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> ConduitT mono o m Bool
INLINE_RULE0(andE, allE id)
or :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(or, any id)
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> ConduitT mono o m Bool
INLINE_RULE0(orE, anyE id)
asum :: (Monad m, Alternative f)
=> ConduitT (f a) o m (f a)
INLINE_RULE0(asum, foldl (<|>) empty)
elem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(elem, x, any (== x))
elemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
=> Element seq
-> ConduitT seq o m Bool
INLINE_RULE(elemE, f, any (oelem f))
notElem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(notElem, x, all (/= x))
notElemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
=> Element seq
-> ConduitT seq o m Bool
INLINE_RULE(notElemE, x, all (onotElem x))
sinkLazy, sinkLazyC :: (Monad m, LazySequence lazy strict)
=> ConduitT strict o m lazy
sinkLazyC = (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
{-# INLINE sinkLazyC #-}
STREAMING0(sinkLazy, sinkLazyC, sinkLazyS)
sinkList :: Monad m => ConduitT a o m [a]
INLINE_RULE0(sinkList, CL.consume)
sinkVector, sinkVectorC :: (V.Vector v a, PrimMonad m)
=> ConduitT a o m (v a)
sinkVectorC = do
let initSize = 10
mv0 <- VM.new initSize
let go maxSize i mv | i >= maxSize = do
let newMax = maxSize * 2
mv' <- VM.grow mv maxSize
go newMax i mv'
go maxSize i mv = do
mx <- await
case mx of
Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
Just x -> do
VM.write mv i x
go maxSize (i + 1) mv
go initSize 0 mv0
{-# INLINEABLE sinkVectorC #-}
STREAMING0(sinkVector, sinkVectorC, sinkVectorS)
sinkVectorN, sinkVectorNC :: (V.Vector v a, PrimMonad m)
=> Int
-> ConduitT a o m (v a)
sinkVectorNC maxSize = do
mv <- VM.new maxSize
let go i | i >= maxSize = V.unsafeFreeze mv
go i = do
mx <- await
case mx of
Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
Just x -> do
VM.write mv i x
go (i + 1)
go 0
{-# INLINEABLE sinkVectorNC #-}
STREAMING(sinkVectorN, sinkVectorNC, sinkVectorNS, maxSize)
sinkLazyBuilder, sinkLazyBuilderC :: Monad m => ConduitT Builder o m BL.ByteString
sinkLazyBuilderC = fmap toLazyByteString fold
{-# INLINE sinkLazyBuilderC #-}
STREAMING0(sinkLazyBuilder, sinkLazyBuilderC, sinkLazyBuilderS)
sinkNull :: Monad m => ConduitT a o m ()
INLINE_RULE0(sinkNull, CL.sinkNull)
awaitNonNull :: (Monad m, MonoFoldable a) => ConduitT a o m (Maybe (NonNull.NonNull a))
awaitNonNull =
go
where
go = await >>= maybe (return Nothing) go'
go' = maybe go (return . Just) . NonNull.fromNullable
{-# INLINE awaitNonNull #-}
head :: Monad m => ConduitT a o m (Maybe a)
head = CL.head
headDef :: Monad m => a -> ConduitT a o m a
headDef a = fromMaybe a <$> head
headE :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
headE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case Seq.uncons x of
Nothing -> loop
Just (y, z) -> do
unless (onull z) $ leftover z
return $ Just y
{-# INLINE headE #-}
peek :: Monad m => ConduitT a o m (Maybe a)
peek = CL.peek
{-# INLINE peek #-}
peekE :: (Monad m, MonoFoldable mono) => ConduitT mono o m (Maybe (Element mono))
peekE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case headMay x of
Nothing -> loop
Just y -> do
leftover x
return $ Just y
{-# INLINE peekE #-}
last, lastC :: Monad m => ConduitT a o m (Maybe a)
lastC =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) loop
STREAMING0(last, lastC, lastS)
lastDef :: Monad m => a -> ConduitT a o m a
lastDef a = fromMaybe a <$> last
lastE, lastEC :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
lastEC =
awaitNonNull >>= maybe (return Nothing) (loop . NonNull.last)
where
loop prev = awaitNonNull >>= maybe (return $ Just prev) (loop . NonNull.last)
STREAMING0(lastE, lastEC, lastES)
length :: (Monad m, Num len) => ConduitT a o m len
INLINE_RULE0(length, foldl (\x _ -> x + 1) 0)
lengthE :: (Monad m, Num len, MonoFoldable mono) => ConduitT mono o m len
INLINE_RULE0(lengthE, foldl (\x y -> x + fromIntegral (olength y)) 0)
lengthIf :: (Monad m, Num len) => (a -> Bool) -> ConduitT a o m len
INLINE_RULE(lengthIf, f, foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0)
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
=> (Element mono -> Bool) -> ConduitT mono o m len
INLINE_RULE(lengthIfE, f, foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0)
maximum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(maximum, foldl1 max)
maximumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(maximumE, foldl1E max)
minimum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(minimum, foldl1 min)
minimumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(minimumE, foldl1E min)
null :: Monad m => ConduitT a o m Bool
null = (maybe True (\_ -> False)) `fmap` peek
{-# INLINE null #-}
nullE :: (Monad m, MonoFoldable mono)
=> ConduitT mono o m Bool
nullE =
go
where
go = await >>= maybe (return True) go'
go' x = if onull x then go else leftover x >> return False
{-# INLINE nullE #-}
sum :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(sum, foldl (+) 0)
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(sumE, foldlE (+) 0)
product :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(product, foldl (*) 1)
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(productE, foldlE (*) 1)
find, findC :: Monad m => (a -> Bool) -> ConduitT a o m (Maybe a)
findC f =
loop
where
loop = await >>= maybe (return Nothing) go
go x = if f x then return (Just x) else loop
{-# INLINE findC #-}
STREAMING(find, findC, findS, f)
mapM_ :: Monad m => (a -> m ()) -> ConduitT a o m ()
INLINE_RULE(mapM_, f, CL.mapM_ f)
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> ConduitT mono o m ()
INLINE_RULE(mapM_E, f, CL.mapM_ (omapM_ f))
foldM :: Monad m => (a -> b -> m a) -> a -> ConduitT b o m a
INLINE_RULE(foldM, f x, CL.foldM f x)
foldME :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a)
-> a
-> ConduitT mono o m a
INLINE_RULE(foldME, f x, foldM (ofoldlM f) x)
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> ConduitT a o m w
INLINE_RULE(foldMapM, f, CL.foldMapM f)
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w)
-> ConduitT mono o m w
INLINE_RULE(foldMapME, f, CL.foldM (ofoldlM (\accum e -> mappend accum `liftM` f e)) mempty)
sinkFileBS :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFileBS = sinkFile
{-# INLINE sinkFileBS #-}
print :: (Show a, MonadIO m) => ConduitT a o m ()
INLINE_RULE0(print, mapM_ (liftIO . Prelude.print))
stdout :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stdout, sinkHandle IO.stdout)
stderr :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stderr, sinkHandle IO.stderr)
map :: Monad m => (a -> b) -> ConduitT a b m ()
INLINE_RULE(map, f, CL.map f)
mapE :: (Monad m, Functor f) => (a -> b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapE, f, CL.map (fmap f))
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> ConduitT mono mono m ()
INLINE_RULE(omapE, f, CL.map (omap f))
concatMap, concatMapC :: (Monad m, MonoFoldable mono)
=> (a -> mono)
-> ConduitT a (Element mono) m ()
concatMapC f = awaitForever (yieldMany . f)
{-# INLINE concatMapC #-}
STREAMING(concatMap, concatMapC, concatMapS, f)
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> ConduitT mono w m ()
INLINE_RULE(concatMapE, f, CL.map (ofoldMap f))
take :: Monad m => Int -> ConduitT a a m ()
INLINE_RULE(take, n, CL.isolate n)
takeE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> ConduitT seq seq m ()
takeE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i sq = do
unless (onull x) $ yield x
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i sq
i' = i - fromIntegral (olength x)
{-# INLINEABLE takeE #-}
takeWhile :: Monad m
=> (a -> Bool)
-> ConduitT a a m ()
takeWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x
then yield x >> loop
else leftover x
{-# INLINE takeWhile #-}
takeWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> ConduitT seq seq m ()
takeWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go sq = do
unless (onull x) $ yield x
if onull y
then loop
else leftover y
where
(x, y) = Seq.span f sq
{-# INLINE takeWhileE #-}
takeExactly :: Monad m
=> Int
-> ConduitT a b m r
-> ConduitT a b m r
takeExactly count inner = take count .| do
r <- inner
CL.sinkNull
return r
takeExactlyE :: (Monad m, Seq.IsSequence a)
=> Seq.Index a
-> ConduitT a b m r
-> ConduitT a b m r
takeExactlyE count inner = takeE count .| do
r <- inner
CL.sinkNull
return r
{-# INLINE takeExactlyE #-}
concat, concatC :: (Monad m, MonoFoldable mono)
=> ConduitT mono (Element mono) m ()
concatC = awaitForever yieldMany
STREAMING0(concat, concatC, concatS)
filter :: Monad m => (a -> Bool) -> ConduitT a a m ()
INLINE_RULE(filter, f, CL.filter f)
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterE, f, CL.map (Seq.filter f))
mapWhile :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x =
case f x of
Just y -> yield y >> loop
Nothing -> leftover x
{-# INLINE mapWhile #-}
conduitVector :: (V.Vector v a, PrimMonad m)
=> Int
-> ConduitT a (v a) m ()
conduitVector size =
loop
where
loop = do
v <- sinkVectorN size
unless (V.null v) $ do
yield v
loop
{-# INLINE conduitVector #-}
scanl, scanlC :: Monad m => (a -> b -> a) -> a -> ConduitT b a m ()
scanlC f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
let seed' = f seed b
seed' `seq` yield seed
loop seed'
STREAMING(scanl, scanlC, scanlS, f x)
mapAccumWhile, mapAccumWhileC :: Monad m => (a -> s -> Either s (s, b)) -> s -> ConduitT a b m s
mapAccumWhileC f =
loop
where
loop !s = await >>= maybe (return s) go
where
go a = either (return $!) (\(s', b) -> yield b >> loop s') $ f a s
{-# INLINE mapAccumWhileC #-}
STREAMING(mapAccumWhile, mapAccumWhileC, mapAccumWhileS, f s)
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE0(concatMapAccum, CL.concatMapAccum)
intersperse, intersperseC :: Monad m => a -> ConduitT a a m ()
intersperseC x =
await >>= omapM_ go
where
go y = yield y >> concatMap (\z -> [x, z])
STREAMING(intersperse, intersperseC, intersperseS, x)
slidingWindow, slidingWindowC :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> ConduitT a seq m ()
slidingWindowC sz = go (max 1 sz) mempty
where goContinue st = await >>=
maybe (return ())
(\x -> do
let st' = Seq.snoc st x
yield st' >> goContinue (Seq.unsafeTail st')
)
go 0 st = yield st >> goContinue (Seq.unsafeTail st)
go !n st = CL.head >>= \m ->
case m of
Nothing -> yield st
Just x -> go (n-1) (Seq.snoc st x)
STREAMING(slidingWindow, slidingWindowC, slidingWindowS, sz)
chunksOfE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfE chunkSize = chunksOfExactlyE chunkSize >> (await >>= maybe (return ()) yield)
chunksOfExactlyE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfExactlyE chunkSize = await >>= maybe (return ()) start
where
start b
| onull b = chunksOfE chunkSize
| Seq.lengthIndex b < chunkSize = continue (Seq.lengthIndex b) [b]
| otherwise = let (first,rest) = Seq.splitAt chunkSize b in
yield first >> start rest
continue !sofar bs = do
next <- await
case next of
Nothing -> leftover (mconcat $ Prelude.reverse bs)
Just next' ->
let !sofar' = Seq.lengthIndex next' + sofar
bs' = next':bs
in if sofar' < chunkSize
then continue sofar' bs'
else start (mconcat (Prelude.reverse bs'))
mapM :: Monad m => (a -> m b) -> ConduitT a b m ()
INLINE_RULE(mapM, f, CL.mapM f)
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapME, f, CL.mapM (Data.Traversable.mapM f))
omapME :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono))
-> ConduitT mono mono m ()
INLINE_RULE(omapME, f, CL.mapM (omapM f))
concatMapM, concatMapMC :: (Monad m, MonoFoldable mono)
=> (a -> m mono)
-> ConduitT a (Element mono) m ()
concatMapMC f = awaitForever (lift . f >=> yieldMany)
STREAMING(concatMapM, concatMapMC, concatMapMS, f)
filterM, filterMC :: Monad m
=> (a -> m Bool)
-> ConduitT a a m ()
filterMC f =
awaitForever go
where
go x = do
b <- lift $ f x
when b $ yield x
STREAMING(filterM, filterMC, filterMS, f)
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterME, f, CL.mapM (Seq.filterM f))
iterM :: Monad m => (a -> m ()) -> ConduitT a a m ()
INLINE_RULE(iterM, f, CL.iterM f)
scanlM, scanlMC :: Monad m => (a -> b -> m a) -> a -> ConduitT b a m ()
scanlMC f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
seed' <- lift $ f seed b
seed' `seq` yield seed
loop seed'
STREAMING(scanlM, scanlMC, scanlMS, f x)
mapAccumWhileM, mapAccumWhileMC :: Monad m => (a -> s -> m (Either s (s, b))) -> s -> ConduitT a b m s
mapAccumWhileMC f =
loop
where
loop !s = await >>= maybe (return s) go
where
go a = lift (f a s) >>= either (return $!) (\(s', b) -> yield b >> loop s')
{-# INLINE mapAccumWhileMC #-}
STREAMING(mapAccumWhileM, mapAccumWhileMC, mapAccumWhileMS, f s)
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE(concatMapAccumM, f x, CL.concatMapAccumM f x)
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => ConduitT text binary m ()
INLINE_RULE0(encodeUtf8, map DTE.encodeUtf8)
decodeUtf8 :: MonadThrow m => ConduitT ByteString Text m ()
decodeUtf8 =
loop TE.streamDecodeUtf8
where
loop parse =
await >>= maybe done go
where
parse' = unsafePerformIO . try . evaluate . parse
done =
case parse' mempty of
Left e -> throwM (e :: TEE.UnicodeException)
Right (TE.Some t bs _) -> do
unless (T.null t) (yield t)
unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))
go bs = do
case parse' bs of
Left e -> do
leftover bs
throwM (e :: TEE.UnicodeException)
Right (TE.Some t _ next) -> do
unless (T.null t) (yield t)
loop next
decodeUtf8Lenient :: Monad m => ConduitT ByteString Text m ()
decodeUtf8Lenient =
loop (TE.streamDecodeUtf8With TEE.lenientDecode)
where
loop parse =
await >>= maybe done go
where
done = do
let TE.Some t bs _ = parse mempty
unless (T.null t) (yield t)
unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))
go bs = do
let TE.Some t _ next = parse bs
unless (T.null t) (yield t)
loop next
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitT seq o m r
-> ConduitT seq o m r
line = takeExactlyUntilE (== '\n')
{-# INLINE line #-}
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitT seq o m r
-> ConduitT seq o m r
lineAscii = takeExactlyUntilE (== 10)
{-# INLINE lineAscii #-}
takeExactlyUntilE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> ConduitT seq o m r
-> ConduitT seq o m r
takeExactlyUntilE f inner =
loop .| do
x <- inner
sinkNull
return x
where
loop = await >>= omapM_ go
go t =
if onull y
then yield x >> loop
else do
unless (onull x) $ yield x
let y' = Seq.drop 1 y
unless (onull y') $ leftover y'
where
(x, y) = Seq.break f t
{-# INLINE takeExactlyUntilE #-}
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => ConduitT seq seq m ()
INLINE_RULE0(unlines, concatMap (:[Seq.singleton '\n']))
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => ConduitT seq seq m ()
INLINE_RULE0(unlinesAscii, concatMap (:[Seq.singleton 10]))
splitOnUnboundedE, splitOnUnboundedEC :: (Monad m, Seq.IsSequence seq) => (Element seq -> Bool) -> ConduitT seq seq m ()
splitOnUnboundedEC f =
start
where
start = await >>= maybe (return ()) (loop id)
loop bldr t =
if onull y
then do
mt <- await
case mt of
Nothing -> let finalChunk = mconcat $ bldr [t]
in unless (onull finalChunk) $ yield finalChunk
Just t' -> loop (bldr . (t:)) t'
else yield (mconcat $ bldr [x]) >> loop id (Seq.drop 1 y)
where
(x, y) = Seq.break f t
STREAMING(splitOnUnboundedE, splitOnUnboundedEC, splitOnUnboundedES, f)
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitT seq seq m ()
INLINE_RULE0(linesUnbounded, splitOnUnboundedE (== '\n'))
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitT seq seq m ()
INLINE_RULE0(linesUnboundedAscii, splitOnUnboundedE (== 10))
builderToByteString :: PrimMonad m => ConduitT Builder S.ByteString m ()
builderToByteString = builderToByteStringWith defaultStrategy
{-# INLINE builderToByteString #-}
builderToByteStringFlush :: PrimMonad m
=> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringFlush = builderToByteStringWithFlush defaultStrategy
{-# INLINE builderToByteStringFlush #-}
unsafeBuilderToByteString :: PrimMonad m
=> ConduitT Builder S.ByteString m ()
unsafeBuilderToByteString =
builderToByteStringWith (reuseBufferStrategy (allocBuffer defaultChunkSize))
{-# INLINE unsafeBuilderToByteString #-}
builderToByteStringWith :: PrimMonad m
=> BufferAllocStrategy
-> ConduitT Builder S.ByteString m ()
builderToByteStringWith =
bbhelper (liftM (fmap Chunk) await) yield'
where
yield' Flush = return ()
yield' (Chunk bs) = yield bs
{-# INLINE builderToByteStringWith #-}
builderToByteStringWithFlush
:: PrimMonad m
=> BufferAllocStrategy
-> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringWithFlush = bbhelper await yield
{-# INLINE builderToByteStringWithFlush #-}
bbhelper
:: PrimMonad m
=> m (Maybe (Flush Builder))
-> (Flush S.ByteString -> m ())
-> BufferAllocStrategy
-> m ()
bbhelper await' yield' strat = do
(recv, finish) <- unsafePrimToPrim $ newByteStringBuilderRecv strat
let loop = await' >>= maybe finish' cont
finish' = do
mbs <- unsafePrimToPrim finish
maybe (return ()) (yield' . Chunk) mbs
cont fbuilder = do
let builder =
case fbuilder of
Flush -> BB.flush
Chunk b -> b
popper <- unsafePrimToPrim $ recv builder
let cont' = do
bs <- unsafePrimToPrim popper
unless (S.null bs) $ do
yield' (Chunk bs)
cont'
cont'
case fbuilder of
Flush -> yield' Flush
Chunk _ -> return ()
loop
loop
{-# INLINE bbhelper #-}
type BuilderPopper = IO S.ByteString
type BuilderRecv = Builder -> IO BuilderPopper
type BuilderFinish = IO (Maybe S.ByteString)
newByteStringBuilderRecv :: BufferAllocStrategy -> IO (BuilderRecv, BuilderFinish)
newByteStringBuilderRecv (ioBufInit, nextBuf) = do
refBuf <- newIORef ioBufInit
return (push refBuf, finish refBuf)
where
finish refBuf = do
ioBuf <- readIORef refBuf
buf <- ioBuf
return $ unsafeFreezeNonEmptyBuffer buf
push refBuf builder = do
refWri <- newIORef $ Left $ BB.runBuilder builder
return $ popper refBuf refWri
popper refBuf refWri = do
ioBuf <- readIORef refBuf
ebWri <- readIORef refWri
case ebWri of
Left bWri -> do
!buf@(Buffer _ _ op ope) <- ioBuf
(bytes, next) <- bWri op (ope `minusPtr` op)
let op' = op `plusPtr` bytes
case next of
BB.Done -> do
writeIORef refBuf $ return $ updateEndOfSlice buf op'
return S.empty
BB.More minSize bWri' -> do
let buf' = updateEndOfSlice buf op'
{-# INLINE cont #-}
cont mbs = do
ioBuf' <- nextBuf minSize buf'
writeIORef refBuf ioBuf'
writeIORef refWri $ Left bWri'
case mbs of
Just bs | not $ S.null bs -> return bs
_ -> popper refBuf refWri
cont $ unsafeFreezeNonEmptyBuffer buf'
BB.Chunk bs bWri' -> do
let buf' = updateEndOfSlice buf op'
let yieldBS = do
nextBuf 1 buf' >>= writeIORef refBuf
writeIORef refWri $ Left bWri'
if S.null bs
then popper refBuf refWri
else return bs
case unsafeFreezeNonEmptyBuffer buf' of
Nothing -> yieldBS
Just bs' -> do
writeIORef refWri $ Right yieldBS
return bs'
Right action -> action
data Buffer = Buffer {-# UNPACK #-} !(ForeignPtr Word8)
{-# UNPACK #-} !(Ptr Word8)
{-# UNPACK #-} !(Ptr Word8)
{-# UNPACK #-} !(Ptr Word8)
{-# INLINE unsafeFreezeBuffer #-}
unsafeFreezeBuffer :: Buffer -> S.ByteString
unsafeFreezeBuffer (Buffer fpbuf p0 op _) =
PS fpbuf (p0 `minusPtr` unsafeForeignPtrToPtr fpbuf) (op `minusPtr` p0)
{-# INLINE unsafeFreezeNonEmptyBuffer #-}
unsafeFreezeNonEmptyBuffer :: Buffer -> Maybe S.ByteString
unsafeFreezeNonEmptyBuffer buf
| sliceSize buf <= 0 = Nothing
| otherwise = Just $ unsafeFreezeBuffer buf
{-# INLINE updateEndOfSlice #-}
updateEndOfSlice :: Buffer
-> Ptr Word8
-> Buffer
updateEndOfSlice (Buffer fpbuf p0 _ ope) op' = Buffer fpbuf p0 op' ope
sliceSize :: Buffer -> Int
sliceSize (Buffer _ p0 op _) = op `minusPtr` p0
type BufferAllocStrategy = (IO Buffer, Int -> Buffer -> IO (IO Buffer))
defaultStrategy :: BufferAllocStrategy
defaultStrategy = allNewBuffersStrategy defaultChunkSize
allNewBuffersStrategy :: Int
-> BufferAllocStrategy
allNewBuffersStrategy bufSize =
( allocBuffer bufSize
, \reqSize _ -> return (allocBuffer (max reqSize bufSize)) )
reuseBufferStrategy :: IO Buffer
-> BufferAllocStrategy
reuseBufferStrategy buf0 =
(buf0, tryReuseBuffer)
where
tryReuseBuffer reqSize buf
| bufferSize buf >= reqSize = return $ return (reuseBuffer buf)
| otherwise = return $ allocBuffer reqSize
bufferSize :: Buffer -> Int
bufferSize (Buffer fpbuf _ _ ope) =
ope `minusPtr` unsafeForeignPtrToPtr fpbuf
{-# INLINE allocBuffer #-}
allocBuffer :: Int -> IO Buffer
allocBuffer size = do
fpbuf <- mallocByteString size
let !pbuf = unsafeForeignPtrToPtr fpbuf
return $! Buffer fpbuf pbuf pbuf (pbuf `plusPtr` size)
{-# INLINE reuseBuffer #-}
reuseBuffer :: Buffer -> Buffer
reuseBuffer (Buffer fpbuf _ _ ope) = Buffer fpbuf p0 p0 ope
where
p0 = unsafeForeignPtrToPtr fpbuf
vectorBuilder :: (PrimMonad m, PrimMonad n, V.Vector v e, PrimState m ~ PrimState n)
=> Int
-> ((e -> n ()) -> ConduitT i Void m r)
-> ConduitT i (v e) m r
vectorBuilder size inner = do
ref <- do
mv <- VM.new size
newMutVar $! S 0 mv id
res <- onAwait (yieldS ref) (inner (addE ref))
vs <- do
S idx mv front <- readMutVar ref
end <-
if idx == 0
then return []
else do
v <- V.unsafeFreeze mv
return [V.unsafeTake idx v]
return $ front end
Prelude.mapM_ yield vs
return res
{-# INLINE vectorBuilder #-}
data S s v e = S
{-# UNPACK #-} !Int
!(V.Mutable v s e)
([v e] -> [v e])
onAwait :: Monad m
=> ConduitT i o m ()
-> ConduitT i Void m r
-> ConduitT i o m r
onAwait (ConduitT callback) (ConduitT sink0) = ConduitT $ \rest -> let
go (Done r) = rest r
go (HaveOutput _ o) = absurd o
go (NeedInput f g) = callback $ \() -> NeedInput (go . f) (go . g)
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover f i) = Leftover (go f) i
in go (sink0 Done)
{-# INLINE onAwait #-}
yieldS :: PrimMonad m
=> MutVar (PrimState m) (S (PrimState m) v e)
-> ConduitT i (v e) m ()
yieldS ref = do
S idx mv front <- readMutVar ref
Prelude.mapM_ yield (front [])
writeMutVar ref $! S idx mv id
{-# INLINE yieldS #-}
addE :: (PrimMonad m, V.Vector v e)
=> MutVar (PrimState m) (S (PrimState m) v e)
-> e
-> m ()
addE ref e = do
S idx mv front <- readMutVar ref
VM.write mv idx e
let idx' = succ idx
size = VM.length mv
if idx' >= size
then do
v <- V.unsafeFreeze mv
let front' = front . (v:)
mv' <- VM.new size
writeMutVar ref $! S 0 mv' front'
else writeMutVar ref $! S idx' mv front
{-# INLINE addE #-}
mapAccumS
:: Monad m
=> (a -> s -> ConduitT b Void m s)
-> s
-> ConduitT () b m ()
-> ConduitT a Void m s
mapAccumS f s xs = do
(_, u) <- loop (sealConduitT xs, s)
return u
where loop r@(ys, !t) = await >>= maybe (return r) go
where go a = lift (ys $$++ f a t) >>= loop
{-# INLINE mapAccumS #-}
peekForever :: Monad m => ConduitT i o m () -> ConduitT i o m ()
peekForever inner =
loop
where
loop = do
mx <- peek
case mx of
Nothing -> return ()
Just _ -> inner >> loop
peekForeverE :: (Monad m, MonoFoldable i)
=> ConduitT i o m ()
-> ConduitT i o m ()
peekForeverE inner =
loop
where
loop = do
mx <- peekE
case mx of
Nothing -> return ()
Just _ -> inner >> loop