module Data.Conduit.Shell.Process
(
run
,Data.Conduit.Shell.Process.shell
,Data.Conduit.Shell.Process.proc
,bytes
,unbytes
,withRights
,redirect
,quiet
,writeChunks
,discardChunks
,conduitProcess
)
where
import Data.Conduit.Shell.Types
import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import qualified Control.Exception as E
import Control.Monad
import Control.Monad.Fix
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit
import Data.Conduit.List (sourceList)
import qualified Data.Conduit.List as CL
import Data.Conduit.Process
import Data.Semigroup
import Data.These
import System.Exit (ExitCode(..))
import System.IO
import qualified System.Process
-- | Project a chunk stream down to raw bytes: the payload of every
-- 'Right' chunk is passed on and every 'Left' chunk is dropped.
bytes :: Monad m => Conduit Chunk m ByteString
bytes = CL.mapMaybe rightPayload
  where
    rightPayload (Right payload) = Just payload
    rightPayload (Left _) = Nothing
-- | Lift raw bytes into the chunk stream by tagging every incoming
-- 'ByteString' as a 'Right' chunk.
unbytes :: Monad m => Conduit ByteString m Chunk
unbytes = awaitForever (yield . Right)
-- | Run a shell command line as a chunk conduit. The string is turned
-- into a 'CreateProcess' with 'System.Process.shell' and handed to
-- 'conduitProcess'.
shell :: (MonadResource m) => String -> Conduit Chunk m Chunk
shell cmd = conduitProcess (System.Process.shell cmd)
-- | Run an executable with an explicit argument list as a chunk conduit.
-- The 'CreateProcess' is built with 'System.Process.proc' and handed to
-- 'conduitProcess'.
proc :: (MonadResource m) => String -> [String] -> Conduit Chunk m Chunk
proc px = conduitProcess . System.Process.proc px
-- | Maximum number of bytes requested per read from a process handle
-- (64 KiB).
bufSize :: Int
bufSize = 2 ^ (16 :: Int)
-- | Apply a byte-level conduit to the 'Right' chunks only.
--
-- Two conduits are combined with 'ZipConduit': one unwraps the 'Right'
-- payloads, runs them through @f@ and re-tags the results as 'Right';
-- the other lets only the 'Left' chunks through, unchanged.
withRights :: (Monad m)
           => Conduit ByteString m ByteString -> Conduit Chunk m Chunk
withRights f = getZipConduit (ZipConduit viaF *> ZipConduit leftsOnly)
  where
    -- Right payloads -> f -> Right chunks.
    viaF = CL.mapMaybe rightPayload =$= f =$= CL.map Right
    rightPayload = either (const Nothing) Just
    -- Left chunks bypass f entirely.
    leftsOnly = CL.filter isLeft
-- | Move every chunk of the given type over to the other side.
--
-- With 'Stderr', each 'Left' chunk is re-tagged as 'Right'; with
-- 'Stdout', each 'Right' chunk is re-tagged as 'Left'. Chunks already on
-- the other side pass through untouched.
redirect :: Monad m
         => ChunkType -> Conduit Chunk m Chunk
redirect ty = CL.map retag
  where
    retag chunk =
      case (ty, chunk) of
        (Stderr, Left payload) -> Right payload
        (Stdout, Right payload) -> Left payload
        _ -> chunk
-- | Run the given conduit but throw away everything it outputs, by
-- fusing it with 'discardChunks'.
quiet :: (Monad m,MonadIO m) => Conduit Chunk m Chunk -> Conduit Chunk m Chunk
quiet m = m =$= discardChunks
-- | Execute a chunk conduit to completion.
--
-- The conduit is fed from an empty source and its output is consumed by
-- 'writeChunks'; the pipeline runs inside 'runShellT' and
-- 'runResourceT'.
run :: (MonadIO m,MonadBaseControl IO m)
    => Conduit Chunk (ShellT m) Chunk -> m ()
run p = runResourceT (runShellT pipeline)
  where
    pipeline = fedFromNothing $$ writeChunks
    -- The conduit gets no upstream input at all.
    fedFromNothing = sourceList [] $= p
-- | Sink that prints every chunk: 'Left' payloads are written to
-- 'stderr' and 'Right' payloads to 'stdout'.
writeChunks :: (MonadIO m)
            => Consumer Chunk m ()
writeChunks = awaitForever (liftIO . emit)
  where
    emit = either (S.hPut stderr) (S.hPut stdout)
-- | Sink that consumes every chunk and does nothing with it.
discardChunks :: (MonadIO m)
              => Consumer Chunk m ()
discardChunks = drain
  where
    -- Keep awaiting until upstream is exhausted, ignoring each value.
    drain = await >>= maybe (return ()) (const drain)
-- | Run a process as a conduit over chunks.
--
-- The process is created with pipes for stdin, stdout and stderr, and
-- the resulting handles are passed to 'startProxy'. Creation and cleanup
-- are registered with 'bracketP'; the cleanup action closes the three
-- pipe handles and then waits for the process.
conduitProcess :: (MonadResource m)
               => CreateProcess -> Conduit Chunk m Chunk
conduitProcess cp = bracketP acquire release startProxy
  where
    -- The caller's process description with all three streams piped.
    piped =
      cp { std_in = CreatePipe
         , std_out = CreatePipe
         , std_err = CreatePipe
         }
    acquire = createProcess piped
    release (Just cin, Just cout, Just cerr, ph) =
      do mapM_ hClose [cin, cout, cerr]
         void (waitForProcess' ph)
    release _ = error "conduitProcess: unexpected arguments to closep"
-- | Shuttle chunks between the conduit and a running process.
--
-- Each round first yields whatever 'proxy' finds ready on the process's
-- stdout (as 'Right') and stderr (as 'Left'), then checks whether the
-- process has exited. If it has not, one upstream chunk is awaited:
-- 'Right' payloads are written to the process's stdin and flushed, and
-- 'Left' chunks are yielded downstream unchanged.
--
-- When the process has exited or upstream is exhausted, stdin is closed,
-- the remaining output is drained with 'remainders', and a non-zero exit
-- code is thrown as 'ShellExitFailure'.
startProxy :: (MonadIO m,MonadThrow m)
           => (Maybe Handle,Maybe Handle,Maybe Handle,ProcessHandle)
           -> ConduitM Chunk Chunk m ()
startProxy (Just cin, Just cout, Just cerr, ph) =
  do earlyExit <- pump
     liftIO (hClose cin)
     remainders cout cerr
     -- Only wait for the process if the pump loop did not already see
     -- its exit code.
     code <- liftIO (maybe (waitForProcess' ph) return earlyExit)
     case code of
       ExitSuccess -> return ()
       ExitFailure n -> monadThrow (ShellExitFailure n)
  where
    pump =
      do proxy cout Right
         proxy cerr Left
         status <- liftIO (getProcessExitCode ph)
         case status of
           Just _ -> return status
           Nothing -> await >>= maybe (return Nothing) forward
    -- Handle one upstream chunk, then go around again.
    forward chunk =
      do either (yield . Left) feed chunk
         pump
    feed payload = liftIO (S.hPut cin payload >> hFlush cin)
startProxy _ = error "startProxy: unexpected arguments"
-- | Yield everything still to be read from the two handles.
--
-- One 'remainder' thread is forked per handle; both publish onto a
-- shared channel. Messages tagged 'Right' come from the first handle and
-- are yielded as 'Right' chunks; messages tagged 'Left' come from the
-- second and are yielded as 'Left' chunks. A 'Nothing' payload marks the
-- end of that side, and the loop stops once both sides have ended.
remainders :: MonadIO m
           => Handle -> Handle -> ConduitM i (Either ByteString ByteString) m ()
remainders cout cerr =
  do chan <- liftIO newChan
     spawn (remainder chan cout Right)
     spawn (remainder chan cerr Left)
     drain chan Nothing
  where
    spawn reader = void (liftIO (forkIO reader))
    -- The second argument records which sides have finished: 'This' for
    -- the 'Left' side, 'That' for the 'Right' side, 'These' for both.
    drain _ (Just (These () ())) = return ()
    drain chan finished =
      do msg <- liftIO (readChan chan)
         case msg of
           Left Nothing -> drain chan (finished <> Just (This ()))
           Right Nothing -> drain chan (finished <> Just (That ()))
           Left (Just payload) ->
             do yield (Left payload)
                drain chan finished
           Right (Just payload) ->
             do yield (Right payload)
                drain chan finished
-- | Read a handle until it is exhausted, publishing each piece on the
-- channel.
--
-- Every non-empty read is wrapped as @cons (Just bytes)@. An empty read
-- ends the loop, and @cons Nothing@ is written as the end-of-stream
-- marker.
--
-- A read that fails with an 'E.IOException' (for example because the
-- handle was closed underneath the reader) is treated as end of stream
-- too. Without that the thread would die before writing its marker,
-- leaving any reader that waits for the marker blocked on the channel.
remainder :: Chan (Either (Maybe ByteString) (Maybe ByteString))
          -> Handle
          -> (Maybe ByteString -> Either (Maybe ByteString) (Maybe ByteString))
          -> IO ()
remainder chan h cons =
  do chunk <- S.hGetSome h bufSize `E.catch` readFailed
     if S.null chunk
        then writeChan chan (cons Nothing)
        else do writeChan chan (cons (Just chunk))
                remainder chan h cons
  where
    -- Only I/O errors are turned into "no more data"; asynchronous
    -- exceptions still propagate.
    readFailed :: E.IOException -> IO ByteString
    readFailed _ = return S.empty
-- | Yield the input that is currently ready on a handle.
--
-- While 'hReady'' reports the handle as ready, up to 'bufSize' bytes are
-- read, wrapped with @cons@ and yielded. The loop ends as soon as the
-- handle is reported not ready.
proxy :: MonadIO m
      => Handle -> (ByteString -> o) -> ConduitM i o m ()
proxy h cons =
  do available <- liftIO (hReady' h)
     when available $
       do chunk <- liftIO (S.hGetSome h bufSize)
          yield (cons chunk)
          proxy h cons
-- | Like 'hReady', but returns 'False' instead of throwing when the
-- readiness check fails with an I/O error (for instance at end of file
-- or on a closed handle).
--
-- Only 'E.IOException' is caught. Catching every exception would also
-- swallow asynchronous exceptions such as a thread kill or a timeout.
hReady' :: Handle -> IO Bool
hReady' h = hReady h `E.catch` notReady
  where
    notReady :: E.IOException -> IO Bool
    notReady _ = return False
-- | Like 'waitForProcess', but best-effort: if waiting fails with an I/O
-- error, 'ExitSuccess' is reported instead of throwing.
--
-- Only 'E.IOException' is caught. Catching every exception would also
-- swallow asynchronous exceptions such as a thread kill or a timeout,
-- and an interrupted wait would then be reported as a successful exit.
waitForProcess' :: ProcessHandle -> IO ExitCode
waitForProcess' ph = waitForProcess ph `E.catch` waitFailed
  where
    waitFailed :: E.IOException -> IO ExitCode
    waitFailed _ = return ExitSuccess
-- | 'True' exactly when the value is a 'Left'.
isLeft :: Either a b -> Bool
isLeft = either (const True) (const False)