module Data.Conduit.Internal.Pipe
(
Pipe (..)
, await
, awaitE
, awaitForever
, yield
, yieldM
, leftover
, bracketP
, idP
, pipe
, pipeL
, runPipe
, injectLeftovers
, (>+>)
, (<+<)
, catchP
, handleP
, tryP
, transPipe
, mapOutput
, mapOutputMaybe
, mapInput
, sourceList
, withUpstream
, Data.Conduit.Internal.Pipe.enumFromTo
, generalizeUpstream
) where
import Control.Applicative (Applicative (..))
import Control.Monad ((>=>), liftM, ap)
import Control.Monad.Error.Class(MonadError(..))
import Control.Monad.Reader.Class(MonadReader(..))
import Control.Monad.RWS.Class(MonadRWS())
import Control.Monad.Writer.Class(MonadWriter(..))
import Control.Monad.State.Class(MonadState(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Unlift (MonadIO (liftIO), MonadUnliftIO, withRunInIO)
import Control.Monad.Primitive (PrimMonad, PrimState, primitive)
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Data.Semigroup (Semigroup ((<>)))
import Control.Monad.Trans.Resource
import qualified GHC.Exts
import qualified Control.Exception as E
data Pipe l i o u m r =
HaveOutput (Pipe l i o u m r) o
| NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
| Done r
| PipeM (m (Pipe l i o u m r))
| Leftover (Pipe l i o u m r) l
instance Monad m => Functor (Pipe l i o u m) where
fmap = liftM
instance Monad m => Applicative (Pipe l i o u m) where
pure = Done
(<*>) = ap
instance Monad m => Monad (Pipe l i o u m) where
return = pure
HaveOutput p o >>= fp = HaveOutput (p >>= fp) o
NeedInput p c >>= fp = NeedInput (p >=> fp) (c >=> fp)
Done x >>= fp = fp x
PipeM mp >>= fp = PipeM ((>>= fp) `liftM` mp)
Leftover p i >>= fp = Leftover (p >>= fp) i
instance MonadTrans (Pipe l i o u) where
lift mr = PipeM (Done `liftM` mr)
instance MonadIO m => MonadIO (Pipe l i o u m) where
liftIO = lift . liftIO
instance MonadThrow m => MonadThrow (Pipe l i o u m) where
throwM = lift . throwM
instance Monad m => Semigroup (Pipe l i o u m ()) where
(<>) = (>>)
instance Monad m => Monoid (Pipe l i o u m ()) where
mempty = return ()
#if !(MIN_VERSION_base(4,11,0))
mappend = (<>)
#endif
instance PrimMonad m => PrimMonad (Pipe l i o u m) where
type PrimState (Pipe l i o u m) = PrimState m
primitive = lift . primitive
instance MonadResource m => MonadResource (Pipe l i o u m) where
liftResourceT = lift . liftResourceT
instance MonadReader r m => MonadReader r (Pipe l i o u m) where
ask = lift ask
local f (HaveOutput p o) = HaveOutput (local f p) o
local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u))
local _ (Done x) = Done x
local f (PipeM mp) = PipeM (liftM (local f) $ local f mp)
local f (Leftover p i) = Leftover (local f p) i
#ifndef MIN_VERSION_mtl
#define MIN_VERSION_mtl(x, y, z) 0
#endif
instance MonadWriter w m => MonadWriter w (Pipe l i o u m) where
#if MIN_VERSION_mtl(2, 1, 0)
writer = lift . writer
#endif
tell = lift . tell
listen (HaveOutput p o) = HaveOutput (listen p) o
listen (NeedInput p c) = NeedInput (\i -> listen (p i)) (\u -> listen (c u))
listen (Done x) = Done (x,mempty)
listen (PipeM mp) =
PipeM $
do (p,w) <- listen mp
return $ do (x,w') <- listen p
return (x, w `mappend` w')
listen (Leftover p i) = Leftover (listen p) i
pass (HaveOutput p o) = HaveOutput (pass p) o
pass (NeedInput p c) = NeedInput (\i -> pass (p i)) (\u -> pass (c u))
pass (PipeM mp) = PipeM $ mp >>= (return . pass)
pass (Done (x,_)) = Done x
pass (Leftover p i) = Leftover (pass p) i
instance MonadState s m => MonadState s (Pipe l i o u m) where
get = lift get
put = lift . put
#if MIN_VERSION_mtl(2, 1, 0)
state = lift . state
#endif
instance MonadRWS r w s m => MonadRWS r w s (Pipe l i o u m)
instance MonadError e m => MonadError e (Pipe l i o u m) where
throwError = lift . throwError
catchError (HaveOutput p o) f = HaveOutput (catchError p f) o
catchError (NeedInput p c) f = NeedInput (\i -> catchError (p i) f) (\u -> catchError (c u) f)
catchError (Done x) _ = Done x
catchError (PipeM mp) f =
PipeM $ catchError (liftM (flip catchError f) mp) (\e -> return (f e))
catchError (Leftover p i) f = Leftover (catchError p f) i
await :: Pipe l i o u m (Maybe i)
await = NeedInput (Done . Just) (\_ -> Done Nothing)
awaitE :: Pipe l i o u m (Either u i)
awaitE = NeedInput (Done . Right) (Done . Left)
awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r
awaitForever inner =
self
where
self = awaitE >>= either return (\i -> inner i >> self)
yield :: Monad m
=> o
-> Pipe l i o u m ()
yield = HaveOutput (Done ())
yieldM :: Monad m => m o -> Pipe l i o u m ()
yieldM = PipeM . liftM (HaveOutput (Done ()))
leftover :: l -> Pipe l i o u m ()
leftover = Leftover (Done ())
bracketP :: MonadResource m
=> IO a
-> (a -> IO ())
-> (a -> Pipe l i o u m r)
-> Pipe l i o u m r
bracketP alloc free inside = do
(key, seed) <- allocate alloc free
res <- inside seed
release key
return res
idP :: Monad m => Pipe l a a r m r
idP = NeedInput (HaveOutput idP) Done
pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
pipe =
goRight
where
goRight left right =
case right of
HaveOutput p o -> HaveOutput (recurse p) o
NeedInput rp rc -> goLeft rp rc left
Done r2 -> Done r2
PipeM mp -> PipeM (liftM recurse mp)
Leftover _ i -> absurd i
where
recurse = goRight left
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc
pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
pipeL =
goRight
where
goRight left right =
case right of
HaveOutput p o -> HaveOutput (recurse p) o
NeedInput rp rc -> goLeft rp rc left
Done r2 -> Done r2
PipeM mp -> PipeM (liftM recurse mp)
Leftover right' i -> goRight (HaveOutput left i) right'
where
recurse = goRight left
goLeft rp rc left =
case left of
HaveOutput left' o -> goRight left' (rp o)
NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc)
Done r1 -> goRight (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc
runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe (HaveOutput _ o) = absurd o
runPipe (NeedInput _ c) = runPipe (c ())
runPipe (Done r) = return r
runPipe (PipeM mp) = mp >>= runPipe
runPipe (Leftover _ i) = absurd i
injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers =
go []
where
go ls (HaveOutput p o) = HaveOutput (go ls p) o
go (l:ls) (NeedInput p _) = go ls $ p l
go [] (NeedInput p c) = NeedInput (go [] . p) (go [] . c)
go _ (Done r) = Done r
go ls (PipeM mp) = PipeM (liftM (go ls) mp)
go ls (Leftover p l) = go (l:ls) p
transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r
transPipe f (HaveOutput p o) = HaveOutput (transPipe f p) o
transPipe f (NeedInput p c) = NeedInput (transPipe f . p) (transPipe f . c)
transPipe _ (Done r) = Done r
transPipe f (PipeM mp) =
PipeM (f $ liftM (transPipe f) $ collapse mp)
where
collapse mpipe = do
pipe' <- mpipe
case pipe' of
PipeM mpipe' -> collapse mpipe'
_ -> return pipe'
transPipe f (Leftover p i) = Leftover (transPipe f p) i
mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutput f =
go
where
go (HaveOutput p o) = HaveOutput (go p) (f o)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutputMaybe f =
go
where
go (HaveOutput p o) = maybe id (\o' p' -> HaveOutput p' o') (f o) (go p)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM (go) mp)
go (Leftover p i) = Leftover (go p) i
mapInput :: Monad m
=> (i1 -> i2)
-> (l2 -> Maybe l1)
-> Pipe l2 i2 o u m r
-> Pipe l1 i1 o u m r
mapInput f f' (HaveOutput p o) = HaveOutput (mapInput f f' p) o
mapInput f f' (NeedInput p c) = NeedInput (mapInput f f' . p . f) (mapInput f f' . c)
mapInput _ _ (Done r) = Done r
mapInput f f' (PipeM mp) = PipeM (liftM (mapInput f f') mp)
mapInput f f' (Leftover p i) = maybe id (flip Leftover) (f' i) $ mapInput f f' p
enumFromTo :: (Enum o, Eq o, Monad m)
=> o
-> o
-> Pipe l i o u m ()
enumFromTo start stop =
loop start
where
loop i
| i == stop = HaveOutput (Done ()) i
| otherwise = HaveOutput (loop (succ i)) i
sourceList :: Monad m => [a] -> Pipe l i a u m ()
sourceList =
go
where
go [] = Done ()
go (o:os) = HaveOutput (go os) o
build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m ()
build g = g (\o p -> HaveOutput p o) (return ())
withUpstream :: Monad m
=> Pipe l i o u m r
-> Pipe l i o u m (u, r)
withUpstream down =
down >>= go
where
go r =
loop
where
loop = awaitE >>= either (\u -> return (u, r)) (\_ -> loop)
infixr 9 <+<
infixl 9 >+>
(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
(>+>) = pipe
(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2
(<+<) = flip pipe
catchP :: (MonadUnliftIO m, E.Exception e)
=> Pipe l i o u m r
-> (e -> Pipe l i o u m r)
-> Pipe l i o u m r
catchP p0 onErr =
go p0
where
go (Done r) = Done r
go (PipeM mp) = PipeM $ withRunInIO $ \run ->
E.catch (run (liftM go mp)) (return . onErr)
go (Leftover p i) = Leftover (go p) i
go (NeedInput x y) = NeedInput (go . x) (go . y)
go (HaveOutput p o) = HaveOutput (go p) o
handleP :: (MonadUnliftIO m, E.Exception e)
=> (e -> Pipe l i o u m r)
-> Pipe l i o u m r
-> Pipe l i o u m r
handleP = flip catchP
tryP :: (MonadUnliftIO m, E.Exception e)
=> Pipe l i o u m r
-> Pipe l i o u m (Either e r)
tryP p = (fmap Right p) `catchP` (return . Left)
generalizeUpstream :: Monad m => Pipe l i o () m r -> Pipe l i o u m r
generalizeUpstream =
go
where
go (HaveOutput p o) = HaveOutput (go p) o
go (NeedInput x y) = NeedInput (go . x) (\_ -> go (y ()))
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p l) = Leftover (go p) l