{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, FlushInput(..)
, BuilderInput(..)
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sinkHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sinkHandle h, liftIO $ hClose h), Just CreatePipe)
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sourceHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sourceHandle h, liftIO $ hClose h), Just CreatePipe)
sourceProcessWithConsumer :: MonadIO m
=> CreateProcess
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
(ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
res <- runConduit $ source .| consumer
close
ec <- waitForStreamingProcess cph
return (ec, res)
sourceCmdWithConsumer :: MonadIO m
=> String
-> ConduitT ByteString Void m a
-> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
sourceProcessWithStreams
:: MonadUnliftIO m
=> CreateProcess
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
withUnliftIO $ \u -> do
( (sinkStdin, closeStdin)
, (sourceStdout, closeStdout)
, (sourceStderr, closeStderr)
, sph) <- streamingProcess cp
(_, resStdout, resStderr) <-
runConcurrently (
(,,)
<$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
<*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
<*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
`finally` (closeStdout >> closeStderr)
`onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
return (ec, resStdout, resStderr)
sourceCmdWithStreams
:: MonadUnliftIO m
=> String
-> ConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> ConduitT ByteString Void m b
-> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
withCheckedProcessCleanup
:: ( InputSource stdin
, OutputSink stderr
, OutputSink stdout
, MonadUnliftIO m
)
=> CreateProcess
-> (stdin -> stdout -> stderr -> m b)
-> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
(streamingProcess cp)
(\(_, _, _, sph) -> closeStreamingProcessHandle sph)
$ \(x, y, z, sph) -> do
res <- run (f x y z) `onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
if ec == ExitSuccess
then return res
else throwIO $ ProcessExitedUnsuccessfully cp ec
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw