{-# LANGUAGE RankNTypes #-}
-- | Helpers for building and transforming 'Conduit's: emitting buffered
-- output, conduits driven by pure or allocated state, changing the base
-- monad, and turning 'Sink's into 'Conduit's.
--
-- @RankNTypes@ is required because 'transConduit' takes a polymorphic
-- monad-morphism (@forall a. m a -> n a@) as an argument.
module Data.Conduit.Util.Conduit
    ( -- * Emitting buffered output
      haveMore
      -- * Conduits driven by explicit state
    , conduitState
    , ConduitStateResult (..)
      -- * Conduits driven by allocated state
    , conduitIO
    , ConduitIOResult (..)
      -- * Changing the base monad
    , transConduit
      -- * Closing a conduit
    , conduitClose
      -- * Building conduits from sinks
    , SequencedSink
    , sequenceSink
    , sequence
    , SequencedSinkResponse (..)
    ) where
import Prelude hiding (sequence)
import Control.Monad.Trans.Resource
import Data.Conduit.Types.Conduit
import Data.Conduit.Types.Sink
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Util.Sink
import Control.Monad (liftM)
import Data.Monoid (mempty)
-- | Put a list of output values in front of a conduit.
--
-- Every element of the list becomes one 'HaveOutput' step, in list order,
-- and each of those steps carries the supplied close action. When the list
-- is empty the given conduit is returned unchanged.
haveMore :: Conduit a m b -- ^ conduit to continue with once the list is emitted
         -> m ()          -- ^ close action attached to every emitted step
         -> [b]           -- ^ values to emit first, in order
         -> Conduit a m b
haveMore res close outputs = foldr emit res outputs
  where
    -- Wrap the already-built remainder in one more output step.
    emit x rest = HaveOutput rest close x
-- | What a push function passed to 'conduitState' reports back.
data ConduitStateResult state input output
    = StateFinished (Maybe input) [output]
      -- ^ The conduit is done: an optional leftover input value together
      -- with the last outputs to emit.
    | StateProducing state [output]
      -- ^ The conduit keeps going: the state to use for the next push
      -- together with the outputs produced by this one.

-- | Maps over the output values only; leftover input and state are kept
-- untouched.
instance Functor (ConduitStateResult state input) where
    fmap f result = case result of
        StateFinished leftover outputs -> StateFinished leftover (fmap f outputs)
        StateProducing st outputs      -> StateProducing st (fmap f outputs)
-- | Build a 'Conduit' that threads an explicit state value through its
-- push and close functions.
--
-- The resulting conduit starts out waiting for input. Each input is handed
-- to the push function along with the current state (which is forced with
-- 'seq' first). The outputs it reports are emitted, and the conduit then
-- either finishes or waits for more input with the new state. If upstream
-- ends, the close function is run on the current state and its outputs are
-- emitted as a source.
conduitState
    :: Monad m
    => state
    -- ^ initial state
    -> (state -> input -> m (ConduitStateResult state input output))
    -- ^ push function: current state and one input value
    -> (state -> m [output])
    -- ^ close function: produces the final outputs from the current state
    -> Conduit input m output
conduitState state0 push0 close0 = awaiting state0
  where
    -- A conduit waiting for input while holding the given state.
    awaiting st = NeedInput (step st) (drain st)

    -- Run the user's push function; the state is forced before the call.
    step st input = ConduitM (liftM advance (st `seq` push0 st input)) (return ())

    -- Upstream is done: run the user's close function and emit its outputs.
    drain st = SourceM (liftM fromList (close0 st)) (return ())

    -- Emit this push's outputs, then finish or wait with the new state.
    advance result = case result of
        StateFinished leftover outs -> haveMore (Finished leftover) (return ()) outs
        StateProducing st' outs     -> haveMore (awaiting st') (return ()) outs
-- | What a push function passed to 'conduitIO' reports back.
data ConduitIOResult input output
    = IOFinished (Maybe input) [output]
      -- ^ The conduit is done: an optional leftover input value together
      -- with the last outputs to emit.
    | IOProducing [output]
      -- ^ The conduit keeps going and produced these outputs.

-- | Maps over the output values only; a leftover input is kept untouched.
instance Functor (ConduitIOResult input) where
    fmap f result = case result of
        IOFinished leftover outputs -> IOFinished leftover (fmap f outputs)
        IOProducing outputs         -> IOProducing (fmap f outputs)
-- | Build a 'Conduit' whose state is acquired with 'allocate' and handed to
-- the push and close functions.
--
-- Nothing is allocated when the conduit is constructed: the state is
-- allocated when the first input is pushed, or when upstream closes before
-- any input arrived. The resource is released with 'release' when the push
-- function reports 'IOFinished', after the close function has run, or by
-- the close actions attached to the intermediate conduit steps.
conduitIO :: MonadResource m
          => IO state
          -- ^ allocate the state
          -> (state -> IO ())
          -- ^ clean up the state
          -> (state -> input -> m (ConduitIOResult input output))
          -- ^ push function: handle one input value
          -> (state -> m [output])
          -- ^ close function: produce the final outputs
          -> Conduit input m output
conduitIO alloc cleanup push0 close0 = NeedInput firstPush firstClose
  where
    -- First input: allocate the state, then handle it like any other push.
    firstPush input = ConduitM
        (do (key, state) <- allocate alloc cleanup
            step key state input)
        (return ())

    -- Upstream closed before any input: allocate, run the close function,
    -- release, and emit whatever the close function produced.
    firstClose = SourceM
        (do (key, state) <- allocate alloc cleanup
            os <- close0 state
            release key
            return $ fromList os)
        (return ())

    -- Run the user's push function and translate its result.
    step key state input = push0 state input >>= interpret
      where
        -- Still producing: emit the outputs, then wait for more input.
        -- Every close action along the way releases the resource.
        interpret (IOProducing output) = return $ haveMore
            (NeedInput nextPush (finish key state))
            (release key >> return ())
            output
        -- Finished: release right away, then emit the final outputs.
        interpret (IOFinished leftover output) = do
            release key
            return $ haveMore
                (Finished leftover)
                (return ())
                output

        nextPush nextInput = ConduitM (step key state nextInput) (release key)

    -- Upstream closed after at least one push: run the close function,
    -- release the resource and emit the final outputs.
    finish key state = SourceM
        (do output <- close0 state
            release key
            return $ fromList output)
        (release key)
-- | Turn a list into a 'Source' that yields the elements in order.
--
-- Each element becomes one 'Open' step whose close action does nothing;
-- the end of the list becomes 'Closed'.
fromList :: Monad m => [a] -> Source m a
fromList items = foldr cell Closed items
  where
    cell x rest = Open rest (return ()) x
-- | Change the base monad of a 'Conduit'.
--
-- The supplied transformation is applied to every monadic action stored in
-- the conduit: the close actions, the action of a 'ConduitM' step, and
-- (through 'transSource') the source kept in a 'NeedInput' step. The
-- rank-2 argument type needs the @RankNTypes@ extension.
transConduit :: Monad m
             => (forall a. m a -> n a)
             -- ^ how to run an action of the old monad in the new one
             -> Conduit input m output
             -> Conduit input n output
transConduit f conduit = case conduit of
    Finished leftover ->
        Finished leftover
    NeedInput push close ->
        NeedInput (\input -> transConduit f (push input)) (transSource f close)
    HaveOutput pull close output ->
        HaveOutput (transConduit f pull) (f close) output
    ConduitM mcon close ->
        ConduitM (f (liftM (transConduit f) mcon)) (f close)
-- | The value a 'SequencedSink' returns to tell 'sequenceSink' what to do
-- next.
data SequencedSinkResponse state input m output =
    Emit state [output]
    -- ^ Emit the given outputs, then run the sink again with the new state.
  | Stop
    -- ^ End the conduit; any leftover input from the sink is passed on.
  | StartConduit (Conduit input m output)
    -- ^ Hand the rest of the stream over to the given conduit.
-- | A function from the current state to a 'Sink' whose result is a
-- 'SequencedSinkResponse'. This is the argument type of 'sequenceSink'.
type SequencedSink state input m output =
    state -> Sink input m (SequencedSinkResponse state input m output)
-- | Build a 'Conduit' by running a state-dependent 'Sink' over and over.
--
-- The result always starts as a 'NeedInput' step: the first input is fed
-- to the sink created from the initial state, and the source used when
-- upstream closes before any input is 'mempty'.
sequenceSink
    :: Monad m
    => state
    -- ^ initial state
    -> SequencedSink state input m output
    -- ^ creates the sink to run for a given state
    -> Conduit input m output
sequenceSink state0 fsink = NeedInput feedFirst mempty
  where
    -- Push handler for the sink built from the initial state.
    feedFirst = scPush fsink (fsink state0)
-- | Feed one input value to the sink currently being run by
-- 'sequenceSink' and continue with 'scGoRes'.
--
-- A sink that is already 'Done' without leftover has the input recorded as
-- its leftover. A 'Done' sink that already carries a leftover is an
-- invariant violation and raises an error. A monadic sink is run first and
-- the push is retried on the result.
scPush :: Monad m
       => SequencedSink state input m output
       -> Sink input m (SequencedSinkResponse state input m output)
       -> ConduitPush input m output
scPush fsink sink input = case sink of
    Processing pushI _ -> scGoRes fsink (pushI input)
    Done Nothing res   -> scGoRes fsink (Done (Just input) res)
    Done (Just _) _    -> error "Invariant violated: Sink returned leftover without input"
    SinkM msink        -> ConduitM (liftM retry msink) (msink >>= sinkClose)
  where
    -- Same push, applied to the sink obtained from the monadic action.
    retry sink' = scPush fsink sink' input
-- | Turn the current state of the sink run by 'sequenceSink' into the next
-- step of the resulting 'Conduit'.
--
-- * @Done@ with 'Emit': emit the outputs, then restart the sink with the
--   new state (feeding it the leftover input first, if there is one).
--
-- * @Processing@: wait for more input. If upstream closes, the sink is
--   closed and its response decides what is still emitted.
--
-- * @Done@ with 'Stop': finish, passing on any leftover.
--
-- * @Done@ with 'StartConduit': continue as the given conduit, pushing the
--   leftover input into it if there is one.
--
-- * @SinkM@: run the monadic action and continue with its result.
--
-- A 'StartConduit' response whose conduit begins with 'HaveOutput' steps
-- is supported: those outputs are emitted first, and the leftover input
-- (or the closing of the conduit) is applied to what follows them.
scGoRes :: Monad m
        => SequencedSink state input m output
        -> Sink input m (SequencedSinkResponse state input m output)
        -> Conduit input m output
scGoRes fsink (Done (Just leftover) (Emit state os)) = haveMore
    (scPush fsink (fsink state) leftover)
    (return ())
    os
scGoRes fsink (Done Nothing (Emit state os)) = haveMore
    -- 'sequenceSink' always yields a 'NeedInput' step, so it can be used
    -- directly as the continuation.
    (sequenceSink state fsink)
    (return ())
    os
scGoRes fsink (Processing pushI closeI) = NeedInput
    (scPush fsink (Processing pushI closeI))
    (SourceM (closeI >>= goRes) (closeI >> return ()))
  where
    -- Upstream is closed: convert the sink's final response into the
    -- source of remaining outputs.
    goRes (Emit _ os) = return $ fromList os
    goRes Stop = return Closed
    goRes (StartConduit (NeedInput _ closeC)) = return closeC
    goRes (StartConduit (Finished _)) = return Closed
    goRes (StartConduit (ConduitM mcon _)) = mcon >>= goRes . StartConduit
    -- Pending output: yield it, then keep closing the rest of the conduit.
    -- If the consumer stops after this value, the rest is closed with
    -- 'conduitClose'.
    goRes (StartConduit (HaveOutput pull close x)) = return $ Open
        (SourceM (goRes (StartConduit pull)) (conduitClose pull))
        close
        x
scGoRes _ (Done mleftover Stop) = Finished mleftover
scGoRes _ (Done Nothing (StartConduit c)) = c
scGoRes _ (Done (Just leftover) (StartConduit (Finished Nothing))) = Finished (Just leftover)
scGoRes _ (Done Just{} (StartConduit (Finished Just{}))) = error "Invariant violated: conduit returns leftover without push"
scGoRes _ (Done (Just leftover) (StartConduit (NeedInput p _))) = p leftover
-- Pending output: emit it first, then push the leftover into what follows.
scGoRes fsink (Done (Just leftover) (StartConduit (HaveOutput pull close x))) = HaveOutput
    (scGoRes fsink (Done (Just leftover) (StartConduit pull)))
    close
    x
scGoRes fsink (Done mleftover (StartConduit (ConduitM mcon close))) =
    ConduitM (liftM (scGoRes fsink . Done mleftover . StartConduit) mcon) close
scGoRes fsink (SinkM msink) = ConduitM (liftM (scGoRes fsink) msink) (msink >>= sinkClose)
-- | Turn a 'Sink' into a 'Conduit' by running it repeatedly: each result
-- of the sink becomes one output value, after which the original sink is
-- started again (on the leftover input first, if the sink returned one).
--
-- When upstream closes while the sink is processing, the sink is closed
-- and its result is emitted as the single final output. A sink that is
-- already 'Done' without leftover gives a conduit that, once pushed,
-- yields the same result endlessly; a 'Done' sink that carries a leftover
-- is an invariant violation and raises an error.
sequence :: Monad m => Sink input m output -> Conduit input m output
sequence (Processing initPush initClose) =
    awaiting initPush initClose
  where
    -- Wait for input with the given push/close pair of the running sink.
    awaiting pushFn closeFn = NeedInput (step . pushFn) (drain closeFn)

    -- Interpret the sink's state after a push.
    step (Processing pushFn closeFn) = awaiting pushFn closeFn
    step (Done Nothing output) = HaveOutput
        (awaiting initPush initClose)
        (return ())
        output
    step (Done (Just extra) output) = HaveOutput
        (step $ initPush extra)
        (return ())
        output
    step (SinkM msink) = ConduitM (liftM step msink) (msink >>= sinkClose)

    -- Upstream closed: close the sink and emit its result once.
    drain closeFn = SourceM
        (do output <- closeFn
            return $ Open Closed (return ()) output)
        (return ())
sequence (Done Nothing output) = NeedInput (\_input -> repeating) endless
  where
    -- Self-referential steps: the same output, forever.
    repeating = HaveOutput repeating (return ()) output
    endless = Open endless (return ()) output
sequence (Done Just{} _) = error "Invariant violated: sink returns leftover without push"
sequence (SinkM msink) = ConduitM (liftM sequence msink) (msink >>= sinkClose)
-- | Close a 'Conduit' whatever step it is currently at.
--
-- A conduit waiting for input has its stored source closed with
-- 'sourceClose'; a finished conduit needs no action; the other steps carry
-- their own close action, which is returned as is.
conduitClose :: Monad m => Conduit input m output -> m ()
conduitClose conduit = case conduit of
    NeedInput _ pending -> sourceClose pending
    Finished _          -> return ()
    HaveOutput _ close _ -> close
    ConduitM _ close    -> close