module Network.Hadoop.Rpc
( Connection(..)
, Protocol(..)
, User
, Method
, RawRequest
, RawResponse
, initConnectionV7
, invoke
) where
import Control.Applicative ((<$>), (<*>))
import Control.Exception (throwIO)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.IORef
import Data.Monoid (mempty)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.ProtocolBuffers
import Data.ProtocolBuffers.Orphans ()
import Data.Serialize.Get
import Data.Serialize.Put
import Data.Hadoop.Protobuf.Headers
import Data.Hadoop.Types
import qualified Network.Hadoop.Stream as S
import Network.Socket (Socket)
-- | A handle to an established RPC session.
data Connection = Connection
    { cnVersion :: !Int
      -- ^ RPC wire-protocol version of this connection
      -- ('initConnectionV7' sets it to 7).
    , cnConfig :: !HadoopConfig
      -- ^ The configuration the connection was created with.
    , cnProtocol :: !Protocol
      -- ^ The remote protocol this connection talks to.
    , invokeRaw :: !(Method -> RawRequest -> IO RawResponse)
      -- ^ Call the named method with an already-serialised request and
      -- return the still-serialised response.
    }
-- | Identifies the remote protocol (service interface) being called.
data Protocol = Protocol
    { prName :: !Text
      -- ^ Protocol name; 'initConnectionV7' sends it in the connection
      -- context and again in every request.
    , prVersion :: !Int
      -- ^ Protocol version; 'initConnectionV7' sends it with every request.
    } deriving (Eq, Ord, Show)
-- | Name of a remote method.
type Method = Text
-- | Serialised request bytes, as handed to 'invokeRaw'.
type RawRequest = ByteString
-- | Serialised response bytes, as returned by 'invokeRaw'.
type RawResponse = ByteString
-- | Start a version 7 RPC session on the given socket.
--
-- Wraps the socket in a stream, writes the connection preamble followed by
-- the length-prefixed 'IpcConnectionContext' (protocol name plus the
-- configured user as effective user), and returns a 'Connection' whose
-- 'invokeRaw' performs one blocking request/response round trip per call.
--
-- Call ids start at 0 and are incremented atomically for every request.
-- A response whose status is not 'Success' is parsed with 'getError' and the
-- resulting 'RemoteError' is thrown; if no response header can be read, a
-- @ConnectionClosed@ 'RemoteError' is thrown instead.
initConnectionV7 :: HadoopConfig -> Protocol -> Socket -> IO Connection
initConnectionV7 config@HadoopConfig{..} protocol sock = do
    stream <- S.mkSocketStream sock
    S.runPut stream preamble
    callCounter <- newIORef 0
    return Connection
        { cnVersion  = 7
        , cnConfig   = config
        , cnProtocol = protocol
        , invokeRaw  = sendAndWait stream callCounter
        }
  where
    -- Magic bytes, version byte, two further header bytes, then the
    -- length-prefixed connection context.
    preamble :: Put
    preamble = do
        putByteString "hrpc"
        putWord8 7  -- RPC version
        putWord8 80 -- NOTE(review): presumably the auth-method code; confirm against the Hadoop IPC spec
        putWord8 0  -- NOTE(review): presumably the serialisation-type code; confirm against the Hadoop IPC spec
        putFramed (runPut (encodeMessage context))

    -- Write a big-endian 32-bit byte count followed by the bytes themselves.
    putFramed :: ByteString -> Put
    putFramed payload = do
        putWord32be (fromIntegral (B.length payload))
        putByteString payload

    -- One round trip: allocate a call id, send header + request as a single
    -- frame, then read the response header and dispatch on its status.
    sendAndWait :: S.Stream -> IORef Int -> Method -> ByteString -> IO ByteString
    sendAndWait stream counter method requestBytes = do
        callId <- atomicModifyIORef' counter (\n -> (succ n, n))
        S.runPut stream . putFramed . runPut $ do
            encodeLengthPrefixedMessage (mkRequestHeader callId)
            encodeLengthPrefixedMessage (mkRequest method requestBytes)
        mbHeader <- S.maybeGet stream decodeLengthPrefixedMessage
        case mbHeader of
            Nothing  -> throwClosed
            Just hdr -> case getField (rspStatus hdr) of
                Success -> S.runGet stream getResponse
                _       -> S.runGet stream getError >>= liftIO . throwIO

    context = IpcConnectionContext
        { ctxProtocol = putField (Just (prName protocol))
        , ctxUserInfo = putField (Just UserInformation
            { effectiveUser = putField (Just hcUser)
            , realUser      = mempty
            })
        }

    mkRequestHeader callId = RpcRequestHeader
        { reqKind   = putField (Just ProtocolBuffer)
        , reqOp     = putField (Just FinalPacket)
        , reqCallId = putField (fromIntegral callId)
        }

    mkRequest methodName payload = RpcRequest
        { reqMethodName      = putField methodName
        , reqBytes           = putField (Just payload)
        , reqProtocolName    = putField (prName protocol)
        , reqProtocolVersion = putField (fromIntegral (prVersion protocol))
        }

    throwClosed = throwIO (RemoteError "ConnectionClosed" "The socket connection was closed")
-- | Read a response payload: a big-endian 32-bit byte count followed by
-- exactly that many bytes, which are returned as-is.
getResponse :: Get ByteString
getResponse = getWord32be >>= getByteString . fromIntegral
-- | Read an error payload into a 'RemoteError'.
--
-- The payload is two consecutive strings, each encoded as a big-endian
-- 32-bit byte count followed by that many bytes of UTF-8 text. They fill
-- the two fields of 'RemoteError' in order (presumably an error
-- name/class and a human-readable message; confirm against the server).
--
-- Decoding is lenient: invalid UTF-8 is replaced with U+FFFD. The partial
-- 'T.decodeUtf8' would instead raise an impure 'UnicodeException' from
-- inside the lazily-decoded 'Text', hiding the actual remote error from
-- whoever inspects it.
getError :: Get RemoteError
getError = RemoteError <$> getText <*> getText
  where
    getText = do
        n <- fromIntegral <$> getWord32be
        T.decodeUtf8With replaceInvalid <$> getByteString n

    -- Same policy as @lenientDecode@, written inline so no extra import is
    -- required.
    replaceInvalid _ _ = Just '\xFFFD'
-- | Typed remote call: serialise the argument, send it through the
-- connection's 'invokeRaw', and decode the reply.
--
-- Throws a 'RemoteError' tagged @DecodeError@ when the reply cannot be
-- decoded, or when decoding succeeds but leaves bytes unconsumed.
invoke :: (Decode b, Encode a) => Connection -> Text -> a -> IO b
invoke Connection{..} method arg = do
    replyBytes <- invokeRaw method (runPut (encodeMessage arg))
    case runGetState decodeMessage replyBytes 0 of
        Left err -> throwIO (RemoteError "DecodeError" (T.pack err))
        Right (value, leftover)
            | B.null leftover -> return value
            | otherwise -> throwIO (RemoteError "DecodeError" "decoded response but did not consume enough bytes")