Z-MessagePack-0.3.0.1: MessagePack
Copyright: (c) Dong Han 2019
License: BSD
Maintainer: winterland1989@gmail.com
Stability: experimental
Portability: non-portable
Safe Haskell: None
Language: Haskell2010

Z.IO.RPC.MessagePack

Description

This module provides a MessagePack-RPC implementation.

-- server
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T

serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
 [ ("foo", CallHandler $ \ (req :: Int) -> do
     return (req + 1))
 , ("bar", NotifyHandler $ \ (req :: T.Text) -> do
     printStd (req <> "world"))
 , ("qux", StreamHandler $ \ (_ :: ()) -> do
    withMVar stdinBuf (pure . sourceFromBuffered))
 ]

-- client
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V

withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do
    c <- rpcClient uvs
    -- single call
    call @Int @Int c "foo" 1
    -- notify without result
    notify @T.Text c "bar" "hello"
    -- streaming result
    (_, src) <- callStream c "qux" ()
    runBIO $ src >|> sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do
        writeBuffer bo b
        flushBuffer bo))

Synopsis

Documentation

rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #

Open an RPC client from an input/output device.

rpcClient' Source #

Arguments

:: (Input i, Output o) 
=> i 
-> o 
-> Int

recv buffer size

-> Int

send buffer size

-> IO Client 

Open an RPC client with more control.

call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #

Send a single RPC call and get the result.

notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #

Send a single notification RPC call without getting a result.

callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #

Make a call inside a pipeline, which will be sent in a batch when execPipeline is called.

 ...
 fooId <- callPipeline client "foo" $ ...
 barId <- callPipeline client "bar" $ ...
 notifyPipeline client "qux" $ ...

 r <- execPipeline client

 fooResult <- fetchPipeline fooId r
 barResult <- fetchPipeline barId r

notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #

Make a notify inside a pipeline, which will be sent in a batch when execPipeline is called.

Notify calls don't affect the execution's result.

data RPCException Source #

Exception thrown when the remote endpoint returns an error.

execPipeline :: HasCallStack => Client -> IO PipelineResult Source #

Send the requests in a batch and get the results in a map identified by PipelineId.

fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #

Use the PipelineId returned by callPipeline to fetch the call's result.

callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #

Call a stream method. No other call or notify should be sent until the returned stream has been consumed completely.

This is implemented by extending the MessagePack-RPC protocol with the following new message types:

-- start stream request
[typ 0x04, name, param]

-- stop stream request
[typ 0x05]

-- each stream response
[typ 0x06, err, value]

-- stream response end
[typ 0x07]

The returned tuple is a pair of a stop action and a Source. To terminate the stream early, call the stop action. Please continue consuming until EOF is reached; otherwise the state of the Client will be incorrect.

type ServerLoop = (UVStream -> IO ()) -> IO () Source #

data ServerHandler where Source #

Constructors

CallHandler :: (MessagePack req, MessagePack res) => (req -> IO res) -> ServerHandler 
NotifyHandler :: MessagePack req => (req -> IO ()) -> ServerHandler 
StreamHandler :: (MessagePack req, MessagePack res) => (req -> IO (Source res)) -> ServerHandler 

simpleRouter :: [(Text, ServerHandler)] -> ServerService Source #

Simple router using FlatMap; looks up the name in O(log(N)).

import Z.IO.RPC.MessagePack
import Z.IO.Network
import Z.IO

serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
 [ ("foo", CallHandler $ \ req -> do
     ... )
 , ("bar", CallHandler $ \ req -> do
     ... )
 ]

serveRPC :: ServerLoop -> ServerService -> IO () Source #

Serve an RPC service.

data Request a Source #

Constructors

Notify (Text, a) 
Call (Int64, Text, a) 
StreamStart (Text, a) 

Instances

Instances details
Show a => Show (Request a) Source # 
Instance details

Defined in Z.IO.RPC.MessagePack

Methods

showsPrec :: Int -> Request a -> ShowS #

show :: Request a -> String #

showList :: [Request a] -> ShowS #

serveRPC' Source #

Arguments

:: ServerLoop 
-> Int

recv buffer size

-> Int

send buffer size

-> ServerService 
-> IO () 

Serve an RPC service with more control.