module Data.Conduit.Util.Conduit
( haveMore
, conduitState
, ConduitStateResult (..)
, conduitIO
, ConduitIOResult (..)
, SequencedSink
, sequenceSink
, sequence
, SequencedSinkResponse (..)
) where
import Prelude hiding (sequence)
import Control.Monad.Trans.Resource
import Data.Conduit.Internal
import Control.Monad (liftM)
haveMore :: Conduit a m b
-> m ()
-> [b]
-> Conduit a m b
haveMore res _ [] = res
haveMore res close (x:xs) = HaveOutput (haveMore res close xs) close x
data ConduitStateResult state input output =
StateFinished (Maybe input) [output]
| StateProducing state [output]
instance Functor (ConduitStateResult state input) where
fmap f (StateFinished a b) = StateFinished a (map f b)
fmap f (StateProducing a b) = StateProducing a (map f b)
conduitState
:: Monad m
=> state
-> (state -> input -> m (ConduitStateResult state input output))
-> (state -> m [output])
-> Conduit input m output
conduitState state0 push0 close0 =
NeedInput (push state0) (close state0)
where
push state input = PipeM (liftM goRes' $ state `seq` push0 state input) (return ())
close state = PipeM (do
os <- close0 state
return $ fromList os) (return ())
goRes' (StateFinished leftover output) = haveMore
(Done leftover ())
(return ())
output
goRes' (StateProducing state output) = haveMore
(NeedInput (push state) (close state))
(return ())
output
data ConduitIOResult input output =
IOFinished (Maybe input) [output]
| IOProducing [output]
instance Functor (ConduitIOResult input) where
fmap f (IOFinished a b) = IOFinished a (map f b)
fmap f (IOProducing b) = IOProducing (map f b)
conduitIO :: MonadResource m
=> IO state
-> (state -> IO ())
-> (state -> input -> m (ConduitIOResult input output))
-> (state -> m [output])
-> Conduit input m output
conduitIO alloc cleanup push0 close0 = NeedInput
(\input -> flip PipeM (return ()) $ do
(key, state) <- allocate alloc cleanup
push key state input)
(PipeM (do
(key, state) <- allocate alloc cleanup
os <- close0 state
release key
return $ fromList os) (return ()))
where
push key state input = do
res <- push0 state input
case res of
IOProducing output -> return $ haveMore
(NeedInput (flip PipeM (release key) . push key state) (close key state))
(release key >> return ())
output
IOFinished leftover output -> do
release key
return $ haveMore
(Done leftover ())
(return ())
output
close key state = PipeM (do
output <- close0 state
release key
return $ fromList output) (release key)
fromList :: Monad m => [a] -> Pipe i a m ()
fromList [] = Done Nothing ()
fromList (x:xs) = HaveOutput (fromList xs) (return ()) x
data SequencedSinkResponse state input m output =
Emit state [output]
| Stop
| StartConduit (Conduit input m output)
type SequencedSink state input m output =
state -> Sink input m (SequencedSinkResponse state input m output)
sequenceSink
:: Monad m
=> state
-> SequencedSink state input m output
-> Conduit input m output
sequenceSink state0 fsink = do
x <- hasInput
if x
then do
res <- sinkToPipe $ fsink state0
case res of
Emit state os -> do
fromList os
sequenceSink state fsink
Stop -> return ()
StartConduit c -> c
else return ()
sequence :: Monad m => Sink input m output -> Conduit input m output
sequence sink = do
x <- hasInput
if x
then do
sinkToPipe sink >>= yield
sequence sink
else return ()