module Z.IO.RPC.MessagePack
(
ServerLoop
, ServerService
, ServerHandler (..)
, SessionCtx
, readSessionCtx
, writeSessionCtx
, clearSessionCtx
, modifySessionCtx
, serveRPC
, serveRPC'
, simpleRouter
, Client (..)
, rpcClient
, rpcClient'
, call
, notify
, PipelineId
, PipelineResult
, callPipeline
, notifyPipeline
, execPipeline
, fetchPipeline
, callStream
, Request (..)
, RPCException (..)
) where
import Control.Concurrent
import Control.Monad
import Data.Bits
import Data.IORef
import Data.Int
import Z.Data.MessagePack (MessagePack)
import qualified Z.Data.MessagePack as MP
import qualified Z.Data.MessagePack.Builder as MB
import qualified Z.Data.MessagePack.Value as MV
import qualified Z.Data.Parser as P
import Z.Data.PrimRef.PrimIORef
import qualified Z.Data.Text as T
import qualified Z.Data.Vector as V
import qualified Z.Data.Vector.FlatIntMap as FIM
import qualified Z.Data.Vector.FlatMap as FM
import Z.IO
import Z.IO.Network
-- | State of one RPC client connection.
data Client = Client
    { _clientSeqRef         :: Counter
      -- ^ Source of message ids: 'callPipeline' reads it and stores the successor.
    , _clientPipelineReqNum :: Counter
      -- ^ Number of pipelined calls whose responses are still pending;
      -- 'callStream' sets it to @-1@ while a stream is being consumed.
    , _clientBufferedInput  :: BufferedInput
      -- ^ Buffered input that responses are parsed from.
    , _clientBufferedOutput :: BufferedOutput
      -- ^ Buffered output that requests are written to.
    }
-- | Open an RPC client over a single device that is used for both reading and
-- writing. Both buffers are sized with 'V.defaultChunkSize'; use 'rpcClient''
-- to choose the sizes or to use separate devices.
rpcClient :: (Input dev, Output dev) => dev -> IO Client
rpcClient uvs = rpcClient' uvs uvs V.defaultChunkSize V.defaultChunkSize
-- | Open an RPC client with explicit input/output devices and buffer sizes.
--
-- Both counters start at @0@: no message id handed out yet and no pipelined
-- call pending.
rpcClient' :: (Input i, Output o)
           => i            -- ^ device responses are read from
           -> o            -- ^ device requests are written to
           -> Int          -- ^ receive buffer size
           -> Int          -- ^ send buffer size
           -> IO Client
rpcClient' i o recvBufSiz sendBufSiz = do
    seqRef <- newCounter 0
    reqNum <- newCounter 0
    bi <- newBufferedInput' recvBufSiz i
    bo <- newBufferedOutput' sendBufSiz o
    return (Client seqRef reqNum bi bo)
-- | Perform a single RPC call and wait for its result.
--
-- This queues the call with 'callPipeline', runs the pipeline with
-- 'execPipeline' (which flushes and reads the responses) and then extracts
-- this call's result with 'fetchPipeline'. Any exception raised by those
-- three functions propagates unchanged.
call :: (MessagePack req, MessagePack res, HasCallStack) => Client -> T.Text -> req -> IO res
call cli name req = do
    msgid <- callPipeline cli name req
    fetchPipeline msgid =<< execPipeline cli
-- | Send a single notification and flush the output buffer immediately.
--
-- No response is read; see 'notifyPipeline' for the message that is written.
notify :: (MessagePack req, HasCallStack) => Client -> T.Text -> req -> IO ()
notify c@(Client _ _ _ bo) name req = notifyPipeline c name req >> flushBuffer bo
-- | Identifier of a call queued in a pipeline: 'callPipeline' returns it and
-- 'fetchPipeline' uses it to look the result up.
type PipelineId = Int
-- | Result values of an executed pipeline, keyed by message id.
type PipelineResult = FIM.FlatIntMap MV.Value
-- | Queue a call in the current pipeline and return the message id it was
-- given. The request is only written to the output buffer here; it is
-- 'execPipeline' that flushes the buffer and reads the responses.
--
-- Throws 'RPCStreamUnconsumed' when the pending-request counter is @-1@,
-- the value 'callStream' stores while its stream has not reached the end.
callPipeline :: HasCallStack => MessagePack req => Client -> T.Text -> req -> IO PipelineId
callPipeline (Client seqRef reqNum _ bo) name req = do
    x <- readPrimIORef reqNum
    when (x == (-1)) $ throwIO (RPCStreamUnconsumed callStack)
    -- one more response to expect when the pipeline is executed
    writePrimIORef reqNum (x+1)
    msgid <- readPrimIORef seqRef
    writePrimIORef seqRef (msgid+1)
    -- keep only the low 32 bits, matching the 0..0xFFFFFFFF range that the
    -- parsers in this module accept for message ids
    let !msgid' = msgid .&. 0xFFFFFFFF
    writeBuilder bo $ do
        MB.arrayHeader 4
        MB.int 0                        -- message type 0: call request
        MB.int (fromIntegral msgid')    -- message id
        MB.str name                     -- method name
        MP.encodeMessagePack req        -- parameter
    return msgid'
-- | Queue a notification in the current pipeline. The message is only written
-- to the output buffer; nothing is flushed here and the pending-request
-- counter is left untouched, so no response is awaited for it.
--
-- Throws 'RPCStreamUnconsumed' when the pending-request counter is @-1@,
-- the value 'callStream' stores while its stream has not reached the end.
notifyPipeline :: HasCallStack => MessagePack req => Client -> T.Text -> req -> IO ()
notifyPipeline (Client _ reqNum _ bo) name req = do
    x <- readPrimIORef reqNum
    when (x == (-1)) $ throwIO (RPCStreamUnconsumed callStack)
    writeBuilder bo $ do
        MB.arrayHeader 3
        MB.int 2                        -- message type 2: notification
        MB.str name                     -- method name
        MP.encodeMessagePack req        -- parameter
-- | Exceptions thrown by the client side of this module.
data RPCException
    = RPCStreamUnconsumed CallStack
      -- ^ A request was attempted while the pending-request counter was @-1@,
      -- i.e. after 'callStream' and before its stream reached the end.
    | RPCException MV.Value CallStack
      -- ^ The peer answered with a non-nil error value, carried here as is.
  deriving Show

instance Exception RPCException
-- | Flush the queued pipeline and collect one response per queued call.
--
-- The output buffer is flushed first, then exactly as many responses are read
-- as calls were queued since the last execution, and the pending-request
-- counter is reset to @0@.
--
-- Throws 'RPCStreamUnconsumed' when the counter is @-1@ (a stream started by
-- 'callStream' has not reached its end), and 'RPCException' carrying the
-- first non-nil error value (in response order) if any call failed. Parse
-- failures are reported by 'readParser'.
--
-- All expected responses are read /before/ an error is raised: throwing in
-- the middle would leave the remaining responses in the input buffer, where
-- the next pipeline would mistake them for its own.
execPipeline :: HasCallStack => Client -> IO PipelineResult
execPipeline (Client _ reqNum bi bo) = do
    flushBuffer bo
    x <- readPrimIORef reqNum
    when (x == (-1)) $ throwIO (RPCStreamUnconsumed callStack)
    writePrimIORef reqNum 0
    -- drain every pending response first ...
    rs <- replicateM x (readParser response bi)
    -- ... and only then report the first failed call, if any
    forM_ rs $ \ (_, err, _) ->
        when (err /= MV.Nil) $ throwIO (RPCException err callStack)
    return $ FIM.packN x [ V.IPair (fromIntegral msgid) v | (msgid, _, v) <- rs ]
  where
    -- One response message: a 4-element array (header byte 0x94) holding
    -- type 1, message id, error and result.
    response :: P.Parser (Int64, MV.Value, MV.Value)
    response = do
        tag <- P.anyWord8
        when (tag /= 0x94) (P.fail' $ "wrong response tag: " <> T.toText tag)
        !typ <- MV.value
        !seq_ <- MV.value
        !err <- MV.value
        !v <- MV.value
        case typ of
            MV.Int 1 -> case seq_ of
                MV.Int msgid | msgid >= 0 && msgid <= 0xFFFFFFFF ->
                    return (msgid, err, v)
                _ -> P.fail' $ "wrong msgid: " <> T.toText seq_
            _ -> P.fail' $ "wrong response type: " <> T.toText typ
-- | Look up the result of one pipelined call and convert it to the wanted type.
--
-- Fails via 'unwrap'' with @ENOMSG@ when the pipeline result holds no entry
-- for the given id, and via 'unwrap' with @EPARSE@ when the stored value
-- cannot be converted.
fetchPipeline :: HasCallStack => MessagePack res => PipelineId -> PipelineResult -> IO res
fetchPipeline msgid r = do
    v <- unwrap' "ENOMSG" ("missing message in response: " <> T.toText msgid)
            (FIM.lookup msgid r)
    unwrap "EPARSE" (MP.convertValue v)
-- | Start a streaming call.
--
-- The request (a 3-element array with message type 4) is written and flushed
-- at once, and the pending-request counter is set to @-1@ so that every other
-- request on this client throws 'RPCStreamUnconsumed' until the stream ends.
--
-- The result is a pair of
--
--   * an action that sends the 1-element message of type 5 and flushes, and
--   * a 'Source' producing the stream items. The counter is reset to @0@ when
--     the stream-end message (type 7) is read.
--
-- Reading from the source throws 'RPCException' when an item carries a
-- non-nil error. The counter is reset in that case as well: the server in
-- this module writes nothing more for the stream after an error item, so
-- leaving the counter at @-1@ would block the client for good.
--
-- Throws 'RPCStreamUnconsumed' if a previous stream is still unconsumed.
callStream :: (MessagePack req, MessagePack res, HasCallStack) => Client -> T.Text -> req -> IO (IO (), Source res)
callStream (Client _seqRef reqNum bi bo) name req = do
    x <- readPrimIORef reqNum
    when (x == (-1)) $ throwIO (RPCStreamUnconsumed callStack)
    writePrimIORef reqNum (-1)
    writeBuilder bo $ do
        MB.arrayHeader 3
        MB.int 4                        -- message type 4: stream request
        MB.str name                     -- method name
        MP.encodeMessagePack req        -- parameter
    flushBuffer bo
    return (sendEOF, sourceFromIO $ do
        res <- readParser (do
            tag <- P.anyWord8
            case tag of
                -- 1-element array: stream end, must have type 7
                0x91 -> do
                    !typ <- MV.value
                    when (typ /= MV.Int 7) $
                        P.fail' $ "wrong response type: " <> T.toText typ
                    return Nothing
                -- 3-element array: stream item (type 6, error, value)
                0x93 -> do
                    !typ <- MV.value
                    !err <- MV.value
                    !v <- MV.value
                    when (typ /= MV.Int 6) $
                        P.fail' $ "wrong response type: " <> T.toText typ
                    return (Just (err, v))
                _ -> P.fail' $ "wrong response tag: " <> T.toText tag
            ) bi
        case res of
            Just (err, v) -> do
                when (err /= MV.Nil) $ do
                    -- the error item is the last message of the stream
                    writePrimIORef reqNum 0
                    throwIO (RPCException err callStack)
                -- NOTE(review): the payload is converted at type @Maybe res@
                -- (the result type of this action), so a payload converting
                -- to Nothing would end the source without resetting the
                -- counter — confirm this is intended.
                unwrap "EPARSE" (MP.convertValue v)
            _ -> do
                writePrimIORef reqNum 0
                return Nothing
        )
  where
    sendEOF :: IO ()
    sendEOF = do
        writeBuilder bo $ do
            MB.arrayHeader 1
            MB.int 5                    -- message type 5: client-side stream end
        flushBuffer bo
-- | A server loop: it is handed the worker that serves one 'UVStream' and
-- decides when to run it (presumably once per accepted connection — the loop
-- itself is supplied by the caller of 'serveRPC').
type ServerLoop = (UVStream -> IO ()) -> IO ()
-- | Routing function from a method name to its handler; 'Nothing' means the
-- method is unknown.
type ServerService a = T.Text -> Maybe (ServerHandler a)
-- | Mutable slot holding an optional user-defined session value. It is passed
-- to every handler; see 'readSessionCtx', 'writeSessionCtx',
-- 'clearSessionCtx' and 'modifySessionCtx'.
newtype SessionCtx a = SessionCtx (IORef (Maybe a))
-- | Read the current session value, 'Nothing' if none is stored.
readSessionCtx :: SessionCtx a -> IO (Maybe a)
readSessionCtx (SessionCtx x') = readIORef x'
-- | Store a session value, replacing whatever was stored before.
writeSessionCtx :: SessionCtx a -> a -> IO ()
writeSessionCtx (SessionCtx x') x = writeIORef x' (Just x)
-- | Remove the stored session value, leaving the slot empty.
clearSessionCtx :: SessionCtx a -> IO ()
clearSessionCtx (SessionCtx x') = writeIORef x' Nothing
-- | Update the stored session value with the given function.
--
-- The function is only applied when a value is present; returning 'Nothing'
-- from it clears the slot. An empty slot stays empty.
modifySessionCtx :: SessionCtx a -> (a -> Maybe a) -> IO ()
modifySessionCtx (SessionCtx x') f = modifyIORef' x' (f =<<)
-- | A handler for one RPC method. The request and response types are hidden
-- inside each constructor; they only need 'MessagePack' instances.
data ServerHandler a where
    -- | Handles a call: the returned value is sent back as the result.
    CallHandler
        :: (MessagePack req, MessagePack res)
        => (SessionCtx a -> req -> IO res)
        -> ServerHandler a
    -- | Handles a notification: nothing is sent back.
    NotifyHandler
        :: MessagePack req
        => (SessionCtx a -> req -> IO ())
        -> ServerHandler a
    -- | Handles a stream request: produces a 'Source' of response items. The
    -- 'IORef' is set to 'True' by the server once the client's stream-end
    -- message has been read.
    StreamHandler
        :: (MessagePack req, MessagePack res)
        => (SessionCtx a -> IORef Bool -> req -> IO (Source res))
        -> ServerHandler a
-- | Build a 'ServerService' from a list of method names and their handlers.
--
-- The lookup map is bound outside the lambda on purpose: applying
-- 'simpleRouter' to the handler list builds the map once, and every name
-- looked up through the resulting function shares it instead of re-packing
-- the list on each request.
--
-- (Duplicate names are resolved by 'FM.packR' — presumably in favour of the
-- later entry; check its documentation.)
simpleRouter :: [(T.Text, ServerHandler a)] -> ServerService a
simpleRouter handles = \ name -> FM.lookup name handleMap
  where
    handleMap = FM.packR handles
-- | Serve RPC requests with 'V.defaultChunkSize' for both the receive and the
-- send buffer. See 'serveRPC'' for the details.
serveRPC :: ServerLoop -> ServerService a -> IO ()
serveRPC serve = serveRPC' serve V.defaultChunkSize V.defaultChunkSize
-- | A parsed incoming request, with the parameter left as @a@.
data Request a
    = Notify (T.Text, a)
      -- ^ Notification: method name and parameter.
    | Call (Int64, T.Text, a)
      -- ^ Call: message id, method name and parameter.
    | StreamStart (T.Text, a)
      -- ^ Stream request: method name and parameter.
  deriving Show
-- | Serve RPC requests with explicit buffer sizes.
--
-- The worker handed to the 'ServerLoop' creates an empty 'SessionCtx' and
-- buffered input/output for the stream it is given, then reads requests with
-- 'sourceParser' one after another:
--
--   * notifications are dispatched to a 'NotifyHandler'; an unknown method
--     raises @ENOTFOUND@ via 'throwOtherError', which is not caught here;
--   * calls are dispatched to a 'CallHandler'; any exception raised while
--     converting the parameter or running the handler is sent back as the
--     error string of the response, and an unknown method is answered with a
--     \"not found\" error response;
--   * stream requests are dispatched to a 'StreamHandler'; each produced item
--     is written and flushed at once, an exception or unknown method is
--     reported with an error item.
serveRPC' :: ServerLoop
          -> Int          -- ^ receive buffer size
          -> Int          -- ^ send buffer size
          -> ServerService a -> IO ()
serveRPC' serve recvBufSiz sendBufSiz handle = serve $ \ uvs -> do
    ctx <- SessionCtx <$> newIORef Nothing
    bi <- newBufferedInput' recvBufSiz uvs
    bo <- newBufferedOutput' sendBufSiz uvs
    loop ctx bi bo
  where
    loop ctx bi bo = do
        req <- readParser sourceParser bi
        case req of
            Notify (name, v) -> do
                case handle name of
                    Just (NotifyHandler f) -> do
                        f ctx =<< unwrap "EPARSE" (MP.convertValue v)
                    _ -> throwOtherError "ENOTFOUND" "notification method not found"
                loop ctx bi bo

            Call (msgid, name, v) -> do
                case handle name of
                    Just (CallHandler f) -> do
                        res <- try (f ctx =<< unwrap "EPARSE" (MP.convertValue v))
                        writeBuilder bo $ do
                            MB.arrayHeader 4
                            MB.int 1                        -- message type 1: response
                            MB.int (fromIntegral msgid)     -- message id
                            case res of
                                -- failure: error string, nil result
                                Left e -> do
                                    MB.str (T.pack $ show (e :: SomeException))
                                    MB.nil
                                -- success: nil error, encoded result
                                Right r -> do
                                    MB.nil
                                    MP.encodeMessagePack r
                        flushBuffer bo
                    _ -> do
                        writeBuilder bo $ do
                            MB.arrayHeader 4
                            MB.int 1                        -- message type 1: response
                            MB.int (fromIntegral msgid)     -- message id
                            MB.str $ "request method: " <> name <> " not found"
                            MB.nil
                        flushBuffer bo
                loop ctx bi bo

            StreamStart (name, v) -> do
                eofRef <- newIORef False
                -- Watch for the client's stream-end message (1-element array,
                -- type 5) in a separate thread and raise the flag when it
                -- arrives.
                -- NOTE(review): this thread reads from the same buffered input
                -- that 'loop' returns to after the stream; if the client never
                -- sends the stream-end message, both appear to read it
                -- concurrently — confirm.
                _ <- forkIO $ do
                    _ <- readParser (do
                        tag <- P.anyWord8
                        when (tag /= 0x91) $
                            P.fail' $ "wrong request tag: " <> T.toText tag
                        !typ <- MV.value
                        when (typ /= MV.Int 5) $
                            P.fail' $ "wrong request type: " <> T.toText typ
                        ) bi
                    atomicWriteIORef eofRef True
                case handle name of
                    Just (StreamHandler f) -> (do
                        src <- f ctx eofRef =<< unwrap "EPARSE" (MP.convertValue v)
                        src (writeItem bo) EOF) `catch` (\ (e :: SomeException) ->
                            writeErrorItem bo $ "error when stream: " <> T.toText e)
                    _ -> writeErrorItem bo $ "request method: " <> name <> " not found"
                loop ctx bi bo

    -- Write and flush one stream message: an item (type 6, nil error, value)
    -- for 'Just', the stream-end message (type 7) otherwise.
    writeItem bo = \ mx -> do
        case mx of
            Just x -> do
                writeBuilder bo $ do
                    MB.arrayHeader 3
                    MB.int 6
                    MB.nil
                    MP.encodeMessagePack x
                flushBuffer bo
            _ -> do
                writeBuilder bo $ do
                    MB.arrayHeader 1
                    MB.int 7
                flushBuffer bo

    -- Write and flush a stream item carrying an error message and a nil value.
    writeErrorItem bo msg = do
        writeBuilder bo $ do
            MB.arrayHeader 3
            MB.int 6
            MB.str msg
            MB.nil
        flushBuffer bo
-- | Parse one incoming request, keeping the parameter as a raw 'MV.Value'.
--
-- Accepted messages, selected by the first byte:
--
--   * @0x93@ (3-element array): type, method name, parameter — a notification
--     for type @2@, a stream request for type @4@;
--   * @0x94@ (4-element array): type @0@, message id, method name, parameter —
--     a call. The message id must lie in @0..0xFFFFFFFF@.
--
-- Anything else, or a method name that is not a string, fails the parser.
sourceParser :: P.Parser (Request MV.Value)
{-# INLINE sourceParser #-}
sourceParser = do
    tag <- P.anyWord8
    case tag of
        -- notification or stream request
        0x93 -> do
            !typ <- MV.value
            !name <- MV.value
            !v <- MV.value
            case typ of
                MV.Int 2 -> case name of
                    MV.Str name' -> return (Notify (name', v))
                    _ -> P.fail' $ "wrong RPC name: " <> T.toText name
                MV.Int 4 -> case name of
                    MV.Str name' -> return (StreamStart (name', v))
                    _ -> P.fail' $ "wrong RPC name: " <> T.toText name
                _ -> P.fail' $ "wrong request type: " <> T.toText typ
        -- call
        0x94 -> do
            !typ <- MV.value
            !seq_ <- MV.value
            !name <- MV.value
            !v <- MV.value
            case typ of
                MV.Int 0 -> case seq_ of
                    MV.Int msgid | msgid >= 0 && msgid <= 0xFFFFFFFF -> case name of
                        MV.Str name' -> return (Call (msgid, name', v))
                        _ -> P.fail' $ "wrong RPC name: " <> T.toText name
                    _ -> P.fail' $ "wrong msgid: " <> T.toText seq_
                _ -> P.fail' $ "wrong request type: " <> T.toText typ
        _ -> P.fail' $ "wrong request tag: " <> T.toText tag