{-# LANGUAGE PatternGuards, BangPatterns, NamedFieldPuns #-}
module BuildBox.Command.System.Internals
( streamIn
, streamOuts)
where
import System.IO
import Control.Concurrent
import Control.Concurrent.STM.TChan
import Control.Monad.STM
import Control.Monad
import Foreign.Ptr
import Data.IORef
import Data.Char
import Data.Word
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Internal as BS
import qualified Data.ByteString as BS
import GHC.IO.Handle.Internals
import GHC.IO.Handle.Types
import GHC.IO.Buffer
import GHC.IO.BufferedIO as Buffered
import Foreign.Marshal.Utils (copyBytes)
streamIn :: Handle -> TChan (Maybe ByteString) -> IO ()
streamIn !hRead !chan
= streamIn' False hRead chan
streamIn'
:: Bool
-> Handle
-> TChan (Maybe ByteString)
-> IO ()
streamIn' !gotNewLine !hRead !chan
= uponM (hIsEOF hRead)
(do when gotNewLine
$ atomically $ writeTChan chan (Just BS.empty)
atomically $ writeTChan chan Nothing
return ())
(do str <- hGetLineNL hRead
if BS.null str
then
streamIn' gotNewLine hRead chan
else do
let hasNewLine
| BS.last str == (fromIntegral $ ord '\n')
= True
| otherwise = False
let str'
| hasNewLine = BS.init str
| otherwise = str
atomically $ writeTChan chan (Just str')
streamIn' hasNewLine hRead chan)
uponM :: Monad m => m Bool -> m a -> m a -> m a
uponM c x y
= do b <- c
if b then x else y
streamOuts :: [(TChan (Maybe ByteString), (Maybe Handle), QSem)] -> IO ()
streamOuts !chans
= streamOuts' False [] chans
where
streamOuts' _ [] []
= return ()
streamOuts' True prev []
= streamOuts' False [] prev
streamOuts' False prev []
= do threadDelay 1000
yield
streamOuts' False [] prev
streamOuts' !active !prev (!x@(!chan, !mHandle, !qsem) : rest)
= do
mStr <- atomically
$ do isEmpty <- isEmptyTChan chan
if isEmpty
then return (False, Nothing)
else do mStr <- readTChan chan
return (True, mStr)
case mStr of
(False, _)
-> streamOuts' active (prev ++ [x]) rest
(True, Nothing)
-> do signalQSem qsem
streamOuts' active prev rest
(True, Just str)
| Just h <- mHandle
-> do BS.hPutStr h str
hPutChar h '\n'
streamOuts' True (prev ++ [x]) rest
| otherwise
-> streamOuts' True (prev ++ [x]) rest
hGetLineNL :: Handle -> IO ByteString
hGetLineNL h =
wantReadableHandle_ "BuildBox.Command.System.Internals" h $
\ h_@Handle__{haByteBuffer} -> do
flushCharReadBuffer h_
buf <- readIORef haByteBuffer
if isEmptyBuffer buf
then fill h_ buf 0 []
else haveBuf h_ buf 0 []
where
fill h_@Handle__{haByteBuffer,haDevice} buf len xss =
len `seq` do
(r,buf') <- Buffered.fillReadBuffer haDevice buf
if r == 0
then do writeIORef haByteBuffer buf{ bufR=0, bufL=0 }
if len > 0
then mkBigPS len xss
else ioe_EOF
else haveBuf h_ buf' len xss
haveBuf h_@Handle__{haByteBuffer}
buf@Buffer{ bufRaw=raw, bufR=w, bufL=r }
len xss =
do
off <- findEOL r w raw
let new_len = len + off - r
xs <- mkPS raw r off
if off /= w
then do
if (w == off + 1)
then writeIORef haByteBuffer buf{ bufL=0, bufR=0 }
else writeIORef haByteBuffer buf{ bufL = off + 1 }
if r == w
then mkBigPS new_len (xs:xss)
else mkBigPS new_len (BS.pack [fromIntegral $ ord '\n'] : xs : xss)
else do
fill h_ buf{ bufL=0, bufR=0 } new_len (xs:xss)
findEOL r w raw
| r == w = return w
| otherwise
= do
c <- readWord8Buf raw r
if c == fromIntegral (ord '\n')
then return r
else findEOL (r+1) w raw
mkPS :: RawBuffer Word8 -> Int -> Int -> IO ByteString
mkPS buf start end =
BS.create len $ \p ->
withRawBuffer buf $ \pbuf -> do
copyBytes p (pbuf `plusPtr` start) len
where
len = end - start
mkBigPS :: Int -> [ByteString] -> IO ByteString
mkBigPS _ [ps] = return ps
mkBigPS _ pss = return $! BS.concat (reverse pss)