module Data.Conduit.Util.Conduit
( haveMore
, conduitState
, ConduitStateResult (..)
, conduitIO
, ConduitIOResult (..)
, SequencedSink
, sequenceSink
, SequencedSinkResponse (..)
) where
import Prelude hiding (sequence)
import Control.Monad.Trans.Resource
import Data.Conduit.Internal hiding (leftover)
import Control.Monad (liftM)
haveMore :: Conduit a m b
-> m ()
-> [b]
-> Conduit a m b
haveMore res _ [] = res
haveMore res close (x:xs) = HaveOutput (haveMore res close xs) close x
data ConduitStateResult state input output =
StateFinished (Maybe input) [output]
| StateProducing state [output]
instance Functor (ConduitStateResult state input) where
fmap f (StateFinished a b) = StateFinished a (map f b)
fmap f (StateProducing a b) = StateProducing a (map f b)
conduitState
:: Monad m
=> state
-> (state -> input -> m (ConduitStateResult state input output))
-> (state -> m [output])
-> Conduit input m output
conduitState state0 push0 close0 =
NeedInput (push state0) (\() -> close state0)
where
push state input = PipeM (liftM goRes' $ state `seq` push0 state input)
close state = PipeM (do
os <- close0 state
return $ sourceList os)
goRes' (StateFinished leftover output) = maybe id pipePush leftover $ haveMore
(Done ())
(return ())
output
goRes' (StateProducing state output) = haveMore
(NeedInput (push state) (\() -> close state))
(return ())
output
data ConduitIOResult input output =
IOFinished (Maybe input) [output]
| IOProducing [output]
instance Functor (ConduitIOResult input) where
fmap f (IOFinished a b) = IOFinished a (map f b)
fmap f (IOProducing b) = IOProducing (map f b)
conduitIO :: MonadResource m
=> IO state
-> (state -> IO ())
-> (state -> input -> m (ConduitIOResult input output))
-> (state -> m [output])
-> Conduit input m output
conduitIO alloc cleanup push0 close0 = NeedInput
(\input -> PipeM $ do
(key, state) <- allocate alloc cleanup
push key state input)
(\() -> PipeM $ do
(key, state) <- allocate alloc cleanup
os <- close0 state
release key
return $ sourceList os)
where
push key state input = do
res <- push0 state input
case res of
IOProducing output -> return $ haveMore
(NeedInput (PipeM . push key state) (\() -> close key state))
(release key)
output
IOFinished leftover output -> do
release key
return $ maybe id pipePush leftover $ haveMore
(Done ())
(return ())
output
close key state = PipeM $ do
output <- close0 state
release key
return $ sourceList output
data SequencedSinkResponse state input m output =
Emit state [output]
| Stop
| StartConduit (Conduit input m output)
type SequencedSink state input m output =
state -> Sink input m (SequencedSinkResponse state input m output)
sequenceSink
:: Monad m
=> state
-> SequencedSink state input m output
-> Conduit input m output
sequenceSink state0 fsink = do
x <- hasInput
if x
then do
res <- sinkToPipe $ fsink state0
case res of
Emit state os -> do
sourceList os
sequenceSink state fsink
Stop -> return ()
StartConduit c -> c
else return ()
pipePush :: Monad m => i -> Pipe i i o u m r -> Pipe i i o u m r
pipePush i (HaveOutput p c o) = HaveOutput (pipePush i p) c o
pipePush i (NeedInput p _) =
case p i of
Leftover p' i' -> pipePush i' p'
p' -> p'
pipePush i (Done r) = Leftover (Done r) i
pipePush i (PipeM mp) = PipeM (pipePush i `liftM` mp)
pipePush i (Leftover p i') =
case pipePush i' p of
Leftover p'' i'' -> Leftover (Leftover p'' i'') i
p' -> pipePush i p'
hasInput :: Pipe i i o u m Bool
hasInput = NeedInput (Leftover (Done True)) (const $ Done False)