{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Reflex.Process
( createProcess
, createProcessBufferingInput
, defProcessConfig
, unsafeCreateProcessWithHandles
, Process(..)
, ProcessConfig(..)
, SendPipe (..)
, createRedirectedProcess
) where
import Control.Concurrent.Async (Async, async, race_, waitBoth)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fix (MonadFix)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Default (Default, def)
import Data.Function (fix)
import Data.Traversable (for)
import GHC.IO.Handle (Handle)
import qualified GHC.IO.Handle as H
import System.Exit (ExitCode)
import qualified System.Posix.Signals as P
import System.Process hiding (createProcess)
import qualified System.Process as P
import Reflex
data SendPipe i
= SendPipe_Message i
| SendPipe_EOF
| SendPipe_LastMessage i
data ProcessConfig t i = ProcessConfig
{ _processConfig_stdin :: Event t i
, _processConfig_signal :: Event t P.Signal
}
instance Reflex t => Default (ProcessConfig t i) where
def = defProcessConfig
defProcessConfig :: Reflex t => ProcessConfig t i
defProcessConfig = ProcessConfig never never
data Process t o e = Process
{ _process_handle :: P.ProcessHandle
, _process_stdout :: Event t o
, _process_stderr :: Event t e
, _process_exit :: Event t ExitCode
, _process_signal :: Event t P.Signal
}
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m), MonadFix m)
=> P.CreateProcess
-> ProcessConfig t (SendPipe ByteString)
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m), MonadFix m)
=> IO (SendPipe ByteString)
-> (SendPipe ByteString -> IO ())
-> P.CreateProcess
-> ProcessConfig t (SendPipe ByteString)
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer spec config = do
rec p <- unsafeCreateProcessWithHandles (input $ _process_handle p) output output spec config
pure p
where
input :: ProcessHandle -> Handle -> IO (SendPipe ByteString -> IO ())
input ph h = do
H.hSetBuffering h H.NoBuffering
void $ liftIO $ async $ race_ (waitForProcess ph) $ fix $ \loop -> do
newMessage <- readBuffer
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ do
case newMessage of
SendPipe_Message m -> BS.hPutStr h m *> loop
SendPipe_LastMessage m -> BS.hPutStr h m *> H.hClose h
SendPipe_EOF -> H.hClose h
return writeBuffer
output h trigger = do
H.hSetBuffering h H.LineBuffering
pure $ fix $ \go -> do
open <- H.hIsOpen h
when open $ do
readable <- H.hIsReadable h
when readable $ do
out <- BS.hGetSome h 32768
if BS.null out
then H.hClose h
else void (trigger out) *> go
unsafeCreateProcessWithHandles
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-> (Handle -> (e -> IO ()) -> IO (IO ()))
-> P.CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
unsafeCreateProcessWithHandles mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal) = do
po <- liftIO $ P.createProcess p { std_in = P.CreatePipe, std_out = P.CreatePipe, std_err = P.CreatePipe }
(hIn, hOut, hErr, ph) <- case po of
(Just hIn, Just hOut, Just hErr, ph) -> pure (hIn, hOut, hErr, ph)
_ -> error "Reflex.Process.unsafeCreateProcessWithHandles: Created pipes were not returned by System.Process.createProcess."
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
for mpid $ \pid -> sig <$ P.signalProcess sig pid
let
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)
errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ async $ flip finally (P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)) $ do
waited <- waitForProcess ph
_ <- waitBoth outThread errThread
ecTrigger waited
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}
{-# DEPRECATED createRedirectedProcess "Use unsafeCreateProcessWithHandles instead." #-}
createRedirectedProcess
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-> (Handle -> (e -> IO ()) -> IO (IO ()))
-> P.CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess = unsafeCreateProcessWithHandles