module Data.Conduit
(
module Data.Conduit.Types.Source
, BufferedSource
, bufferSource
, unbufferSource
, bsourceClose
, IsSource
, module Data.Conduit.Types.Sink
, module Data.Conduit.Types.Conduit
,
($$)
, ($=)
, (=$)
, (=$=)
, module Data.Conduit.Util.Source
, module Data.Conduit.Util.Sink
, module Data.Conduit.Util.Conduit
, Flush (..)
, ResourceT
, MonadResource
, MonadThrow (..)
, MonadUnsafeIO (..)
, runResourceT
) where
import Control.Monad (liftM)
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.IORef as I
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Types.Sink
import Data.Conduit.Util.Sink
import Data.Conduit.Types.Conduit
import Data.Conduit.Util.Conduit
infixr 0 $$
($$) :: IsSource src m => src m a -> Sink a m b -> m b
($$) = connect
class IsSource src m where
connect :: src m a -> Sink a m b -> m b
fuseLeft :: src m a -> Conduit a m b -> Source m b
instance Monad m => IsSource Source m where
connect = normalConnect
fuseLeft = normalFuseLeft
instance MonadIO m => IsSource BufferedSource m where
connect = bufferedConnect
fuseLeft = bufferedFuseLeft
normalConnect :: Monad m => Source m a -> Sink a m b -> m b
normalConnect src (Done _leftover output) = sourceClose src >> return output
normalConnect src (SinkM msink) = msink >>= normalConnect src
normalConnect (SourceM msrc _) sink@Processing{} = msrc >>= flip normalConnect sink
normalConnect Closed (Processing _ close) = close
normalConnect (Open src _ a) (Processing push _) = normalConnect src $ push a
data FuseLeftState srcState input m output =
FLClosed
| FLOpen srcState (ConduitPush input m output) (ConduitClose m output)
| FLHaveOutput srcState (Conduit input m output) (m ())
infixl 1 $=
($=) :: IsSource src m
=> src m a
-> Conduit a m b
-> Source m b
($=) = fuseLeft
normalFuseLeft :: Monad m => Source m a -> Conduit a m b -> Source m b
normalFuseLeft Closed (NeedInput _ close) = close
normalFuseLeft Closed (Finished _) = Closed
normalFuseLeft src (Finished _) = SourceM
(sourceClose src >> return Closed)
(sourceClose src)
normalFuseLeft src (HaveOutput p c x) = Open
(normalFuseLeft src p)
(sourceClose src >> c)
x
normalFuseLeft (Open src _ a) (NeedInput push _) = normalFuseLeft src $ push a
normalFuseLeft src (ConduitM mcon conclose) = SourceM
(liftM (normalFuseLeft src) mcon)
(conclose >> sourceClose src)
normalFuseLeft (SourceM msrc closeS) conduit@(NeedInput _ closeC) =
SourceM (liftM (flip normalFuseLeft conduit) msrc) $ do
closeS
sourceClose closeC
infixr 0 =$
(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
conduit =$ Done _leftover output = SinkM $ conduitClose conduit >> return (Done Nothing output)
conduit =$ SinkM msink = SinkM (liftM (conduit =$) msink)
NeedInput pushO closeO =$ sink = Processing
(\input -> pushO input =$ sink)
(closeO $$ sink)
Finished mleftover =$ Processing _ close = SinkM $ liftM (Done mleftover) close
ConduitM mcon _ =$ sink@Processing{} = SinkM $ liftM (=$ sink) mcon
HaveOutput con _ input =$ Processing pushI _ = con =$ pushI input
infixr 0 =$=
(=$=) :: Monad m => Conduit a m b -> Conduit b m c -> Conduit a m c
Finished mleftover =$= NeedInput _ closeI =
go closeI
where
go Closed = Finished mleftover
go (Open src close x) = HaveOutput (go src) close x
go (SourceM msrc close) = ConduitM (liftM go msrc) close
Finished mleftover =$= Finished _ = Finished mleftover
conO =$= HaveOutput con close x = HaveOutput (conO =$= con) close x
ConduitM mcon close =$= conI = ConduitM (liftM (=$= conI) mcon) (close >> conduitClose conI)
NeedInput pushO closeO =$= conI = NeedInput
(\input -> pushO input =$= conI)
(closeO $= conI)
conO =$= ConduitM mconI close = ConduitM (liftM (conO =$=) mconI) (close >> conduitClose conO)
HaveOutput conO _ inputI =$= NeedInput pushI _ = conO =$= pushI inputI
HaveOutput _ close _ =$= Finished _ = ConduitM (close >> return (Finished Nothing)) close
data BufferedSource m a = BufferedSource (I.IORef (BSState m a))
data BSState m a =
ClosedEmpty
| OpenEmpty (Source m a)
| ClosedFull a
| OpenFull (Source m a) a
bufferSource :: MonadIO m => Source m a -> m (BufferedSource m a)
bufferSource src = liftM BufferedSource $ liftIO $ I.newIORef $ OpenEmpty src
unbufferSource :: MonadIO m
=> BufferedSource m a
-> Source m a
unbufferSource (BufferedSource bs) =
SourceM msrc (msrc >>= sourceClose)
where
msrc = do
buf <- liftIO $ I.readIORef bs
case buf of
OpenEmpty src -> return src
OpenFull src a -> return $ Open src (sourceClose src) a
ClosedEmpty -> return Closed
ClosedFull a -> return $ Open Closed (return ()) a
bufferedConnect :: MonadIO m => BufferedSource m a -> Sink a m b -> m b
bufferedConnect _ (Done Nothing output) = return output
bufferedConnect _ (Done Just{} _) = error "Invariant violated: sink returned leftover without input"
bufferedConnect bsrc (SinkM msink) = msink >>= bufferedConnect bsrc
bufferedConnect (BufferedSource bs) (Processing push0 close0) = do
bsState <- liftIO $ I.readIORef bs
case bsState of
ClosedEmpty -> close0
OpenEmpty src -> connect' src push0 close0
ClosedFull a -> onRes Nothing $ push0 a
OpenFull src a -> onRes (Just src) $ push0 a
where
connect' Closed _ close = do
liftIO $ I.writeIORef bs ClosedEmpty
close
connect' (Open src _ x) push _ = onRes (Just src) $ push x
connect' (SourceM msrc _) push close = msrc >>= \src -> connect' src push close
onRes msrc (Done mleftover res) = do
let state =
case (msrc, mleftover) of
(Nothing, Nothing) -> ClosedEmpty
(Just src, Nothing) -> OpenEmpty src
(Nothing, Just leftover) -> ClosedFull leftover
(Just src, Just leftover) -> OpenFull src leftover
liftIO $ I.writeIORef bs state
return res
onRes Nothing (Processing _ close) = do
liftIO $ I.writeIORef bs ClosedEmpty
close
onRes (Just src) (Processing push close) = connect' src push close
onRes msrc (SinkM msink) = msink >>= onRes msrc
bufferedFuseLeft
:: MonadIO m
=> BufferedSource m a
-> Conduit a m b
-> Source m b
bufferedFuseLeft bsrc (ConduitM mcon close) = SourceM
(liftM (bufferedFuseLeft bsrc) mcon)
close
bufferedFuseLeft _ (Finished _) = Closed
bufferedFuseLeft bsrc (HaveOutput next close x) = Open
(bufferedFuseLeft bsrc next)
close
x
bufferedFuseLeft bsrc (NeedInput push0 close0) = SourceM
(pullF $ FLOpen () push0 close0)
(sourceClose close0)
where
mkSrc state = SourceM
(pullF state)
(closeF state)
pullF state' =
case state' of
FLClosed -> return Closed
FLHaveOutput () pull _ -> goRes pull
FLOpen () push close -> do
mres <- bsourcePull bsrc
case mres of
Nothing -> return close
Just input -> goRes $ push input
goRes (Finished leftover) = do
bsourceUnpull bsrc leftover
return Closed
goRes (HaveOutput pull close' x) =
let state = FLHaveOutput () pull close'
in return $ Open (mkSrc state) (closeF state) x
goRes (NeedInput pushI closeI) = pullF (FLOpen () pushI closeI)
goRes (ConduitM mcon _) = mcon >>= goRes
closeF state = do
case state of
FLClosed -> return ()
FLOpen () _ close -> do
() <- sourceClose close
return ()
FLHaveOutput () _ close -> close
bsourcePull :: MonadIO m => BufferedSource m a -> m (Maybe a)
bsourcePull (BufferedSource bs) =
liftIO (I.readIORef bs) >>= goBuf
where
goBuf (OpenEmpty Closed) = liftIO $ I.writeIORef bs ClosedEmpty >> return Nothing
goBuf (OpenEmpty (Open src _ a)) = do
liftIO $ I.writeIORef bs $ OpenEmpty src
return $ Just a
goBuf (OpenEmpty (SourceM msrc _)) = msrc >>= goBuf . OpenEmpty
goBuf ClosedEmpty = return Nothing
goBuf (OpenFull src a) = do
liftIO $ I.writeIORef bs (OpenEmpty src)
return $ Just a
goBuf (ClosedFull a) = do
liftIO $ I.writeIORef bs ClosedEmpty
return $ Just a
bsourceUnpull :: MonadIO m => BufferedSource m a -> Maybe a -> m ()
bsourceUnpull _ Nothing = return ()
bsourceUnpull (BufferedSource ref) (Just a) = do
buf <- liftIO $ I.readIORef ref
case buf of
OpenEmpty src -> liftIO $ I.writeIORef ref (OpenFull src a)
ClosedEmpty -> liftIO $ I.writeIORef ref (ClosedFull a)
_ -> error $ "Invariant violated: bsourceUnpull called on full data"
bsourceClose :: MonadIO m => BufferedSource m a -> m ()
bsourceClose (BufferedSource ref) = do
buf <- liftIO $ I.readIORef ref
case buf of
OpenEmpty src -> sourceClose src
OpenFull src _ -> sourceClose src
ClosedEmpty -> return ()
ClosedFull _ -> return ()
data Flush a = Chunk a | Flush
deriving (Show, Eq, Ord)
instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)