module IdeSession.RPC.Server
( rpcServer
, concurrentConversation
, RpcConversation(..)
) where
import Prelude hiding (take)
import System.IO
( Handle
, hSetBinaryMode
, hSetBuffering
, BufferMode(BlockBuffering)
)
import System.Posix.Types (Fd)
import System.Posix.IO (closeFd, fdToHandle)
import Control.Monad (void)
import qualified Control.Exception as Ex
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan (Chan, newChan, writeChan)
import qualified Data.ByteString.Lazy.Char8 as BSL
import Control.Concurrent.Async (Async, async)
import Data.Binary (encode, decode)
import IdeSession.Util.BlockingOps (readChan, wait, waitAny)
import IdeSession.RPC.API
import IdeSession.RPC.Stream
-- | Start the RPC server from the argument list passed to the server process.
--
-- @args@ must consist of the path of the error log followed by exactly four
-- file descriptor numbers, in this order: request-read, request-write,
-- response-read, response-write. The request-write and response-read
-- descriptors are closed (they are not used here); the other two are turned
-- into handles and the conversation is run on them via @rpcServer'@.
--
-- The arguments are validated before any descriptor is touched. A malformed
-- argument list raises an 'IOError' (built with 'userError') that describes
-- the problem, rather than failing later with an anonymous pattern-match or
-- @Prelude.read: no parse@ error.
rpcServer :: (FilePath -> RpcConversation -> IO ()) -- ^ Request server
          -> [String]                                -- ^ Command line arguments
          -> IO ()
rpcServer handler args =
    case args of
      [errorLog, reqR, reqW, respR, respW] -> do
        requestR  <- parseFd "request-read" reqR
        requestW  <- parseFd "request-write" reqW
        responseR <- parseFd "response-read" respR
        responseW <- parseFd "response-write" respW
        closeFd requestW
        closeFd responseR
        requestR'  <- fdToHandle requestR
        responseW' <- fdToHandle responseW
        rpcServer' requestR' responseW' errorLog handler
      _ ->
        invalidArgs $ "expected an error log path followed by 4 file "
                   ++ "descriptors, but got " ++ show (length args)
                   ++ " argument(s)"
  where
    -- Total replacement for @read@: accepts exactly the strings @read@ would
    -- accept as an 'Int' (trailing whitespace allowed), reports anything else.
    parseFd :: String -> String -> IO Fd
    parseFd what str =
      case [n | (n, rest) <- reads str, ("", "") <- lex rest] of
        [n] -> return (fromIntegral (n :: Int))
        _   -> invalidArgs $ "cannot parse " ++ what
                          ++ " file descriptor " ++ show str

    -- Abort with a message that identifies this entry point.
    invalidArgs :: String -> IO a
    invalidArgs = ioError . userError . ("rpcServer: " ++)
-- | Run an RPC server whose request and response channels are given as file
-- paths rather than inherited file descriptors.
--
-- The request path is opened with 'openPipeForReading' first, then the
-- response path with 'openPipeForWriting'; the resulting handles are handed
-- to @rpcServer'@ together with the error log and the request server.
concurrentConversation :: FilePath                               -- ^ Path to read requests from
                       -> FilePath                               -- ^ Path to write responses to
                       -> FilePath                               -- ^ Error log
                       -> (FilePath -> RpcConversation -> IO ()) -- ^ Request server
                       -> IO ()
concurrentConversation requestR responseW errorLog server = do
    requestHandle  <- openPipeForReading requestR noTimeout
    responseHandle <- openPipeForWriting responseW noTimeout
    rpcServer' requestHandle responseHandle errorLog server
  where
    -- The largest representable 'Int'; presumably this makes the open calls
    -- wait (practically) forever -- confirm against 'openPipeForReading'.
    noTimeout :: Int
    noTimeout = maxBound
-- | Run the RPC server over an already opened pair of handles.
--
-- Three asynchronous workers are started: 'readRequests' on @hin@,
-- 'writeResponses' on @hout@, and the user-supplied @server@ wrapped by
-- 'channelHandler'. They communicate through the two channels created here.
-- The function then waits (via @$waitAny@) for the first worker to report a
-- 'ServerEvent' and reacts to it. Exceptions are appended to @errorLog@.
rpcServer' :: Handle -- ^ Input handle (requests)
           -> Handle -- ^ Output handle (responses)
           -> FilePath -- ^ Error log
           -> (FilePath -> RpcConversation -> IO ()) -- ^ Request server
           -> IO ()
rpcServer' hin hout errorLog server = do
    requests <- newChan :: IO (Chan BSL.ByteString)
    -- 'Nothing' on this channel tells the writer to stop (see 'flushResponses')
    responses <- newChan :: IO (Chan (Maybe BSL.ByteString))
    setBinaryBlockBuffered [hin, hout]
    -- All three workers are spawned inside 'Ex.mask'; each one receives
    -- @restore@ and applies it to its main loop only, so its exception handler
    -- is already installed by the time exceptions are unmasked.
    (reader, writer, handler) <- Ex.mask $ \restore -> do
      reader <- async $ readRequests restore hin requests
      writer <- async $ writeResponses restore responses hout
      handler <- async $ channelHandler restore requests responses (server errorLog)
      return (reader, writer, handler)
    (_thread, ev) <- $waitAny [reader, writer, handler]
    case ev of
      -- The reader or the writer caught an exception
      LostConnection ex ->
        tryShowException (Just ex)
      -- The reader received a 'RequestShutdown' message
      ReaderThreadTerminated ->
        return ()
      -- The writer only reports this after reading 'Nothing' from @responses@,
      -- and 'Nothing' is only written by 'flushResponses', which has not been
      -- called yet at this point.
      WriterThreadTerminated ->
        error "The impossible happened"
      -- The server returned normally: let the writer drain the queue and log
      -- the exception, if any, that it ran into while doing so
      ServerThreadTerminated ->
        tryShowException =<< flushResponses responses writer
      -- The server threw: log its exception, then still drain the queue
      -- (an exception reported by the writer is discarded here)
      ServerThreadAborted ex -> do
        tryShowException (Just ex)
        void $ flushResponses responses writer
    -- NOTE(review): placed at the outer do-level, i.e. executed after every
    -- event; the purpose of the 100000 microsecond pause is not visible in
    -- this file -- confirm both the placement and the intent.
    threadDelay 100000
  where
    -- Append the shown exception to the error log; the write is wrapped in
    -- 'ignoreIOExceptions'.
    tryShowException :: Maybe Ex.SomeException -> IO ()
    tryShowException (Just ex) =
      ignoreIOExceptions $ appendFile errorLog (show ex)
    tryShowException Nothing =
      return ()
-- | The outcome reported by one of the worker threads of the RPC server
-- ('readRequests', 'writeResponses' and 'channelHandler').
data ServerEvent =
    -- | The reader received a 'RequestShutdown' message
    ReaderThreadTerminated
    -- | The writer read 'Nothing' from the response channel
  | WriterThreadTerminated
    -- | The request server returned normally
  | ServerThreadTerminated
    -- | The request server threw the given exception
  | ServerThreadAborted Ex.SomeException
    -- | The reader or the writer threw the given exception
  | LostConnection Ex.SomeException
  deriving Show
-- | Worker that decodes requests from the input handle and forwards their
-- payloads to the request channel.
--
-- Runs until a 'RequestShutdown' message arrives, in which case it reports
-- 'ReaderThreadTerminated'. Any exception raised while setting up or reading
-- the stream is caught and reported as 'LostConnection'.
readRequests :: Restore -> Handle -> Chan BSL.ByteString -> IO ServerEvent
readRequests restore h ch =
    restore (newStream h >>= pump) `Ex.catch` (return . LostConnection)
  where
    -- Forward requests one at a time until asked to shut down.
    pump :: Stream Request -> IO ServerEvent
    pump stream = do
      message <- nextInStream stream
      case message of
        RequestShutdown -> return ReaderThreadTerminated
        Request payload -> do
          writeChan ch (unIncBS payload)
          pump stream
-- | Worker that takes responses off the response channel, wraps each one in
-- a 'Response' message and writes it to the output handle with 'hPutFlush'.
--
-- A 'Nothing' on the channel ends the loop with 'WriterThreadTerminated'.
-- Any exception raised by the loop is caught and reported as
-- 'LostConnection'.
writeResponses :: Restore -> Chan (Maybe BSL.ByteString) -> Handle -> IO ServerEvent
writeResponses restore ch h =
    restore loop `Ex.catch` (return . LostConnection)
  where
    -- Send queued responses until the end-of-queue marker is seen.
    loop :: IO ServerEvent
    loop = do
      next <- $readChan ch
      case next of
        Nothing ->
          return WriterThreadTerminated
        Just payload -> do
          hPutFlush h (encode (Response (IncBS payload)))
          loop
-- | Ask the writer thread to stop and wait for it to finish.
--
-- Writes 'Nothing' to the response channel, so responses queued before it
-- are handled by the writer first, then waits for the writer's result.
-- Returns 'Nothing' if the writer finished with 'WriterThreadTerminated',
-- or the exception it reported through 'LostConnection'. Any other event is
-- treated as a programming error.
flushResponses :: Chan (Maybe BSL.ByteString) -> Async ServerEvent -> IO (Maybe Ex.SomeException)
flushResponses responses writer = do
    writeChan responses Nothing
    $wait writer >>= outcome
  where
    -- Translate the writer's final event into an optional exception.
    outcome :: ServerEvent -> IO (Maybe Ex.SomeException)
    outcome WriterThreadTerminated = return Nothing
    outcome (LostConnection ex)    = return (Just ex)
    outcome _                      = error "the impossible happened"
-- | Worker that runs the request server against the two channels.
--
-- The server is given an 'RpcConversation' whose @get@ takes the next request
-- off the request channel and decodes it (forced with 'Ex.evaluate'), and
-- whose @put@ encodes a value and queues it on the response channel.
--
-- Reports 'ServerThreadTerminated' when the server returns, or
-- 'ServerThreadAborted' with the exception if it throws.
channelHandler :: Restore
               -> Chan BSL.ByteString         -- ^ Incoming requests
               -> Chan (Maybe BSL.ByteString) -- ^ Outgoing responses
               -> (RpcConversation -> IO ())  -- ^ Request server
               -> IO ServerEvent
channelHandler restore requests responses server =
    restore runServer `Ex.catch` (return . ServerThreadAborted)
  where
    runServer :: IO ServerEvent
    runServer = server conversation >> return ServerThreadTerminated

    -- The channel-backed conversation handed to the server.
    conversation :: RpcConversation
    conversation = RpcConversation
      { get = $readChan requests >>= Ex.evaluate . decode
      , put = writeChan responses . Just . encode
      }
-- | The type of the @restore@ function that 'Ex.mask' passes to its callback.
-- The worker threads apply it to their main loop only, leaving their
-- exception handler installed outside of it.
type Restore = forall a. IO a -> IO a
-- | Put every given handle into binary mode and switch it to block buffering
-- with the default buffer size (@BlockBuffering Nothing@).
setBinaryBlockBuffered :: [Handle] -> IO ()
setBinaryBlockBuffered = mapM_ configure
  where
    configure :: Handle -> IO ()
    configure h =
      hSetBinaryMode h True >> hSetBuffering h (BlockBuffering Nothing)