module System.Console.Concurrent.Internal where
import System.IO
import System.Directory
import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
import Data.Monoid
import qualified System.Process as P
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Text.Lazy as L
import Control.Applicative
import Prelude
import Utility.Monad
import Utility.Exception
-- | Shared state used to coordinate console output between threads.
data OutputHandle = OutputHandle
    { outputLock :: TMVar Lock
      -- ^ Full while some thread holds the output lock, empty otherwise.
    , outputBuffer :: TMVar OutputBuffer
      -- ^ Output destined for stdout, buffered while the lock is held
      -- by someone else.
    , errorBuffer :: TMVar OutputBuffer
      -- ^ Output destined for stderr, buffered while the lock is held
      -- by someone else.
    , outputThreads :: TMVar Integer
      -- ^ Count of registered output threads; 'flushConcurrentOutput'
      -- waits for it to reach zero.
    }
-- | Token stored in the 'outputLock' 'TMVar' while the lock is held.
data Lock
    = Locked
-- | The single, process-wide 'OutputHandle'.
--
-- It is created with 'unsafePerformIO', so it must never be inlined:
-- if GHC duplicated the expression, different call sites could end up
-- with different locks and buffers. The NOINLINE pragma guarantees that
-- exactly one set of 'TMVar's is ever allocated.
--
-- Initial state: the lock is free (empty 'TMVar'), both buffers are
-- empty, and no output threads are registered.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: OutputHandle
globalOutputHandle = unsafePerformIO $ OutputHandle
    <$> newEmptyTMVarIO
    <*> newTMVarIO (OutputBuffer [])
    <*> newTMVarIO (OutputBuffer [])
    <*> newTMVarIO 0
-- | Runs an action while holding the output lock.
--
-- The lock is acquired with 'takeOutputLock' (blocking) before the
-- action starts and released with 'dropOutputLock' afterwards, also
-- when the action exits with an exception.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput action = bracket_ acquire release action
  where
    acquire = liftIO takeOutputLock
    release = liftIO dropOutputLock
-- | Blocks until the output lock has been acquired.
takeOutputLock :: IO ()
takeOutputLock = takeOutputLock' True >> return ()

-- | Attempts to acquire the output lock without blocking.
-- Returns 'True' when the lock was acquired.
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' nonBlocking
  where
    nonBlocking = False
-- | Runs an STM action against the global output lock variable,
-- as a single atomic transaction.
withLock :: (TMVar Lock -> STM a) -> IO a
withLock stm = atomically (stm lockVar)
  where
    lockVar = outputLock globalOutputHandle
-- | Acquires the output lock.
--
-- With a 'True' argument the call blocks until the lock is free; with
-- 'False' it gives up immediately when the lock is already held.
-- The result says whether the lock was acquired.
--
-- On successful acquisition, anything that accumulated in the global
-- stdout and stderr buffers is swapped out and emitted right away.
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
    acquired <- withLock attempt
    when acquired flushGlobalBuffers
    return acquired
  where
    attempt lockVar = do
        prior <- tryTakeTMVar lockVar
        case (prior, block) of
            -- Lock was free: claim it.
            (Nothing, _) -> putTMVar lockVar Locked >> return True
            -- Lock is held and the caller wants to wait.
            (Just Locked, True) -> retry
            -- Lock is held: put back the value just taken and give up.
            (Just Locked, False) -> putTMVar lockVar Locked >> return False

    flushGlobalBuffers = do
        (pendingOut, pendingErr) <- atomically $ do
            o <- swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
            e <- swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
            return (o, e)
        emitOutputBuffer StdOut pendingOut
        emitOutputBuffer StdErr pendingErr
-- | Releases the output lock by emptying the lock variable.
-- Blocks if the lock is not currently held.
dropOutputLock :: IO ()
dropOutputLock = withLock $ \lockVar -> takeTMVar lockVar >> return ()
-- | Runs an action and then calls 'flushConcurrentOutput', whether the
-- action finished normally or threw an exception.
withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
withConcurrentOutput action = finally action (liftIO flushConcurrentOutput)
-- | Blocks until all pending concurrent output has been dealt with.
--
-- First waits for the count of registered output threads to drop to
-- zero (or below), then takes and immediately releases the output lock.
-- Taking the lock waits for its current holder and emits whatever is
-- left in the global buffers.
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
    atomically $ do
        active <- takeTMVar counter
        when (active > 0) retry
        putTMVar counter active
    lockOutput (return ())
  where
    counter = outputThreads globalOutputHandle
-- | Values that can be rendered as strict 'T.Text' for display.
class Outputable v where
    toOutput :: v -> T.Text

-- | Strict text is used unchanged.
instance Outputable T.Text where
    toOutput t = t

-- | Lazy text is converted to strict text.
instance Outputable L.Text where
    toOutput = L.toStrict

-- | Strings are packed into strict text.
instance Outputable String where
    toOutput = T.pack
-- | Displays a value on stdout, or buffers it when another thread
-- currently holds the output lock. See 'outputConcurrent''.
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent v = outputConcurrent' StdOut v
-- | Like 'outputConcurrent', but targets stderr.
errorConcurrent :: Outputable v => v -> IO ()
errorConcurrent v = outputConcurrent' StdErr v
-- | Displays a value on the given standard handle, or buffers it.
--
-- A non-blocking attempt is made to take the output lock:
--
-- * If the lock was acquired, the value is written to the handle, the
--   handle is flushed, and the lock is dropped again (also on exception).
--
-- * Otherwise the value is appended to the global buffer for that
--   handle, to be emitted by whoever takes the lock next.
outputConcurrent' :: Outputable v => StdHandle -> v -> IO ()
outputConcurrent' stdh v = bracket setup cleanup go
  where
    setup = tryTakeOutputLock
    -- Only drop the lock when setup actually acquired it.
    cleanup False = return ()
    cleanup True = dropOutputLock
    go True = do
        T.hPutStr h (toOutput v)
        hFlush h
    go False = do
        oldbuf <- atomically $ takeTMVar bv
        -- addOutputBuffer may do file IO (spilling to a temp file) and
        -- can therefore throw. The buffer variable is empty at this
        -- point, so the old contents must be put back on failure;
        -- otherwise every later user of the buffer would block forever.
        newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
            `onException` atomically (putTMVar bv oldbuf)
        atomically $ putTMVar bv newbuf
    h = toHandle stdh
    bv = bufferFor stdh
-- | Alias for the process handle type of "System.Process".
type ConcurrentProcessHandle = P.ProcessHandle

-- | Waits for a process to exit; a direct wrapper around
-- 'P.waitForProcess'.
waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
waitForProcessConcurrent h = P.waitForProcess h
-- | Starts a process without letting its output interleave with other
-- concurrent output.
--
-- When the process inherits stdout or stderr, a non-blocking attempt is
-- made to take the output lock: on success the process runs in the
-- foreground ('fgProcess'), otherwise in the background with its output
-- captured ('bgProcess').
--
-- A process that inherits neither handle is started directly. In every
-- case a helper thread waits on the process in the background.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
    | writesToConsole = do
        gotLock <- tryTakeOutputLock
        if gotLock
            then fgProcess p
            else bgProcess p
    | otherwise = do
        started@(_, _, _, ph) <- P.createProcess p
        _ <- async (void (tryIO (P.waitForProcess ph)))
        return started
  where
    writesToConsole = any willOutput [P.std_out p, P.std_err p]
-- | Blocks until the output lock is available, then starts the process
-- in the foreground via 'fgProcess'.
createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessForeground p = takeOutputLock >> fgProcess p
-- | Starts a process in the foreground. The caller must already hold
-- the output lock.
--
-- If the process cannot be created, the lock is dropped and the
-- exception is rethrown. Otherwise an output thread is registered and a
-- helper thread waits for the process to exit, then unregisters the
-- output thread and drops the lock.
fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
fgProcess p = do
    started@(_, _, _, ph) <- onException (P.createProcess p) dropOutputLock
    registerOutputThread
    _ <- async $ do
        _ <- tryIO (P.waitForProcess ph)
        unregisterOutputThread
        dropOutputLock
    return started
-- | Starts a process in the background, capturing the output it would
-- otherwise have written to an inherited stdout or stderr.
--
-- Inherited output streams are replaced by pipes. The pipe handles are
-- kept internal (the caller sees 'Nothing' for them) and are drained by
-- helper threads started through 'setupOutputBuffer' and 'bufferWriter'.
-- An output thread is registered before the process is created and
-- unregistered again if creation fails.
bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
bgProcess p = do
    registerOutputThread
    (inH, outH, errH, ph) <- P.createProcess piped
        `onException` unregisterOutputThread
    -- Wait on the process in the background, as fgProcess also does.
    _ <- async $ void $ tryIO $ P.waitForProcess ph
    outbuf <- setupOutputBuffer StdOut outH
    errbuf <- setupOutputBuffer StdErr errH
    void $ async $ bufferWriter [outbuf, errbuf]
    return
        ( inH
        , hideCaptured (P.std_out p) outH
        , hideCaptured (P.std_err p) errH
        , ph
        )
  where
    piped = p
        { P.std_out = capture (P.std_out p)
        , P.std_err = capture (P.std_err p)
        }
    -- Redirect a stream to a pipe when it would go to the console.
    capture ss = if willOutput ss then P.CreatePipe else ss
    -- Do not expose handles of pipes that were set up internally.
    hideCaptured ss mh = if willOutput ss then Nothing else mh
-- | Whether a stream setting sends output to the inherited handle;
-- true only for 'P.Inherit'.
willOutput :: P.StdStream -> Bool
willOutput ss = case ss of
    P.Inherit -> True
    _ -> False
-- | Buffered output activity. New entries are consed onto the front, so
-- the list is newest-first; 'emitOutputBuffer' reverses it before
-- writing.
data OutputBuffer
    = OutputBuffer [OutputBufferedActivity]
    deriving (Eq)
-- | Selects one of the two standard output streams.
data StdHandle
    = StdOut
    | StdErr
-- | Maps a 'StdHandle' to the corresponding real 'Handle'.
toHandle :: StdHandle -> Handle
toHandle which = case which of
    StdOut -> stdout
    StdErr -> stderr
-- | Picks the global buffer variable belonging to a 'StdHandle'.
bufferFor :: StdHandle -> TMVar OutputBuffer
bufferFor which = field globalOutputHandle
  where
    field = case which of
        StdOut -> outputBuffer
        StdErr -> errorBuffer
-- | One piece of buffered output.
data OutputBufferedActivity
    = Output T.Text
      -- ^ Text held in memory.
    | InTempFile
        { tempFile :: FilePath
          -- ^ File holding the buffered text.
        , endsInNewLine :: Bool
          -- ^ Whether the text written to the file ended with a newline.
        }
      -- ^ Text that was written out to a temporary file.
    deriving (Eq)
-- | Signal that a drained handle has reached its end.
data AtEnd
    = AtEnd
    deriving Eq

-- | Signal that new content was added to a buffer.
data BufSig
    = BufSig
-- | Creates an empty buffer together with its "changed" and "at end"
-- signal variables, and starts an 'outputDrainer' thread that fills the
-- buffer from the given handle (if any).
--
-- Returns the target 'StdHandle' along with the buffer and both signal
-- variables.
setupOutputBuffer :: StdHandle -> Maybe Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
setupOutputBuffer h fromh = do
    buf <- newMVar (OutputBuffer [])
    bufsig <- newEmptyTMVarIO
    bufend <- newEmptyTMVarIO
    _ <- async (outputDrainer fromh buf bufsig bufend)
    return (h, buf, bufsig, bufend)
-- | Reads everything from a handle into a buffer, chunk by chunk.
--
-- After each chunk the "changed" signal is (re)set; when an empty chunk
-- is read the "at end" signal is set and the handle is closed. With no
-- handle at all, the "at end" signal is set immediately.
outputDrainer :: Maybe Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
outputDrainer mfromh buf bufsig bufend = maybe signalEnd drain mfromh
  where
    drain fromh = do
        chunk <- T.hGetChunk fromh
        if T.null chunk
            then signalEnd >> hClose fromh
            else do
                modifyMVar_ buf (addOutputBuffer (Output chunk))
                signalChange
                drain fromh

    signalEnd = atomically (putTMVar bufend AtEnd)

    -- Clear any signal that has not been consumed yet, so the put
    -- below cannot block.
    signalChange = atomically $ do
        _ <- tryTakeTMVar bufsig
        putTMVar bufsig BufSig
-- | Atomically increments the global count of output threads.
registerOutputThread :: IO ()
registerOutputThread = atomically $ do
    n <- takeTMVar counter
    putTMVar counter (succ n)
  where
    counter = outputThreads globalOutputHandle
-- | Atomically decrements the global count of output threads.
unregisterOutputThread :: IO ()
unregisterOutputThread = atomically $ do
    n <- takeTMVar counter
    putTMVar counter (pred n)
  where
    counter = outputThreads globalOutputHandle
-- | Gets the buffered output of a background process onto the console.
--
-- Two workers race to claim the job through a shared one-shot variable:
--
-- * The displayer waits for the output lock and, if it claims the job,
--   emits each buffer as it changes until every buffer is at its end.
--
-- * The mover claims the job only once every buffer has reached its
--   end; it then moves all buffered output into the global buffers and
--   cancels the displayer, which is still waiting for the lock.
--
-- A third thread waits for both workers and then unregisters the output
-- thread.
bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
bufferWriter ts = do
    claimed <- atomically newEmptyTMVar
    displayer <- async $ lockOutput $ do
        mine <- atomically (tryPutTMVar claimed ())
        if mine
            then void (mapConcurrently displaybuf ts)
            -- The buffers were already moved to the global buffers.
            else noop
    mover <- async $ void $ globalbuf claimed displayer
    void $ async $ do
        void (waitCatch displayer)
        void (waitCatch mover)
        unregisterOutputThread
  where
    -- Emit one buffer each time it changes, and a final time at its end.
    displaybuf entry@(outh, buf, bufsig, bufend) = do
        change <- atomically $
            (Right <$> takeTMVar bufsig) `orElse` (Left <$> takeTMVar bufend)
        pending <- takeMVar buf
        putMVar buf (OutputBuffer [])
        emitOutputBuffer outh pending
        case change of
            Right BufSig -> displaybuf entry
            Left AtEnd -> return ()

    globalbuf claimVar lockWaiter = do
        won <- atomically $ do
            -- False when the displayer already claimed the job.
            first <- tryPutTMVar claimVar ()
            -- Wait for the end of all buffers in the same transaction.
            when first $
                forM_ ts $ \(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend
            return first
        when won $ do
            collected <- forM ts $ \(outh, buf, _bufsig, _bufend) -> do
                contents <- takeMVar buf
                return (outh, contents)
            -- Hand everything to the global buffers in one transaction.
            atomically $
                forM_ collected $ \(outh, contents) ->
                    bufferOutputSTM' outh contents
            -- Stop the displayer from waiting on the lock.
            cancel lockWaiter
-- | Adds an activity to the front of an 'OutputBuffer'.
--
-- For 'Output' text, all in-memory text already in the buffer is merged
-- with the new text into a single entry. If the merged text is longer
-- than 1048576 characters it is written to a temporary file and
-- replaced by an 'InTempFile' entry. Entries that are not 'Output' keep
-- their relative order behind the merged entry.
--
-- Any other activity is simply consed onto the buffer.
--
-- May throw if the temporary file cannot be created or written. In that
-- case the handle is still closed and the partial file is removed.
addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
addOutputBuffer (Output t) (OutputBuffer buf)
    | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other)
    | otherwise = do
        tmpdir <- getTemporaryDirectory
        (tmp, h) <- openTempFile tmpdir "output.tmp"
        -- Computed up front so the buffer entry does not keep the
        -- whole text alive.
        let !endnl = endsNewLine t'
        let i = InTempFile
                { tempFile = tmp
                , endsInNewLine = endnl
                }
        -- Always close the handle, and do not leave an orphaned temp
        -- file behind when writing or closing fails.
        (T.hPutStr h t' `finally` hClose h)
            `onException` tryWhenExists (removeFile tmp)
        return $ OutputBuffer (i : other)
  where
    !t' = T.concat (mapMaybe getOutput this) <> t
    !(this, other) = partition isOutput buf
    isOutput v = case v of
        Output _ -> True
        _ -> False
    getOutput v = case v of
        Output t'' -> Just t''
        _ -> Nothing
addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)
-- | Adds a value to the global buffer of the given handle, inside STM.
bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
bufferOutputSTM h v = bufferOutputSTM' h single
  where
    single = OutputBuffer [Output (toOutput v)]
-- | Prepends the entries of a buffer to the global buffer of the given
-- handle, inside STM.
bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
bufferOutputSTM' h (OutputBuffer newbuf) = do
    OutputBuffer existing <- takeTMVar target
    putTMVar target (OutputBuffer (newbuf ++ existing))
  where
    target = bufferFor h
-- | Waits until the selector picks something non-empty out of one of
-- the global buffers, trying stdout first and stderr second.
--
-- The selector splits a buffer into the selected part and the rest. The
-- rest is left in the global buffer; the selected part is returned with
-- the handle it belongs to.
outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer)
outputBufferWaiterSTM selector = orElse (waitgetbuf StdOut) (waitgetbuf StdErr)
  where
    waitgetbuf h = do
        let bv = bufferFor h
        current <- takeTMVar bv
        let (selected, rest) = selector current
        if selected == OutputBuffer []
            then retry
            else do
                putTMVar bv rest
                return (h, selected)
-- | Selector for 'outputBufferWaiterSTM' that selects the whole buffer
-- and leaves nothing behind.
waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitAnyBuffer everything = (everything, nothingLeft)
  where
    nothingLeft = OutputBuffer []
-- | Selector for 'outputBufferWaiterSTM' that selects the leading run
-- of buffer entries ending in a newline; everything from the first
-- entry that does not is left behind.
waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitCompleteLines (OutputBuffer l) = (OutputBuffer complete, OutputBuffer partial)
  where
    (complete, partial) = span completeline l
    completeline activity = case activity of
        InTempFile {} -> endsInNewLine activity
        Output b -> endsNewLine b
-- | Whether the text is non-empty and its last character is a newline.
endsNewLine :: T.Text -> Bool
endsNewLine t
    | T.null t = False
    | otherwise = T.last t == '\n'
-- | Writes the contents of a buffer to the given standard handle.
--
-- The buffer is stored newest-first, so it is reversed to emit entries
-- in the order they were added. Entries that were written out to a
-- temporary file are read back, emitted, and the file is removed.
--
-- Emitting is best-effort: IO errors while writing, flushing, or
-- reading a temporary file back are ignored. The temporary file is
-- removed whether or not its contents could be read.
emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
emitOutputBuffer stdh (OutputBuffer l) =
    forM_ (reverse l) $ \ba -> case ba of
        Output t -> emit t
        InTempFile tmp _ -> do
            -- A failed read must not escape: this function also runs
            -- while the output lock is being acquired, and an
            -- exception there would leave the lock held.
            void $ tryIO $ emit =<< T.readFile tmp
            void $ tryWhenExists $ removeFile tmp
  where
    outh = toHandle stdh
    emit t = void $ tryIO $ do
        T.hPutStr outh t
        hFlush outh