Z-MessagePack-0.4.1.0: MessagePack
Copyright(c) Dong Han 2019
LicenseBSD
Maintainerwinterland1989@gmail.com
Stabilityexperimental
Portabilitynon-portable
Safe HaskellNone
LanguageHaskell2010

Z.IO.RPC.MessagePack

Description

This module provides MessagePack-RPC implementation.

Synopsis

Example

import Data.Maybe
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Data.IORef
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V

newtype ServerCtx = ServerCtx { counter :: Int }

main = serveRPC (startTCPServer defaultTCPServerConfig) $ simpleRouter
 [ ("hi", CallHandler $ \ctx (req :: T.Text) -> do
     writeSessionCtx ctx (ServerCtx 0)
     return ("hello, " <> req)
   )
 , ("foo", CallHandler $ \ctx (req :: Int) -> do
     modifySessionCtx ctx (Just . ServerCtx . (+ 1) . counter)
     return (req + 1)
   )
 , ("bar", CallHandler $ \ctx (req :: T.Text) -> do
     counter . fromJust <$> readSessionCtx ctx
   )
 , ("qux", StreamHandler $ \ctx eofRef (_ :: ()) -> do
     withMVar stdinBuf (\ stdin -> pure $ \ k _ -> do
       eof <- readIORef eofRef
       if eof
       then k EOF
       else do
           r <- readBuffer stdin
           if V.null r
           then k EOF
           else k (Just r))
   )
 ]
import Data.Maybe
import Z.IO.RPC.MessagePack
import Z.IO.Network
import Data.IORef
import Z.IO
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V

main = withResource (initTCPClient defaultTCPClientConfig) $ \ uvs -> do
  c <- rpcClient uvs
  -- single call
  r <- call @T.Text @T.Text c "hi" "Alice"
  print r

  _ <- call @Int @Int c "foo" 1
  _ <- call @Int @Int c "foo" 1
  x <- call @T.Text @Int c "bar" ""
  print x

  -- streaming result
  (_, src) <- callStream c "qux" ()
  runBIO_ $ src . sinkToIO (\ b -> withMVar stdoutBuf (\ bo -> do
    writeBuffer bo b
    flushBuffer bo))

Server

type ServerLoop = (UVStream -> IO ()) -> IO () Source #

data ServerHandler a where Source #

Constructors

CallHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> req -> IO res) -> ServerHandler a 
NotifyHandler :: MessagePack req => (SessionCtx a -> req -> IO ()) -> ServerHandler a 
StreamHandler :: (MessagePack req, MessagePack res) => (SessionCtx a -> IORef Bool -> req -> IO (Source res)) -> ServerHandler a

StreamHandler will receive an IORef which get updated to True when client send stream end packet, stream should end up ASAP.

modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO () Source #

Try to modify SessionCtx if it has.

Note that you can set the modifier function to return Nothing to clear SessionCtx.

serveRPC :: ServerLoop -> ServerService a -> IO () Source #

Serve a RPC service.

serveRPC' Source #

Arguments

:: ServerLoop 
-> Int

recv buffer size

-> Int

send buffer size

-> ServerService a 
-> IO () 

Serve a RPC service with more control.

simpleRouter :: [(Text, ServerHandler a)] -> ServerService a Source #

Simple router using FlatMap, lookup name in O(log(N)).

import Z.IO.PRC.MessagePack
import Z.IO.Network
import Z.IO

serveRPC (startTCPServer defaultTCPServerConfig) . simpleRouter $
 [ ("foo", CallHandler $ \ ctx req -> do
     ... )
 , ("bar", CallHandler $ \ ctx req -> do
     ... )
 ]

Client

data Client Source #

Constructors

Client 

Fields

rpcClient :: (Input dev, Output dev) => dev -> IO Client Source #

Open a RPC client from input/output device.

rpcClient' Source #

Arguments

:: (Input i, Output o) 
=> i 
-> o 
-> Int

recv buffer size

-> Int

send buffer size

-> IO Client 

Open a RPC client with more control.

call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO res Source #

Send a single RPC call and get result.

notify :: (MessagePack req, HasCallStack) => Client -> Text -> req -> IO () Source #

Send a single notification RPC call without getting result.

Pipeline

type PipelineResult = FlatIntMap Value Source #

callPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO PipelineId Source #

Make a call inside a pipeline, which will be sent in batch when execPipeline.

 ...
 fooId <- callPipeline client "foo" $ ...
 barId <- callPipeline client "bar" $ ...
 notifyPipeline client "qux" $ ...

 r <- execPipeline client

 fooResult <- fetchPipeline fooId r
 barResult <- fetchPipeline barId r

notifyPipeline :: HasCallStack => MessagePack req => Client -> Text -> req -> IO () Source #

Make a notify inside a pipeline, which will be sent in batch when execPipeline.

Notify calls doesn't affect execution's result.

execPipeline :: HasCallStack => Client -> IO PipelineResult Source #

Sent request in batch and get result in a map identified by PipelineId.

fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res Source #

Use the PipelineId returned when callPipeline to fetch call's result.

callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> Text -> req -> IO (IO (), Source res) Source #

Call a stream method, no other call or notify should be sent until returned stream is consumed completely.

This is implemented by extend MessagePack-RPC protocol by adding following new message types:

-- start stream request
[typ 0x04, name, param]

-- stop stream request
[typ 0x05]

-- each stream response
[typ 0x06, err, value]

-- stream response end
[typ 0x07]

The return tuple is a pair of a stop action and a Source, to terminate stream early, call the stop action. Please continue consuming until EOF reached, otherwise the state of the Client will be incorrect.

Misc

data Request a Source #

Constructors

Notify (Text, a) 
Call (Int64, Text, a) 
StreamStart (Text, a) 

Instances

Instances details
Show a => Show (Request a) Source # 
Instance details

Defined in Z.IO.RPC.MessagePack

Methods

showsPrec :: Int -> Request a -> ShowS #

show :: Request a -> String #

showList :: [Request a] -> ShowS #

data RPCException Source #

Exception thrown when remote endpoint return errors.