{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ViewPatterns #-}

-- | This module allows you to register procedures which can then be
-- called from another node. Due to limitations in GHC, it is not
-- possible to snapshot its state; you will always need to
-- re-register them at program startup.
--
-- RPC calls are distinguished using an arbitrary name, as well as
-- the type of the parameter and return value.
module Network.Hermes.RPC where

import Control.Monad
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Exception
import qualified Data.Map as M
import Data.Map(Map)
import Data.ByteString
import Data.Typeable
import Data.Serialize
import Data.Serialize.Put
import Data.Serialize.Get
import System.Random
import System.Log.Logger
import Data.Int
import Data.Unamb(race)

import Network.Hermes.Core
import Network.Hermes.Misc
import Network.Hermes.Protocol
import Network.Hermes.MChan

-- | The user-chosen name of a remote procedure.
type ProcName = String

-- | Key of the callback table: the procedure's name paired with the
-- type of the callback (as produced by @showType@ in 'registerCallback').
type ProcId = (ProcName,Type)

-- OPTIMIZE: Use a proper Word128 or something.
-- | Identifier attached to every query; the reply is sent back under it.
type Serial = Integer

-- | State shared by all RPC operations built on one 'CoreContext'.
data RPCContext = RPCContext
  { core :: CoreContext
    -- ^ Underlying context handed to @send'@ and @recv'@.
  , nextSerial :: IO Serial
    -- ^ Action producing the serial for the next outgoing query.
  , callbacks :: MVar (Map ProcId ThreadId)
    -- ^ For each registered procedure, the thread currently serving it.
  }

-- | A call request. The type parameter @r@ (the expected result type) is
-- phantom: it is not stored in any field.
data RPCQuery p r = RPCQuery
  { parameter :: p
    -- ^ Argument to pass to the remote procedure.
  , serial :: Serial
    -- ^ Serial under which the caller waits for the reply.
  } deriving(Typeable)

-- | Wire format: the parameter followed by the serial.
instance Serialize p => Serialize (RPCQuery p r) where
  put (RPCQuery p s) = put p >> put s
  get = RPCQuery <$> get <*> get

-- | The answer to an 'RPCQuery'. The type parameter @p@ (the parameter
-- type of the procedure) is phantom: it is not stored in any field.
data RPCReply p r = RPCReply
  { reply :: r
    -- ^ Value returned by the remote procedure.
  } deriving(Typeable)

-- | Wire format: just the result value.
instance Serialize r => Serialize (RPCReply p r) where
  put (RPCReply r) = put r
  get = RPCReply <$> get

-- | Creates an RPC context on top of the given 'CoreContext', with an
-- empty callback table and a serial counter seeded with a random value
-- in the range @0@ to @2^128@.
--
-- Do NOT create multiple contexts for one CoreContext. They'll conflict.
newContext :: CoreContext -> IO RPCContext
newContext core = do
  serialV <- newMVar =<< randomRIO (0,2^128)
  -- Increment the counter and hand out the incremented value.
  let nextSerial = modifyMVar serialV $ \old ->
        let new = succ old in return (new, new)
  callbacks <- newMVar M.empty
  return RPCContext{..}

-- | Registers (or replaces) a callback to be executed
-- when we receive a call to this name.
--
-- If a callback with the same name and type is already registered, its
-- serving thread is killed before the new one is forked.
--
-- Individual callbacks are currently always executed in serial. Do we
-- want parallel?
registerCallback :: forall p r. (Serialize p, Serialize r, Typeable p, Typeable r) =>
                    RPCContext
                    -> ProcName -- ^ Callback's name
                    -> (p -> IO r) -- ^ The callback itself
                    -> IO ()
-- NOTE(review): 'block' and 'unblock' are deprecated in newer versions of
-- base in favour of 'mask'; they are kept here to stay compatible with
-- whatever compiler version the rest of the project targets.
registerCallback RPCContext{callbacks,core} name proc = block $
  modifyMVar_ callbacks $ \cbmap -> do
    let key = (name,showType proc)
    -- Replace, rather than duplicate, an existing handler for this key.
    maybe (return ()) killThread (M.lookup key cbmap)
    tid <- forkIO handleCallback
    return $ M.insert key tid cbmap
  where
    -- Serving loop: receive a query under the procedure's name, run the
    -- callback on its parameter, and send the result back to the sender
    -- under the query's serial. The forked thread inherits the blocked
    -- state of its parent, hence the explicit 'unblock'.
    handleCallback = unblock $ forever $ do
      (sender, RPCQuery{parameter,serial} :: RPCQuery p r) <- recv' core name
      reply <- proc parameter
      send' core sender (RPCReply{reply} :: RPCReply p r) serial

-- | Remote procedure call.
--
-- Sends the parameter to the given node under the procedure's name,
-- tagged with a fresh serial, then waits for whichever arrives first:
-- the reply (giving @Just@ the result) or a rejection notice (giving
-- @Nothing@).
--
-- Apart from core exceptions, it may fail in the specific case that the
-- procedure doesn't exist, in which case it returns Nothing.
call :: forall p r. (Serialize p, Typeable p, Serialize r, Typeable r) =>
        RPCContext -> HermesID -> ProcName -> p -> IO (Maybe r)
call RPCContext{..} uuid name parameter = do
  serial <- nextSerial
  send' core uuid (RPCQuery{parameter,serial} :: RPCQuery p r) name
  let -- The callee answers under the serial we just sent.
      getReply = do
        (_,msg :: RPCReply p r) <- recv' core serial
        return $ Just $ reply msg
      -- NOTE(review): presumably the core reports an undeliverable
      -- message under (type name, encoded tag) -- confirm against
      -- Network.Hermes.Core.
      getFailure = do
        (_,RejectedMessage) <- recv' core (showType serial, encode serial)
        return Nothing
  race getReply getFailure