{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Conduit.Internal.Fusion
(
Step (..)
, Stream (..)
, ConduitWithStream
, StreamConduitT
, StreamConduit
, StreamSource
, StreamProducer
, StreamSink
, StreamConsumer
, streamConduit
, streamSource
, streamSourcePure
, unstream
) where
import Data.Conduit.Internal.Conduit
import Data.Conduit.Internal.Pipe (Pipe (..))
import Data.Functor.Identity (Identity (runIdentity))
import Data.Void (Void, absurd)
import Control.Monad.Trans.Resource (runResourceT)
-- | One step of a stream's state machine, parameterised over the state
-- type @s@, the type of emitted values @o@ and the final result type @r@.
--
-- The derived 'Functor' instance maps over the final result @r@ only;
-- 'Emit' and 'Skip' are left untouched by 'fmap'.
data Step s o r
    = Emit s o -- ^ Yield the value @o@ together with the next state @s@.
    | Skip s   -- ^ Yield nothing; only provide the next state @s@.
    | Stop r   -- ^ Finish with the result @r@.
    deriving Functor
-- | A stream of @o@ values running in the monad @m@ and finishing with a
-- result of type @r@.
--
-- The state type @s@ is existentially quantified: the only way to obtain a
-- state is from the initial-state action or from a previous 'Step', and the
-- only thing that can be done with it is to pass it to the step function.
data Stream m o r = forall s. Stream
    -- Step function: given the current state, produce the next 'Step'.
    (s -> m (Step s o r))
    -- Action producing the initial state.
    (m s)
-- | Pairs a regular 'ConduitT' with a 'StreamConduitT' over the same input,
-- output, monad and result types.
data ConduitWithStream i o m r = ConduitWithStream
    -- The ordinary conduit representation.
    (ConduitT i o m r)
    -- The stream-transformer representation.
    (StreamConduitT i o m r)
-- | A stream transformer: takes an upstream 'Stream' of @i@ values that
-- finishes with @()@ and returns a 'Stream' of @o@ values that finishes
-- with @r@.
type StreamConduitT i o m r = Stream m i () -> Stream m o r
-- | A 'StreamConduitT' whose result is @()@. Note that the parameter order
-- here is @i m o@, unlike 'StreamConduitT' which takes @i o m r@.
type StreamConduit i m o = StreamConduitT i o m ()
-- | A 'StreamConduitT' whose input type and result are both @()@.
type StreamSource m o = StreamConduitT () o m ()
-- | A 'StreamConduitT' with result @()@ that is polymorphic in its input
-- type @i@.
type StreamProducer m o = forall i. StreamConduitT i o m ()
-- | A 'StreamConduitT' whose output type is 'Void', i.e. one that cannot
-- emit any value downstream.
type StreamSink i m r = StreamConduitT i Void m r
-- | A 'StreamConduitT' that is polymorphic in its output type @o@.
type StreamConsumer i m r = forall o. StreamConduitT i o m r
-- | Extract the regular 'ConduitT' from a 'ConduitWithStream', ignoring the
-- stream-based component.
--
-- The phase annotation keeps this function from being inlined before
-- phase 0 (presumably so that rewrite rules mentioning 'unstream' can match
-- in the earlier simplifier phases).
unstream :: ConduitWithStream i o m r -> ConduitT i o m r
unstream (ConduitWithStream c _) = c
{-# INLINE [0] unstream #-}
-- | Fuse two 'ConduitWithStream' values component-wise.
--
-- The conduit components are combined with '.|'; the stream components are
-- combined by plain function composition, feeding the output stream of the
-- left-hand side into the right-hand side.
fuseStream :: Monad m
           => ConduitWithStream a b m ()
           -> ConduitWithStream b c m r
           -> ConduitWithStream a c m r
fuseStream (ConduitWithStream a x) (ConduitWithStream b y) =
    ConduitWithStream (a .| b) (y . x)
{-# INLINE fuseStream #-}
-- Rewrite the fusion of two 'unstream'ed values (via '.|', 'fuse' or '=$=')
-- into a single 'unstream' of their 'fuseStream' combination.
{-# RULES "conduit: fuseStream (.|)" forall left right.
        unstream left .| unstream right = unstream (fuseStream left right)
  #-}
{-# RULES "conduit: fuseStream (fuse)" forall left right.
        fuse (unstream left) (unstream right) = unstream (fuseStream left right)
  #-}
{-# RULES "conduit: fuseStream (=$=)" forall left right.
        unstream left =$= unstream right = unstream (fuseStream left right)
  #-}
-- | Run a complete pipeline using only its stream component; the conduit
-- component is ignored.
--
-- The stream transformer is applied to an upstream stream that stops
-- immediately with @()@, and the resulting stream is stepped until it
-- yields 'Stop', whose payload is returned.
runStream :: Monad m
          => ConduitWithStream () Void m r
          -> m r
runStream (ConduitWithStream _ f) =
    run $ f $ Stream emptyStep (return ())
  where
    -- Upstream for a pipeline with no input: stop on the first step.
    emptyStep _ = return $ Stop ()

    -- Obtain the initial state, then drive the step function to completion.
    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                Skip s' -> loop s'
                -- The output type is 'Void', so this branch cannot occur.
                Emit _ o -> absurd o
{-# INLINE runStream #-}
-- Rewrite running an 'unstream'ed pipeline (via 'runConduit',
-- 'runConduitPure' or 'runConduitRes') into 'runStream'.
{-# RULES "conduit: runStream" forall stream.
        runConduit (unstream stream) = runStream stream
  #-}
{-# RULES "conduit: runStream (pure)" forall stream.
        runConduitPure (unstream stream) = runIdentity (runStream stream)
  #-}
{-# RULES "conduit: runStream (ResourceT)" forall stream.
        runConduitRes (unstream stream) = runResourceT (runStream stream)
  #-}
-- | Connect a stream-based source to a stream-based sink and run the
-- result; the conduit components of both arguments are ignored.
--
-- The source's stream transformer is applied to an upstream stream that
-- stops immediately with @()@, its output is fed to the sink's stream
-- transformer, and the resulting stream is stepped until it yields 'Stop'.
connectStream :: Monad m
              => ConduitWithStream () i m ()
              -> ConduitWithStream i Void m r
              -> m r
connectStream (ConduitWithStream _ stream) (ConduitWithStream _ f) =
    run $ f $ stream $ Stream emptyStep (return ())
  where
    -- Upstream for the source: stop on the first step.
    emptyStep _ = return $ Stop ()

    -- Obtain the initial state, then drive the step function to completion.
    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                Skip s' -> loop s'
                -- The output type is 'Void', so this branch cannot occur.
                Emit _ o -> absurd o
{-# INLINE connectStream #-}
-- Rewrite '$$' between two 'unstream'ed values into 'connectStream'.
{-# RULES "conduit: connectStream ($$)" forall left right.
        unstream left $$ unstream right = connectStream left right
  #-}
-- | Connect a stream-based source to a regular conduit sink and run the
-- result; the conduit component of the source is ignored.
--
-- The sink's 'Pipe' is interpreted directly. Values the sink pushes back
-- with 'Leftover' are kept in a list and handed back to it before any new
-- value is pulled from the stream.
connectStream1 :: Monad m
               => ConduitWithStream () i m ()
               -> ConduitT i Void m r
               -> m r
connectStream1 (ConduitWithStream _ fstream) (ConduitT sink0) =
    -- The source's upstream is a stream that stops on the first step.
    case fstream $ Stream (const $ return $ Stop ()) (return ()) of
        Stream step ms0 ->
            let -- Arguments: pending leftovers, the sink, the stream state.
                loop _ (Done r) _ = return r
                loop ls (PipeM mp) s = mp >>= flip (loop ls) s
                loop ls (Leftover p l) s = loop (l:ls) p s
                -- The sink's output type is 'Void', so this cannot occur.
                loop _ (HaveOutput _ o) _ = absurd o
                -- Serve pending leftovers before touching the stream.
                loop (l:ls) (NeedInput p _) s = loop ls (p l) s
                loop [] (NeedInput p c) s = do
                    res <- step s
                    case res of
                        -- Stream finished: run the sink's end-of-input
                        -- continuation, keeping the same stream state.
                        Stop () -> loop [] (c ()) s
                        Skip s' -> loop [] (NeedInput p c) s'
                        Emit s' i -> loop [] (p i) s'
             in ms0 >>= loop [] (sink0 Done)
{-# INLINE connectStream1 #-}
-- Rewrite connecting an 'unstream'ed left-hand side to an arbitrary
-- right-hand side into 'connectStream1', for '$$' and for each combination
-- of runner ('runConduit', 'runConduitPure', 'runConduitRes') and fusion
-- operator ('.|', '=$=', 'fuse').
{-# RULES "conduit: connectStream1 ($$)" forall left right.
        unstream left $$ right = connectStream1 left right
  #-}

{-# RULES "conduit: connectStream1 (runConduit/.|)" forall left right.
        runConduit (unstream left .| right) = connectStream1 left right
  #-}
{-# RULES "conduit: connectStream1 (runConduit/=$=)" forall left right.
        runConduit (unstream left =$= right) = connectStream1 left right
  #-}
{-# RULES "conduit: connectStream1 (runConduit/fuse)" forall left right.
        runConduit (fuse (unstream left) right) = connectStream1 left right
  #-}

{-# RULES "conduit: connectStream1 (runConduitPure/.|)" forall left right.
        runConduitPure (unstream left .| right) = runIdentity (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitPure/=$=)" forall left right.
        runConduitPure (unstream left =$= right) = runIdentity (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitPure/fuse)" forall left right.
        runConduitPure (fuse (unstream left) right) = runIdentity (connectStream1 left right)
  #-}

{-# RULES "conduit: connectStream1 (runConduitRes/.|)" forall left right.
        runConduitRes (unstream left .| right) = runResourceT (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitRes/=$=)" forall left right.
        runConduitRes (unstream left =$= right) = runResourceT (connectStream1 left right)
  #-}
{-# RULES "conduit: connectStream1 (runConduitRes/fuse)" forall left right.
        runConduitRes (fuse (unstream left) right) = runResourceT (connectStream1 left right)
  #-}
-- | Connect a regular conduit source to a stream-based sink and run the
-- result; the conduit component of the sink is ignored.
--
-- The source's 'Pipe' is exposed as a 'Stream' whose state is the pipe
-- itself, and that stream is fed to the sink's stream transformer.
connectStream2 :: forall i m r. Monad m
               => ConduitT () i m ()
               -> ConduitWithStream i Void m r
               -> m r
connectStream2 (ConduitT src0) (ConduitWithStream _ fstream) =
    run $ fstream $ Stream step' $ return (src0 Done)
  where
    -- Translate one layer of the source pipe into a stream step.
    step' :: Pipe () () i () m () -> m (Step (Pipe () () i () m ()) i ())
    step' (Done ()) = return $ Stop ()
    step' (HaveOutput pipe o) = return $ Emit pipe o
    -- A source has no real upstream: take the end-of-input continuation.
    step' (NeedInput _ c) = return $ Skip $ c ()
    step' (PipeM mp) = Skip <$> mp
    -- Leftovers of type @()@ carry no information and are dropped.
    step' (Leftover p ()) = return $ Skip p
    {-# INLINE step' #-}

    -- Obtain the initial state, then drive the step function to completion.
    run (Stream step ms0) =
        ms0 >>= loop
      where
        loop s = do
            res <- step s
            case res of
                Stop r -> return r
                -- The output type is 'Void', so this branch cannot occur.
                Emit _ o -> absurd o
                Skip s' -> loop s'
{-# INLINE connectStream2 #-}
-- Rewrite connecting an arbitrary left-hand side to an 'unstream'ed
-- right-hand side into 'connectStream2', for '$$' and for each combination
-- of runner ('runConduit', 'runConduitPure', 'runConduitRes') and fusion
-- operator ('.|', 'fuse', '=$=').
{-# RULES "conduit: connectStream2 ($$)" forall left right.
        left $$ unstream right = connectStream2 left right
  #-}

{-# RULES "conduit: connectStream2 (runConduit/.|)" forall left right.
        runConduit (left .| unstream right) = connectStream2 left right
  #-}
{-# RULES "conduit: connectStream2 (runConduit/fuse)" forall left right.
        runConduit (fuse left (unstream right)) = connectStream2 left right
  #-}
{-# RULES "conduit: connectStream2 (runConduit/=$=)" forall left right.
        runConduit (left =$= unstream right) = connectStream2 left right
  #-}

{-# RULES "conduit: connectStream2 (runConduitPure/.|)" forall left right.
        runConduitPure (left .| unstream right) = runIdentity (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitPure/fuse)" forall left right.
        runConduitPure (fuse left (unstream right)) = runIdentity (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitPure/=$=)" forall left right.
        runConduitPure (left =$= unstream right) = runIdentity (connectStream2 left right)
  #-}

{-# RULES "conduit: connectStream2 (runConduitRes/.|)" forall left right.
        runConduitRes (left .| unstream right) = runResourceT (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitRes/fuse)" forall left right.
        runConduitRes (fuse left (unstream right)) = runResourceT (connectStream2 left right)
  #-}
{-# RULES "conduit: connectStream2 (runConduitRes/=$=)" forall left right.
        runConduitRes (left =$= unstream right) = runResourceT (connectStream2 left right)
  #-}
-- | Build a 'ConduitWithStream' from a regular conduit and a stream
-- transformer over the same types.
--
-- This is the 'ConduitWithStream' constructor under a function name; it is
-- marked @CONLIKE@ so GHC may treat applications of it like constructor
-- applications when matching rewrite rules.
streamConduit :: ConduitT i o m r
              -> (Stream m i () -> Stream m o r)
              -> ConduitWithStream i o m r
streamConduit = ConduitWithStream
{-# INLINE CONLIKE streamConduit #-}
-- | Turn a 'Stream' into a source usable both as a conduit and as a stream.
--
-- The stream component ignores its upstream and returns the given stream
-- unchanged. The conduit component runs the initial-state action and then
-- steps the stream, emitting a 'HaveOutput' for every 'Emit' and handing
-- control to the continuation when the stream yields 'Stop'.
streamSource
    :: Monad m
    => Stream m o ()
    -> ConduitWithStream i o m ()
streamSource str@(Stream step ms0) =
    ConduitWithStream con (const str)
  where
    con = ConduitT $ \rest -> PipeM $ do
        s0 <- ms0
        let loop s = do
                res <- step s
                case res of
                    Stop () -> return $ rest ()
                    -- Emit the value; the remainder of the stream is run
                    -- when the rest of the pipe is demanded.
                    Emit s' o -> return $ HaveOutput (PipeM $ loop s') o
                    Skip s' -> loop s'
        loop s0
{-# INLINE streamSource #-}
-- | Like 'streamSource', but for a stream that runs in 'Identity'.
--
-- The stream component ignores its upstream and lifts the pure stream into
-- the target monad @m@ with 'return'. The conduit component steps the
-- stream purely via 'runIdentity', so it builds the 'Pipe' without any
-- 'PipeM' layers.
streamSourcePure
    :: Monad m
    => Stream Identity o ()
    -> ConduitWithStream i o m ()
streamSourcePure (Stream step ms0) =
    ConduitWithStream con (const $ Stream (return . runIdentity . step) (return s0))
  where
    -- The initial state, extracted purely.
    s0 = runIdentity ms0

    con = ConduitT $ \rest ->
        let loop s =
                case runIdentity $ step s of
                    Stop () -> rest ()
                    Emit s' o -> HaveOutput (loop s') o
                    Skip s' -> loop s'
         in loop s0
{-# INLINE streamSourcePure #-}