module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import System.IO (hClose)
import Data.Conduit
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Monad.Catch (MonadMask, onException, throwM, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
isStdStream = (\(Just h) -> return $ sinkHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
isStdStream = (\(Just h) -> return (sinkHandle h, liftIO $ hClose h), Just CreatePipe)
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
osStdStream = (\(Just h) -> return $ sourceHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
osStdStream = (\(Just h) -> return (sourceHandle h, liftIO $ hClose h), Just CreatePipe)
sourceProcessWithConsumer :: MonadIO m
=> CreateProcess
-> Consumer ByteString m a
-> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
(ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
res <- source $$ consumer
close
ec <- waitForStreamingProcess cph
return (ec, res)
sourceCmdWithConsumer :: MonadIO m
=> String
-> Consumer ByteString m a
-> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
sourceProcessWithStreams :: CreateProcess
-> Producer IO ByteString
-> Consumer ByteString IO a
-> Consumer ByteString IO b
-> IO (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
( (sinkStdin, closeStdin)
, (sourceStdout, closeStdout)
, (sourceStderr, closeStderr)
, sph) <- streamingProcess cp
(_, resStdout, resStderr) <-
runConcurrently (
(,,)
<$> Concurrently ((producerStdin $$ sinkStdin) `finally` closeStdin)
<*> Concurrently (sourceStdout $$ consumerStdout)
<*> Concurrently (sourceStderr $$ consumerStderr))
`finally` (closeStdout >> closeStderr)
`onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
return (ec, resStdout, resStderr)
sourceCmdWithStreams :: String
-> Producer IO ByteString
-> Consumer ByteString IO a
-> Consumer ByteString IO b
-> IO (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
withCheckedProcessCleanup
:: ( InputSource stdin
, OutputSink stderr
, OutputSink stdout
, MonadIO m
, MonadMask m
)
=> CreateProcess
-> (stdin -> stdout -> stderr -> m b)
-> m b
withCheckedProcessCleanup cp f = bracket
(streamingProcess cp)
(\(_, _, _, sph) -> closeStreamingProcessHandle sph)
$ \(x, y, z, sph) -> do
res <- f x y z `onException` liftIO (terminateStreamingProcess sph)
ec <- waitForStreamingProcess sph
if ec == ExitSuccess
then return res
else throwM $ ProcessExitedUnsuccessfully cp ec
terminateStreamingProcess :: StreamingProcessHandle -> IO ()
terminateStreamingProcess = terminateProcess . streamingProcessHandleRaw