{-# LANGUAGE ScopedTypeVariables #-}
module What4.Utils.HandleReader where
import Control.Monad (unless)
import Data.IORef
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Lazy as LazyText
import qualified Data.Text.IO as Text
import Control.Exception(bracket,catch,IOException)
import Control.Concurrent(ThreadId,forkIO,killThread)
import Control.Concurrent.Chan(Chan,newChan,readChan,writeChan)
import System.IO(Handle,hClose)
import System.IO.Streams( OutputStream, InputStream )
import qualified System.IO.Streams as Streams
teeInputStream :: InputStream a -> OutputStream a -> IO (InputStream a)
teeInputStream i o = Streams.makeInputStream go
where
go = do x <- Streams.read i
Streams.write x o
return x
teeOutputStream :: OutputStream a -> OutputStream a -> IO (OutputStream a)
teeOutputStream o aux = Streams.makeOutputStream go
where
go x =
do Streams.write x aux
Streams.write x o
lineBufferedOutputStream :: Text -> OutputStream Text -> IO (OutputStream Text)
lineBufferedOutputStream prefix out =
do ref <- newIORef mempty
Streams.makeOutputStream (con ref)
where
newl = Text.pack "\n"
con ref mx =
do start <- readIORef ref
case mx of
Nothing ->
do unless (Text.null start) (Streams.write (Just (prefix <> start)) out)
Streams.write Nothing out
Just x -> go ref (start <> x)
go ref x =
let (ln, x') = Text.break (== '\n') x in
if Text.null x' then
do Streams.write (Just mempty) out
writeIORef ref x
else
do Streams.write (Just (prefix <> ln <> newl)) out
go ref (Text.drop 1 x')
demuxProcessHandles ::
Handle ->
Handle ->
Handle ->
Maybe (Text, Handle) ->
IO ( OutputStream Text, InputStream Text, HandleReader )
demuxProcessHandles in_h out_h err_h Nothing =
do in_str <- Streams.encodeUtf8 =<< Streams.handleToOutputStream in_h
out_str <- Streams.decodeUtf8 =<< Streams.handleToInputStream out_h
err_reader <- startHandleReader err_h Nothing
return (in_str, out_str, err_reader)
demuxProcessHandles in_h out_h err_h (Just (comment_prefix, aux_h)) =
do aux_str <- Streams.lockingOutputStream =<< Streams.encodeUtf8 =<< Streams.handleToOutputStream aux_h
in_str <- Streams.encodeUtf8 =<< Streams.handleToOutputStream in_h
out_str <- Streams.decodeUtf8 =<< Streams.handleToInputStream out_h
in_aux <- lineBufferedOutputStream mempty aux_str
in_str' <- teeOutputStream in_str in_aux
out_aux <- lineBufferedOutputStream comment_prefix aux_str
out_str' <- teeInputStream out_str out_aux
err_reader <- startHandleReader err_h . Just
=<< lineBufferedOutputStream comment_prefix aux_str
return (in_str', out_str', err_reader)
data HandleReader = HandleReader { hrChan :: !(Chan (Maybe Text))
, hrHandle :: !Handle
, hrThreadId :: !ThreadId
}
streamLines :: Chan (Maybe Text) -> Handle -> Maybe (OutputStream Text) -> IO ()
streamLines c h Nothing = go
where
go = do ln <- Text.hGetLine h
writeChan c (Just ln)
go
streamLines c h (Just auxstr) = go
where
go = do ln <- Text.hGetLine h
Streams.write (Just ln) auxstr
writeChan c (Just ln)
go
startHandleReader :: Handle -> Maybe (OutputStream Text) -> IO HandleReader
startHandleReader h auxOutput = do
c <- newChan
let handle_err (_e :: IOException) = writeChan c Nothing
tid <- forkIO $ streamLines c h auxOutput `catch` handle_err
return $! HandleReader { hrChan = c
, hrHandle = h
, hrThreadId = tid
}
stopHandleReader :: HandleReader -> IO ()
stopHandleReader hr = do
killThread (hrThreadId hr)
hClose (hrHandle hr)
withHandleReader :: Handle -> Maybe (OutputStream Text) -> (HandleReader -> IO a) -> IO a
withHandleReader h auxOut = bracket (startHandleReader h auxOut) stopHandleReader
readNextLine :: HandleReader -> IO (Maybe Text)
readNextLine hr = do
mr <- readChan (hrChan hr)
case mr of
Nothing -> writeChan (hrChan hr) Nothing
Just{} -> return()
return mr
readAllLines :: HandleReader -> IO LazyText.Text
readAllLines hr = go LazyText.empty
where go :: LazyText.Text -> IO LazyText.Text
go prev = do
mr <- readNextLine hr
case mr of
Nothing -> return prev
Just e -> go $! prev `LazyText.append` (LazyText.fromStrict e)
`LazyText.snoc` '\n'