module Network.Hermes.RPC where
import Control.Monad
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Exception
import qualified Data.Map as M
import Data.Map(Map)
import Data.ByteString
import Data.Typeable
import Data.Serialize
import Data.Serialize.Put
import Data.Serialize.Get
import System.Random
import System.Log.Logger
import Data.Int
import Data.Unamb(race)
import Network.Hermes.Core
import Network.Hermes.Misc
import Network.Hermes.Protocol
import Network.Hermes.MChan
type ProcName = String
type ProcId = (ProcName,Type)
type Serial = Integer
data RPCContext = RPCContext {
core :: CoreContext
,nextSerial :: IO Serial
,callbacks :: MVar (Map ProcId ThreadId)
}
data RPCQuery p r = RPCQuery {
parameter :: p
,serial :: Serial
} deriving(Typeable)
instance Serialize p => Serialize (RPCQuery p r) where
put (RPCQuery p s) = put p >> put s
get = liftM2 RPCQuery get get
data RPCReply p r = RPCReply {
reply :: r
} deriving(Typeable)
instance Serialize r => Serialize (RPCReply p r) where
put (RPCReply r) = put r
get = RPCReply <$> get
newContext :: CoreContext
-> IO RPCContext
newContext core = do
serialV <- newMVar =<< randomRIO (0,2^128)
let nextSerial = modifyMVar serialV (return . join (,) . succ)
callbacks <- newMVar M.empty
return RPCContext{..}
registerCallback :: forall p r. (Serialize p, Serialize r, Typeable p, Typeable r)
=> RPCContext
-> ProcName
-> (p -> IO r)
-> IO ()
registerCallback RPCContext{callbacks,core} name proc = block $ do
modifyMVar_ callbacks $ \cbmap -> do
let key = (name,showType proc)
oldTid = M.lookup key cbmap
maybe (return ()) killThread oldTid
tid <- forkIO handleCallback
return $ M.insert key tid cbmap
where
handleCallback = unblock $ forever $ do
(uuid, RPCQuery{..} :: RPCQuery p r) <- recv' core name
reply <- proc parameter
send' core uuid (RPCReply{..} :: RPCReply p r) serial
call :: forall p r. (Serialize p, Typeable p, Serialize r, Typeable r) =>
RPCContext -> HermesID -> ProcName -> p -> IO (Maybe r)
call RPCContext{..} uuid name parameter = do
serial <- nextSerial
send' core uuid (RPCQuery{parameter,serial} :: RPCQuery p r) name
let getReply = do
(_,msg :: RPCReply p r) <- recv' core serial
return $ Just $ reply msg
getFailure = do
(_,RejectedMessage) <- recv' core (showType serial, encode serial)
return Nothing
race getReply getFailure