module Data.Conduit.Internal.Conduit
(
ConduitT (..)
, ConduitM
, Source
, Producer
, Sink
, Consumer
, Conduit
, Flush (..)
, ZipSource (..)
, ZipSink (..)
, ZipConduit (..)
, SealedConduitT (..)
, sealConduitT
, unsealConduitT
, await
, awaitForever
, yield
, yieldM
, leftover
, runConduit
, fuse
, connect
, connectResume
, connectResumeConduit
, fuseLeftovers
, fuseReturnLeftovers
, ($$+)
, ($$++)
, ($$+-)
, ($=+)
, (=$$+)
, (=$$++)
, (=$$+-)
, ($$)
, ($=)
, (=$)
, (=$=)
, (.|)
, sourceToPipe
, sinkToPipe
, conduitToPipe
, toProducer
, toConsumer
, bracketP
, catchC
, handleC
, tryC
, Data.Conduit.Internal.Conduit.transPipe
, Data.Conduit.Internal.Conduit.mapOutput
, Data.Conduit.Internal.Conduit.mapOutputMaybe
, Data.Conduit.Internal.Conduit.mapInput
, zipSinks
, zipSources
, zipSourcesApp
, zipConduitApp
, mergeSource
, passthroughSink
, sourceToList
, fuseBoth
, fuseBothMaybe
, fuseUpstream
, sequenceSources
, sequenceSinks
, sequenceConduits
) where
import Control.Applicative (Applicative (..))
import Control.Exception (Exception)
import qualified Control.Exception as E (catch)
import Control.Monad (liftM, liftM2, ap)
import Control.Monad.Error.Class(MonadError(..))
import Control.Monad.Reader.Class(MonadReader(..))
import Control.Monad.RWS.Class(MonadRWS())
import Control.Monad.Writer.Class(MonadWriter(..), censor)
import Control.Monad.State.Class(MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Data.Semigroup (Semigroup ((<>)))
import Control.Monad.Trans.Resource
import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, await, awaitForever, bracketP)
import qualified Data.Conduit.Internal.Pipe as CI
import Control.Monad (forever)
import Data.Traversable (Traversable (..))
newtype ConduitT i o m r = ConduitT
{ unConduitT :: forall b.
(r -> Pipe i i o () m b) -> Pipe i i o () m b
}
newtype SealedConduitT i o m r = SealedConduitT (Pipe i i o () m r)
type ConduitM = ConduitT
instance Functor (ConduitT i o m) where
fmap f (ConduitT c) = ConduitT $ \rest -> c (rest . f)
instance Applicative (ConduitT i o m) where
pure x = ConduitT ($ x)
(<*>) = ap
instance Monad (ConduitT i o m) where
return = pure
ConduitT f >>= g = ConduitT $ \h -> f $ \a -> unConduitT (g a) h
instance MonadThrow m => MonadThrow (ConduitT i o m) where
throwM = lift . throwM
instance MonadIO m => MonadIO (ConduitT i o m) where
liftIO = lift . liftIO
instance MonadReader r m => MonadReader r (ConduitT i o m) where
ask = lift ask
local f (ConduitT c0) = ConduitT $ \rest ->
let go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
go (Done x) = rest x
go (PipeM mp) = PipeM (liftM go $ local f mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
#ifndef MIN_VERSION_mtl
#define MIN_VERSION_mtl(x, y, z) 0
#endif
instance MonadWriter w m => MonadWriter w (ConduitT i o m) where
#if MIN_VERSION_mtl(2, 1, 0)
writer = lift . writer
#endif
tell = lift . tell
listen (ConduitT c0) = ConduitT $ \rest ->
let go front (HaveOutput p o) = HaveOutput (go front p) o
go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u))
go front (Done x) = rest (x, front)
go front (PipeM mp) = PipeM $ do
(p,w) <- listen mp
return $ go (front `mappend` w) p
go front (Leftover p i) = Leftover (go front p) i
in go mempty (c0 Done)
pass (ConduitT c0) = ConduitT $ \rest ->
let go front (HaveOutput p o) = HaveOutput (go front p) o
go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u))
go front (PipeM mp) = PipeM $ do
(p,w) <- censor (const mempty) (listen mp)
return $ go (front `mappend` w) p
go front (Done (x,f)) = PipeM $ do
tell (f front)
return $ rest x
go front (Leftover p i) = Leftover (go front p) i
in go mempty (c0 Done)
instance MonadState s m => MonadState s (ConduitT i o m) where
get = lift get
put = lift . put
#if MIN_VERSION_mtl(2, 1, 0)
state = lift . state
#endif
instance MonadRWS r w s m => MonadRWS r w s (ConduitT i o m)
instance MonadError e m => MonadError e (ConduitT i o m) where
throwError = lift . throwError
catchError (ConduitT c0) f = ConduitT $ \rest ->
let go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u))
go (Done x) = rest x
go (PipeM mp) =
PipeM $ catchError (liftM go mp) $ \e -> do
return $ unConduitT (f e) rest
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
instance MonadTrans (ConduitT i o) where
lift mr = ConduitT $ \rest -> PipeM (liftM rest mr)
instance MonadResource m => MonadResource (ConduitT i o m) where
liftResourceT = lift . liftResourceT
instance Monad m => Semigroup (ConduitT i o m ()) where
(<>) = (>>)
instance Monad m => Monoid (ConduitT i o m ()) where
mempty = return ()
#if !(MIN_VERSION_base(4,11,0))
mappend = (<>)
#endif
instance PrimMonad m => PrimMonad (ConduitT i o m) where
type PrimState (ConduitT i o m) = PrimState m
primitive = lift . primitive
type Source m o = ConduitT () o m ()
type Producer m o = forall i. ConduitT i o m ()
type Sink i = ConduitT i Void
type Consumer i m r = forall o. ConduitT i o m r
type Conduit i m o = ConduitT i o m ()
sealConduitT :: ConduitT i o m r -> SealedConduitT i o m r
sealConduitT (ConduitT f) = SealedConduitT (f Done)
unsealConduitT :: Monad m => SealedConduitT i o m r -> ConduitT i o m r
unsealConduitT (SealedConduitT f) = ConduitT (f >>=)
connectResume :: Monad m
=> SealedConduitT () a m ()
-> ConduitT a Void m r
-> m (SealedConduitT () a m (), r)
connectResume (SealedConduitT left0) (ConduitT right0) =
goRight left0 (right0 Done)
where
goRight left right =
case right of
HaveOutput _ o -> absurd o
NeedInput rp rc -> goLeft rp rc left
Done r2 -> return (SealedConduitT left, r2)
PipeM mp -> mp >>= goRight left
Leftover p i -> goRight (HaveOutput left i) p
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight left' (rp o)
NeedInput _ lc -> recurse (lc ())
Done () -> goRight (Done ()) (rc ())
PipeM mp -> mp >>= recurse
Leftover p () -> recurse p
where
recurse = goLeft rp rc
sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
sourceToPipe =
go . flip unConduitT Done
where
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput _ c) = go $ c ()
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p ()) = go p
sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
sinkToPipe =
go . injectLeftovers . flip unConduitT Done
where
go (HaveOutput _ o) = absurd o
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover _ l) = absurd l
conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
conduitToPipe =
go . injectLeftovers . flip unConduitT Done
where
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover _ l) = absurd l
toProducer :: Monad m => Source m a -> Producer m a
toProducer (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput _ c) = go (c ())
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p ()) = go p
in go (c0 Done)
toConsumer :: Monad m => Sink a m b -> Consumer a m b
toConsumer (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput _ o) = absurd o
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p l) = Leftover (go p) l
in go (c0 Done)
catchC :: (MonadUnliftIO m, Exception e)
=> ConduitT i o m r
-> (e -> ConduitT i o m r)
-> ConduitT i o m r
catchC (ConduitT p0) onErr = ConduitT $ \rest -> let
go (Done r) = rest r
go (PipeM mp) = PipeM $ withRunInIO $ \run -> E.catch (run (liftM go mp))
(return . flip unConduitT rest . onErr)
go (Leftover p i) = Leftover (go p) i
go (NeedInput x y) = NeedInput (go . x) (go . y)
go (HaveOutput p o) = HaveOutput (go p) o
in go (p0 Done)
handleC :: (MonadUnliftIO m, Exception e)
=> (e -> ConduitT i o m r)
-> ConduitT i o m r
-> ConduitT i o m r
handleC = flip catchC
tryC :: (MonadUnliftIO m, Exception e)
=> ConduitT i o m r
-> ConduitT i o m (Either e r)
tryC c = fmap Right c `catchC` (return . Left)
zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks (ConduitT x0) (ConduitT y0) = ConduitT $ \rest -> let
Leftover _ i >< _ = absurd i
_ >< Leftover _ i = absurd i
HaveOutput _ o >< _ = absurd o
_ >< HaveOutput _ o = absurd o
PipeM mx >< y = PipeM (liftM (>< y) mx)
x >< PipeM my = PipeM (liftM (x ><) my)
Done x >< Done y = rest (x, y)
NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ())
NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y)
x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u)
in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done)
zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b)
zipSources (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
go (Leftover left ()) right = go left right
go left (Leftover right ()) = go left right
go (Done ()) (Done ()) = rest ()
go (Done ()) (HaveOutput _ _) = rest ()
go (HaveOutput _ _) (Done ()) = rest ()
go (Done ()) (PipeM _) = rest ()
go (PipeM _) (Done ()) = rest ()
go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x, y)
go (NeedInput _ c) right = go (c ()) right
go left (NeedInput _ c) = go left (c ())
in go (left0 Done) (right0 Done)
zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b
zipSourcesApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
go (Leftover left ()) right = go left right
go left (Leftover right ()) = go left right
go (Done ()) (Done ()) = rest ()
go (Done ()) (HaveOutput _ _) = rest ()
go (HaveOutput _ _) (Done ()) = rest ()
go (Done ()) (PipeM _) = rest ()
go (PipeM _) (Done ()) = rest ()
go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my)
go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx)
go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my)
go (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go srcx srcy) (x y)
go (NeedInput _ c) right = go (c ()) right
go left (NeedInput _ c) = go left (c ())
in go (left0 Done) (right0 Done)
zipConduitApp
:: Monad m
=> ConduitT i o m (x -> y)
-> ConduitT i o m x
-> ConduitT i o m y
zipConduitApp (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
go (Done f) (Done x) = rest (f x)
go (PipeM mx) y = PipeM (flip go y `liftM` mx)
go x (PipeM my) = PipeM (go x `liftM` my)
go (HaveOutput x o) y = HaveOutput (go x y) o
go x (HaveOutput y o) = HaveOutput (go x y) o
go (Leftover _ i) _ = absurd i
go _ (Leftover _ i) = absurd i
go (NeedInput px cx) (NeedInput py cy) = NeedInput
(\i -> go (px i) (py i))
(\u -> go (cx u) (cy u))
go (NeedInput px cx) (Done y) = NeedInput
(\i -> go (px i) (Done y))
(\u -> go (cx u) (Done y))
go (Done x) (NeedInput py cy) = NeedInput
(\i -> go (Done x) (py i))
(\u -> go (Done x) (cy u))
in go (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done)
fuseReturnLeftovers :: Monad m
=> ConduitT a b m ()
-> ConduitT b c m r
-> ConduitT a c m (r, [b])
fuseReturnLeftovers (ConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
goRight bs left right =
case right of
HaveOutput p o -> HaveOutput (recurse p) o
NeedInput rp rc ->
case bs of
[] -> goLeft rp rc left
b:bs' -> goRight bs' left (rp b)
Done r2 -> rest (r2, bs)
PipeM mp -> PipeM (liftM recurse mp)
Leftover p b -> goRight (b:bs) left p
where
recurse = goRight bs left
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight [] left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight [] (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc
in goRight [] (left0 Done) (right0 Done)
fuseLeftovers
:: Monad m
=> ([b] -> [a])
-> ConduitT a b m ()
-> ConduitT b c m r
-> ConduitT a c m r
fuseLeftovers f left right = do
(r, bs) <- fuseReturnLeftovers left right
mapM_ leftover $ reverse $ f bs
return r
connectResumeConduit
:: Monad m
=> SealedConduitT i o m ()
-> ConduitT o Void m r
-> ConduitT i Void m (SealedConduitT i o m (), r)
connectResumeConduit (SealedConduitT left0) (ConduitT right0) = ConduitT $ \rest -> let
goRight left right =
case right of
HaveOutput _ o -> absurd o
NeedInput rp rc -> goLeft rp rc left
Done r2 -> rest (SealedConduitT left, r2)
PipeM mp -> PipeM (liftM (goRight left) mp)
Leftover p i -> goRight (HaveOutput left i) p
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done () -> goRight (Done ()) (rc ())
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc
in goRight left0 (right0 Done)
mergeSource
:: Monad m
=> Source m i
-> Conduit a m (i, a)
mergeSource = loop . sealConduitT
where
loop :: Monad m => SealedConduitT () i m () -> Conduit a m (i, a)
loop src0 = await >>= maybe (return ()) go
where
go a = do
(src1, mi) <- lift $ src0 $$++ await
case mi of
Nothing -> return ()
Just i -> yield (i, a) >> loop src1
passthroughSink :: Monad m
=> Sink i m r
-> (r -> m ())
-> Conduit i m i
passthroughSink (ConduitT sink0) final = ConduitT $ \rest -> let
go mbuf _ (Done r) = do
maybe (return ()) CI.yield mbuf
lift $ final r
unConduitT (awaitForever yield) rest
go mbuf is (Leftover sink i) = go mbuf (i:is) sink
go _ _ (HaveOutput _ o) = absurd o
go mbuf is (PipeM mx) = do
x <- lift mx
go mbuf is x
go mbuf (i:is) (NeedInput next _) = go mbuf is (next i)
go mbuf [] (NeedInput next done) = do
maybe (return ()) CI.yield mbuf
mx <- CI.await
case mx of
Nothing -> go Nothing [] (done ())
Just x -> go (Just x) [] (next x)
in go Nothing [] (sink0 Done)
sourceToList :: Monad m => Source m a -> m [a]
sourceToList =
go . flip unConduitT Done
where
go (Done _) = return []
go (HaveOutput src x) = liftM (x:) (go src)
go (PipeM msrc) = msrc >>= go
go (NeedInput _ c) = go (c ())
go (Leftover p _) = go p
infixr 0 $$
infixl 1 $=
infixr 2 =$
infixr 2 =$=
infixr 0 $$+
infixr 0 $$++
infixr 0 $$+-
infixl 1 $=+
infixr 2 .|
connect :: Monad m
=> ConduitT () a m ()
-> ConduitT a Void m r
-> m r
connect = ($$)
fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
fuse = (=$=)
(.|) :: Monad m
=> ConduitM a b m ()
-> ConduitM b c m r
-> ConduitM a c m r
(.|) = fuse
($$) :: Monad m => Source m a -> Sink a m b -> m b
src $$ sink = do
(rsrc, res) <- src $$+ sink
rsrc $$+- return ()
return res
($=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
($=) = (=$=)
(=$) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
(=$) = (=$=)
(=$=) :: Monad m => Conduit a m b -> ConduitT b c m r -> ConduitT a c m r
ConduitT left0 =$= ConduitT right0 = ConduitT $ \rest ->
let goRight left right =
case right of
HaveOutput p o -> HaveOutput (recurse p) o
NeedInput rp rc -> goLeft rp rc left
Done r2 -> rest r2
PipeM mp -> PipeM (liftM recurse mp)
Leftover right' i -> goRight (HaveOutput left i) right'
where
recurse = goRight left
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc
in goRight (left0 Done) (right0 Done)
await :: Monad m => Consumer i m (Maybe i)
await = ConduitT $ \f -> NeedInput (f . Just) (const $ f Nothing)
await' :: Monad m
=> ConduitT i o m r
-> (i -> ConduitT i o m r)
-> ConduitT i o m r
await' f g = ConduitT $ \rest -> NeedInput
(\i -> unConduitT (g i) rest)
(const $ unConduitT f rest)
yield :: Monad m
=> o
-> ConduitT i o m ()
yield o = ConduitT $ \rest -> HaveOutput (rest ()) o
yieldM :: Monad m => m o -> ConduitT i o m ()
yieldM mo = lift mo >>= yield
leftover :: i -> ConduitT i o m ()
leftover i = ConduitT $ \rest -> Leftover (rest ()) i
runConduit :: Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done
bracketP :: MonadResource m
=> IO a
-> (a -> IO ())
-> (a -> ConduitT i o m r)
-> ConduitT i o m r
bracketP alloc free inside = ConduitT $ \rest -> do
(key, seed) <- allocate alloc free
unConduitT (inside seed) $ \res -> do
release key
rest res
awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m ()
awaitForever f = ConduitT $ \rest ->
let go = NeedInput (\i -> unConduitT (f i) (const go)) rest
in go
transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
transPipe f (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) =
PipeM (f $ liftM go $ collapse mp)
where
collapse mpipe = do
pipe' <- mpipe
case pipe' of
PipeM mpipe' -> collapse mpipe'
_ -> return pipe'
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
mapOutput :: Monad m => (o1 -> o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
mapOutput f (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput p o) = HaveOutput (go p) (f o)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitT i o1 m r -> ConduitT i o2 m r
mapOutputMaybe f (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
mapInput :: Monad m
=> (i1 -> i2)
-> (i2 -> Maybe i1)
-> ConduitT i2 o m r
-> ConduitT i1 o m r
mapInput f f' (ConduitT c0) = ConduitT $ \rest -> let
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput p c) = NeedInput (go . p . f) (go . c)
go (Done r) = rest r
go (PipeM mp) = PipeM $ liftM go mp
go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p)
in go (c0 Done)
($$+) :: Monad m => Source m a -> Sink a m b -> m (SealedConduitT () a m (), b)
src $$+ sink = connectResume (sealConduitT src) sink
($$++) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m (SealedConduitT () a m (), b)
($$++) = connectResume
($$+-) :: Monad m => SealedConduitT () a m () -> Sink a m b -> m b
rsrc $$+- sink = do
(_, res) <- connectResume rsrc sink
return res
($=+) :: Monad m => SealedConduitT () a m () -> Conduit a m b -> SealedConduitT () b m ()
SealedConduitT src $=+ ConduitT sink = SealedConduitT (src `pipeL` sink Done)
data Flush a = Chunk a | Flush
deriving (Show, Eq, Ord)
instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)
newtype ZipSource m o = ZipSource { getZipSource :: Source m o }
instance Monad m => Functor (ZipSource m) where
fmap f = ZipSource . mapOutput f . getZipSource
instance Monad m => Applicative (ZipSource m) where
pure = ZipSource . forever . yield
(ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x
sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)
sequenceSources = getZipSource . sequenceA . fmap ZipSource
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
instance Monad m => Functor (ZipSink i m) where
fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
pure = ZipSink . return
(ZipSink f) <*> (ZipSink x) =
ZipSink $ liftM (uncurry ($)) $ zipSinks f x
sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
sequenceSinks = getZipSink . sequenceA . fmap ZipSink
(=$$+) :: Monad m
=> ConduitT a b m ()
-> ConduitT b Void m r
-> ConduitT a Void m (SealedConduitT a b m (), r)
(=$$+) conduit = connectResumeConduit (sealConduitT conduit)
(=$$++) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m (SealedConduitT i o m (), r)
(=$$++) = connectResumeConduit
(=$$+-) :: Monad m => SealedConduitT i o m () -> ConduitT o Void m r -> ConduitT i Void m r
rsrc =$$+- sink = do
(_, res) <- connectResumeConduit rsrc sink
return res
infixr 0 =$$+
infixr 0 =$$++
infixr 0 =$$+-
newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitT i o m r }
deriving Functor
instance Monad m => Applicative (ZipConduit i o m) where
pure = ZipConduit . pure
ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right)
sequenceConduits :: (Traversable f, Monad m) => f (ConduitT i o m r) -> ConduitT i o m (f r)
sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit
fuseBoth :: Monad m => ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (r1, r2)
fuseBoth (ConduitT up) (ConduitT down) =
ConduitT (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=)
fuseBothMaybe
:: Monad m
=> ConduitT a b m r1
-> ConduitT b c m r2
-> ConduitT a c m (Maybe r1, r2)
fuseBothMaybe (ConduitT up) (ConduitT down) =
ConduitT (pipeL (up Done) (go Nothing $ down Done) >>=)
where
go mup (Done r) = Done (mup, r)
go mup (PipeM mp) = PipeM $ liftM (go mup) mp
go mup (HaveOutput p o) = HaveOutput (go mup p) o
go _ (NeedInput p c) = NeedInput
(\i -> go Nothing (p i))
(\u -> go (Just u) (c ()))
go mup (Leftover p i) = Leftover (go mup p) i
fuseUpstream :: Monad m => ConduitT a b m r -> Conduit b m c -> ConduitT a c m r
fuseUpstream up down = fmap fst (fuseBoth up down)