Copyright | (c) Dong Han 2019 |
---|---|
License | BSD |
Maintainer | winterland1989@gmail.com |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
This module provides a MessagePack-RPC implementation.
-- server
import Data.IORef
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T

newtype ServerCtx = ServerCtx { counter :: Int }

serveRPC (startTCPServer defaultTCPServerConfig) (ServerCtx 0) $ simpleRouter
    [ ("foo", CallHandler $ \ ctx (req :: Int) -> do
        modifyIORef ctx (ServerCtx . (+ 1) . counter)
        return (req + 1))
    , ("bar", NotifyHandler $ \ ctx (req :: T.Text) -> do
        printStd (req <> "world"))
    , ("qux", StreamHandler $ \ ctx (_ :: ()) -> do
        withMVar stdinBuf (pure . sourceFromBuffered))
    ]

-- client
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V

withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do
    c <- rpcClient uvs
    -- single call
    call @Int @Int c "foo" 1
    -- notify without result
    notify @T.Text c "bar" "hello"
    -- streaming result
    (_, src) <- callStream c "qux" ()
    runBIO $ src >|> sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do
        writeBuffer bo b
        flushBuffer bo))
Synopsis
- data Client = Client {}
- rpcClient :: (Input dev, Output dev) => dev -> IO Client
- rpcClient' :: (Input i, Output o) => i -> o -> Int -> Int -> IO Client
- call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res
- notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO ()
- type PipelineId = Int
- type PipelineResult = FlatIntMap Value
- callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId
- notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO ()
- data RPCException
- execPipeline :: HasCallStack => Client -> IO PipelineResult
- fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res
- callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res)
- type ServerLoop = (UVStream -> IO ()) -> IO ()
- type ServerService a = Text -> Maybe (ServerHandler a)
- type ServerCtx a = IORef a
- data ServerHandler a where
- CallHandler :: (MessagePack req, MessagePack res) => (ServerCtx a -> req -> IO res) -> ServerHandler a
- NotifyHandler :: MessagePack req => (ServerCtx a -> req -> IO ()) -> ServerHandler a
- StreamHandler :: (MessagePack req, MessagePack res) => (ServerCtx a -> req -> IO (Source res)) -> ServerHandler a
- simpleRouter :: [(Text, ServerHandler a)] -> ServerService a
- serveRPC :: ServerLoop -> a -> ServerService a -> IO ()
- data Request a
- serveRPC' :: ServerLoop -> Int -> Int -> ServerCtx a -> ServerService a -> IO ()
Documentation
rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #
Open an RPC client from an input/output device.
Open an RPC client with more control.
call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #
Send a single RPC call and get result.
notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #
Send a single notification RPC call without getting result.
type PipelineId = Int Source #
type PipelineResult = FlatIntMap Value Source #
callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #
Make a call inside a pipeline, which will be sent in a batch when execPipeline is called.
...
fooId <- callPipeline client "foo" $ ...
barId <- callPipeline client "bar" $ ...
notifyPipeline client "qux" $ ...
r <- execPipeline client
fooResult <- fetchPipeline fooId r
barResult <- fetchPipeline barId r
notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #
Make a notify inside a pipeline, which will be sent in a batch when execPipeline is called.
Notify calls don't affect the execution's result.
data RPCException Source #
Exception thrown when the remote endpoint returns errors.
Instances
Show RPCException Source # | |
Defined in Z.IO.RPC.MessagePack showsPrec :: Int -> RPCException -> ShowS # show :: RPCException -> String # showList :: [RPCException] -> ShowS # | |
Exception RPCException Source # | |
Defined in Z.IO.RPC.MessagePack |
execPipeline :: HasCallStack => Client -> IO PipelineResult Source #
Send the requests in a batch and get the results in a map keyed by PipelineId.
fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #
Use the PipelineId returned by callPipeline to fetch the call's result.
callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #
Call a stream method; no other call or notify should be sent until the returned stream is consumed completely.
This is implemented by extending the MessagePack-RPC protocol with the following new message types:
-- start stream request
[typ 0x04, name, param]
-- stop stream request
[typ 0x05]
-- each stream response
[typ 0x06, err, value]
-- stream response end
[typ 0x07]
The returned tuple is a pair of a stop action and a Source. To terminate the stream early, call the stop action. Please continue consuming until EOF is reached; otherwise the state of the Client will be incorrect.
type ServerService a = Text -> Maybe (ServerHandler a) Source #
data ServerHandler a where Source #
CallHandler :: (MessagePack req, MessagePack res) => (ServerCtx a -> req -> IO res) -> ServerHandler a | |
NotifyHandler :: MessagePack req => (ServerCtx a -> req -> IO ()) -> ServerHandler a | |
StreamHandler :: (MessagePack req, MessagePack res) => (ServerCtx a -> req -> IO (Source res)) -> ServerHandler a |
simpleRouter :: [(Text, ServerHandler a)] -> ServerService a Source #
Simple router using FlatMap; looks up the name in O(log(N)).
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO

serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
    [ ("foo", CallHandler $ \ ctx req -> do
        ... )
    , ("bar", CallHandler $ \ ctx req -> do
        ... )
    ]
serveRPC :: ServerLoop -> a -> ServerService a -> IO () Source #
Serve an RPC service.
:: ServerLoop | |
-> Int | recv buffer size |
-> Int | send buffer size |
-> ServerCtx a | |
-> ServerService a | |
-> IO () |
Serve an RPC service with more control.