module Data.Conduit.Process.Unix
(
ProcessTracker
, initProcessTracker
, MonitoredProcess
, monitorProcess
, terminateMonitoredProcess
) where
import Control.Applicative ((<$>), (<*>))
import Control.Arrow ((***))
import Control.Concurrent (forkIO)
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_,
newEmptyMVar, newMVar,
putMVar, readMVar, swapMVar,
takeMVar)
import Control.Exception (Exception, SomeException,
bracketOnError, finally,
handle, mask_,
throwIO, try)
import Control.Monad (void)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as S8
import Data.Conduit (Source, ($$))
import Data.Conduit.Binary (sinkHandle, sourceHandle)
import qualified Data.Conduit.List as CL
import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import Data.Time (getCurrentTime)
import Data.Time (diffUTCTime)
import Data.Typeable (Typeable)
import Foreign.C.Types
import Prelude (Bool (..), Either (..), IO,
Maybe (..), Monad (..), Show,
const, error, flip, fmap,
fromIntegral, fst, head, id,
length, map, maybe, show, snd,
($), ($!), (*), (.), (<),
(==))
import System.Exit (ExitCode)
import System.IO (hClose)
import System.Posix.IO.ByteString ( closeFd, createPipe,
fdToHandle)
import System.Posix.Signals (sigKILL, signalProcess)
import System.Posix.Types (CPid (..))
import System.Process
import System.Process.Internals (ProcessHandle (..),
ProcessHandle__ (..))
-- | Extract the 'MVar' holding the mutable state of a 'ProcessHandle'.
--
-- The 'ProcessHandle' constructor has two fields from process-1.2.0 onward
-- and one field before that, hence the CPP switch.
processHandleMVar :: ProcessHandle -> MVar ProcessHandle__
processHandleMVar ph = case ph of
#if MIN_VERSION_process(1, 2, 0)
    ProcessHandle stateVar _ -> stateVar
#else
    ProcessHandle stateVar -> stateVar
#endif
-- | Run an action on the internal state of a 'ProcessHandle' and store the
-- state the action returns. The update goes through 'modifyMVar_' on the
-- handle's state variable.
withProcessHandle_
    :: ProcessHandle
    -> (ProcessHandle__ -> IO ProcessHandle__)
    -> IO ()
withProcessHandle_ = modifyMVar_ . processHandleMVar
-- | Send @SIGKILL@ to the process behind the handle, if it is still open.
--
-- A handle that is already closed is left untouched. The handle state itself
-- is never modified.
killProcess :: ProcessHandle -> IO ()
killProcess ph = withProcessHandle_ ph sendKill
  where
    sendKill st@(ClosedHandle _) = return st
    sendKill st@(OpenHandle pid) = do
        signalProcess sigKILL pid
        return st
-- | Run an action and discard any exception it throws.
--
-- This is deliberately best-effort: every 'SomeException' is swallowed and
-- the caller simply gets @()@ back.
ignoreExceptions :: IO () -> IO ()
ignoreExceptions action = do
    outcome <- try action
    case outcome of
        Left (_ :: SomeException) -> return ()
        Right () -> return ()
-- | Binding to the C routine @launch_process_tracker@. 'initProcessTracker'
-- inspects the returned 'CInt' and wraps it in a 'ProcessTracker'.
foreign import ccall unsafe "launch_process_tracker"
    c_launch_process_tracker :: IO CInt
-- | Binding to the C routine @track_process@. Within this module the last
-- argument is @1@ when called from 'trackProcess' and @0@ when called from
-- 'untrackProcess'.
foreign import ccall unsafe "track_process"
    c_track_process :: ProcessTracker -> CPid -> CInt -> IO ()
-- | Handle to a launched process tracker. Wraps the 'CInt' returned by the
-- C routine @launch_process_tracker@ and is passed back to @track_process@.
newtype ProcessTracker = ProcessTracker CInt
-- | A process registered with a 'ProcessTracker'. Fields, in order: the
-- tracker it was registered with, a mutable cell holding the pid while the
-- process is still tracked, and an action that waits for the exit code.
data TrackedProcess = TrackedProcess !ProcessTracker !(IORef MaybePid) !(IO ExitCode)
-- | Strict optional pid: 'NoPid' when there is nothing (or no longer
-- anything) to untrack, 'Pid' while a process is registered.
data MaybePid = NoPid | Pid !CPid
-- | Launch the external process tracker and return a handle to it.
--
-- Throws 'CannotLaunchProcessTracker' when the C helper reports failure.
initProcessTracker :: IO ProcessTracker
initProcessTracker = do
    i <- c_launch_process_tracker
    -- The returned value is kept as the tracker handle and later handed back
    -- to @track_process@, so a positive value such as 1 is a legitimate
    -- result and must not be treated as an error. Failure is signalled by a
    -- negative value.
    -- NOTE(review): assumes the C helper follows the POSIX -1-on-error
    -- convention; confirm against the C sources.
    if i < 0
        then throwIO CannotLaunchProcessTracker
        else return $! ProcessTracker i
-- | Exceptions raised by the process tracker API.
-- 'CannotLaunchProcessTracker' is thrown by 'initProcessTracker' when the
-- tracker could not be started.
data ProcessTrackerException = CannotLaunchProcessTracker
    deriving (Show, Typeable)
instance Exception ProcessTrackerException
-- | Register a process with the tracker and return a 'TrackedProcess' whose
-- wait action yields the process's exit code.
--
-- For a still-running process a background thread waits for it to exit,
-- hands the exit code over, and then untracks the pid. For a handle that is
-- already closed nothing is registered and the wait action returns the exit
-- code recorded in the handle.
--
-- The wait action takes from an 'MVar', so it delivers the exit code once.
trackProcess :: ProcessTracker -> ProcessHandle -> IO TrackedProcess
trackProcess pt ph = mask_ $ do
    mpid <- readMVar $ processHandleMVar ph
    baton <- newEmptyMVar
    mpid' <- case mpid of
        ClosedHandle ec -> do
            -- No waiter thread is forked for a closed handle, so the baton
            -- must be filled here; otherwise the wait action would block
            -- forever on an MVar nobody ever writes to.
            putMVar baton ec
            return NoPid
        OpenHandle pid -> do
            c_track_process pt pid 1
            return $ Pid pid
    ipid <- newIORef mpid'
    let tp = TrackedProcess pt ipid (takeMVar baton)
    case mpid' of
        NoPid -> return ()
        Pid _ -> void $ forkIO $ do
            -- Publish the exit code first so waiters wake up promptly, then
            -- drop the pid from the tracker.
            waitForProcess ph >>= putMVar baton
            untrackProcess tp
    return $! tp
-- | Remove a process from its tracker, if it is still registered.
--
-- Does nothing once the stored pid has been cleared. Otherwise it calls
-- @track_process@ with a final argument of @0@ and clears the stored pid.
-- The whole operation runs with asynchronous exceptions masked.
untrackProcess :: TrackedProcess -> IO ()
untrackProcess (TrackedProcess tracker pidRef _) =
    mask_ $ readIORef pidRef >>= release
  where
    release NoPid = return ()
    release (Pid pid) = do
        c_track_process tracker pid 0
        writeIORef pidRef NoPid
-- | Start a process whose stdout and stderr are both redirected into a pipe,
-- and forward everything read from that pipe to the given callback.
--
-- A banner is sent to the callback right after the process is created,
-- saying either that a new process was attached or that it died immediately.
-- When a stdin source is supplied, it is streamed into the process's stdin
-- from a separate thread. The pipe handles are closed if process creation
-- fails.
forkExecuteLog :: ByteString -- ^ command
               -> [ByteString] -- ^ arguments
               -> Maybe [(ByteString, ByteString)] -- ^ environment
               -> Maybe ByteString -- ^ working directory
               -> Maybe (Source IO ByteString) -- ^ data for the process's stdin
               -> (ByteString -> IO ()) -- ^ receives output chunks and banners
               -> IO ProcessHandle
forkExecuteLog cmd args menv mwdir mstdin rlog =
    bracketOnError openLogPipe closeBoth spawn
  where
    -- Create the pipe as raw descriptors, then turn both ends into handles.
    -- If the conversion fails, both descriptors are closed.
    openLogPipe = bracketOnError
        createPipe
        (\(readFd, writeFd) -> closeFd readFd `finally` closeFd writeFd)
        (\(readFd, writeFd) -> do
            readerH <- fdToHandle readFd
            writerH <- fdToHandle writeFd
            return (readerH, writerH))

    -- Close both ends; the write end is closed even if closing the read end
    -- throws.
    closeBoth (readerH, writerH) = hClose readerH `finally` hClose writerH

    spawn handles@(readerH, writerH) = do
        (mstdinH, _, _, ph) <- createProcess spec
        ignoreExceptions $ announce handles ph
        -- Pump process output to the callback until the pipe is exhausted.
        _ <- forkIO $ ignoreExceptions $
            (sourceHandle readerH $$ CL.mapM_ rlog) `finally` hClose readerH
        case (mstdinH, mstdin) of
            (Just stdinH, Just source) -> do
                _ <- forkIO $ ignoreExceptions $
                    (source $$ sinkHandle stdinH) `finally` hClose stdinH
                return ()
            (Nothing, Nothing) -> return ()
            -- A stdin pipe is requested exactly when a source was supplied,
            -- so the two values are expected to agree.
            _ -> error $ "Invariant violated: Data.Conduit.Process.Unix.forkExecuteLog"
        return ph
      where
        stdinMode = case mstdin of
            Nothing -> Inherit
            Just _ -> CreatePipe
        spec = CreateProcess
            { cmdspec = RawCommand (S8.unpack cmd) (map S8.unpack args)
            , cwd = S8.unpack <$> mwdir
            , env = map (S8.unpack *** S8.unpack) <$> menv
            , std_in = stdinMode
            , std_out = UseHandle writerH
            , std_err = UseHandle writerH
            , close_fds = True
            , create_group = True
#if MIN_VERSION_process(1, 2, 0)
            , delegate_ctlc = False
#endif
            }

    -- Send a timestamped banner describing the freshly created process.
    announce handles ph = withProcessHandle_ ph $ \state -> do
        now <- getCurrentTime
        let banner message detail = rlog $ S8.concat
                [ "\n\n"
                , S8.pack $ show now
                , message
                , S8.pack detail
                , "\n\n"
                ]
        case state of
            ClosedHandle ec -> do
                banner ": Process immediately died with exit code " (show ec)
                closeBoth handles
            OpenHandle pid ->
                banner ": Attached new process " (show pid)
        return state
-- | State of a monitored process. 'NeedsRestart' is the initial state and
-- the state after a failed launch. 'NoRestart' is set by
-- 'terminateMonitoredProcess' and stops any further launches. 'Running'
-- carries the handle of the live process.
data Status = NeedsRestart | NoRestart | Running ProcessHandle
-- | Launch a process in the background and relaunch it when it exits, for
-- as long as the restart predicate agrees.
--
-- Returns immediately; all supervision happens in a forked thread. When a
-- relaunch is attempted less than five seconds after the previous launch,
-- the supervisor logs a notice and sleeps five seconds first. If launching
-- fails, the error is logged and no further attempt is made.
monitorProcess
    :: (ByteString -> IO ()) -- ^ log function for supervisor messages
    -> ProcessTracker
    -> Maybe S8.ByteString -- ^ user to run as; when given, runs via @sudo -E -u@
    -> S8.ByteString -- ^ executable
    -> S8.ByteString -- ^ working directory
    -> [S8.ByteString] -- ^ command line arguments
    -> [(S8.ByteString, S8.ByteString)] -- ^ environment
    -> (ByteString -> IO ()) -- ^ receives the process's output
    -> (ExitCode -> IO Bool) -- ^ decides, from the exit code, whether to restart
    -> IO MonitoredProcess
monitorProcess log processTracker msetuid exec dir args env' rlog shouldRestart = do
    mstatus <- newMVar NeedsRestart
    _ <- forkIO $ supervise mstatus Nothing
    return $ MonitoredProcess mstatus
  where
    -- The command actually executed, optionally wrapped in sudo.
    (cmd, args') = case msetuid of
        Nothing -> (exec, args)
        Just setuid -> ("sudo", "-E" : "-u" : setuid : "--" : exec : args)

    -- One round of supervision. The launch happens while holding the status
    -- variable; the follow-up action (waiting for exit) runs after it has
    -- been released.
    supervise mstatus mlast = do
        followUp <- modifyMVar mstatus $ \status ->
            case status of
                NoRestart -> return (NoRestart, return ())
                _ -> launch mstatus mlast
        followUp

    -- Start the process and compute the new status plus the follow-up action.
    launch mstatus mlast = do
        now <- getCurrentTime
        throttle now mlast
        res <- try $ forkExecuteLog
            cmd
            args'
            (Just env')
            (Just dir)
            (Just $ return ())
            rlog
        case res of
            Left e -> do
                log $ "Data.Conduit.Process.Unix.monitorProcess: " `S8.append` S8.pack (show (e :: SomeException))
                return (NeedsRestart, return ())
            Right ph -> do
                log $ "Process created: " `S8.append` exec
                return (Running ph, awaitExit mstatus now ph)

    -- Back off when the previous launch was less than five seconds ago.
    throttle now (Just last)
        | diffUTCTime now last < 5 = do
            log $ "Process restarting too quickly, waiting before trying again: " `S8.append` exec
            threadDelay $ 5 * 1000 * 1000
    throttle _ _ = return ()

    -- Wait for the process to exit and go around again if requested.
    awaitExit mstatus now ph = do
        TrackedProcess _ _ wait <- trackProcess processTracker ph
        ec <- wait
        again <- shouldRestart ec
        if again
            then supervise mstatus (Just now)
            else return ()
-- | Handle to a process started by 'monitorProcess'. Wraps the shared status
-- variable, which 'terminateMonitoredProcess' uses to stop the process.
newtype MonitoredProcess = MonitoredProcess (MVar Status)
-- | Stop a monitored process and prevent it from being restarted.
--
-- The status is switched to 'NoRestart' first. If a process was running, it
-- is asked to terminate, given one second, and then sent @SIGKILL@.
terminateMonitoredProcess :: MonitoredProcess -> IO ()
terminateMonitoredProcess (MonitoredProcess mstatus) =
    swapMVar mstatus NoRestart >>= shutdown
  where
    shutdown (Running ph) = do
        terminateProcess ph
        -- Grace period of one second before the forced kill.
        threadDelay 1000000
        killProcess ph
    shutdown _ = return ()