module Control.Distributed.Process.Execution.Exchange.Router
(
HeaderName
, Binding(..)
, Bindable
, BindingSelector
, RelayType(..)
, router
, supervisedRouter
, supervisedRouterRef
, route
, routeMessage
, messageKeyRouter
, bindKey
, headerContentRouter
, bindHeader
) where
import Control.DeepSeq (NFData)
import Control.Distributed.Process
( Process
, ProcessMonitorNotification(..)
, ProcessId
, monitor
, handleMessage
, unsafeWrapMessage
)
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Execution.Exchange.Internal
( startExchange
, startSupervised
, configureExchange
, Message(..)
, Exchange
, ExchangeType(..)
, post
, postMessage
, applyHandlers
)
import Control.Distributed.Process.Extras.Internal.Primitives
( deliver
, Resolvable(..)
)
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Data.Binary
import Data.Foldable (forM_)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics
type HeaderName = String
data Binding =
BindKey { bindingKey :: !String }
| BindHeader { bindingKey :: !String
, headerName :: !HeaderName
}
| BindNone
deriving (Typeable, Generic, Eq, Show)
instance Binary Binding where
instance NFData Binding where
instance Hashable Binding where
class (Hashable k, Eq k, Serializable k) => Bindable k
instance (Hashable k, Eq k, Serializable k) => Bindable k
type BindingSelector k = (Message -> Process k)
data RelayType = PayloadOnly | WholeMessage
data State k = State { bindings :: !(HashMap k (HashSet ProcessId))
, selector :: !(BindingSelector k)
, relayType :: !RelayType
}
type Router k = ExchangeType (State k)
messageKeyRouter :: RelayType -> Process Exchange
messageKeyRouter t = router t matchOnKey
where
matchOnKey :: Message -> Process Binding
matchOnKey m = return $ BindKey (key m)
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
headerContentRouter t n = router t (checkHeaders n)
where
checkHeaders hn Message{..} = do
case Map.lookup hn (Map.fromList headers) of
Nothing -> return BindNone
Just hv -> return $ BindHeader hn hv
router :: (Bindable k) => RelayType -> BindingSelector k -> Process Exchange
router t s = routerT t s >>= startExchange
supervisedRouterRef :: Bindable k
=> RelayType
-> BindingSelector k
-> SupervisorPid
-> Process (ProcessId, P.Message)
supervisedRouterRef t sel spid = do
ex <- supervisedRouter t sel spid
Just pid <- resolve ex
return (pid, unsafeWrapMessage ex)
supervisedRouter :: Bindable k
=> RelayType
-> BindingSelector k
-> SupervisorPid
-> Process Exchange
supervisedRouter t sel spid =
routerT t sel >>= \t' -> startSupervised t' spid
routerT :: Bindable k
=> RelayType
-> BindingSelector k
-> Process (Router k)
routerT t s = do
return $ ExchangeType { name = "Router"
, state = State Map.empty s t
, configureEx = apiConfigure
, routeEx = apiRoute
}
bindKey :: String -> Exchange -> Process ()
bindKey k ex = do
self <- P.getSelfPid
configureExchange ex (self, BindKey k)
bindHeader :: HeaderName -> String -> Exchange -> Process ()
bindHeader n v ex = do
self <- P.getSelfPid
configureExchange ex (self, BindHeader v n)
route :: Serializable m => Exchange -> m -> Process ()
route = post
routeMessage :: Exchange -> Message -> Process ()
routeMessage = postMessage
apiRoute :: forall k. Bindable k
=> State k
-> Message
-> Process (State k)
apiRoute st@State{..} msg = do
binding <- selector msg
case Map.lookup binding bindings of
Nothing -> return st
Just bs -> forM_ bs (fwd relayType msg) >> return st
where
fwd WholeMessage m = deliver m
fwd PayloadOnly m = P.forward (payload m)
apiConfigure :: forall k. Bindable k
=> State k
-> P.Message
-> Process (State k)
apiConfigure st msg = do
applyHandlers st msg $ [ \m -> handleMessage m (createBinding st)
, \m -> handleMessage m (handleMonitorSignal st)
]
where
createBinding s@State{..} (pid, bind) = do
case Map.lookup bind bindings of
Nothing -> do _ <- monitor pid
return $ s { bindings = newBind bind pid bindings }
Just ps -> return $ s { bindings = addBind bind pid bindings ps }
newBind b p bs = Map.insert b (Set.singleton p) bs
addBind b' p' bs' ps = Map.insert b' (Set.insert p' ps) bs'
handleMonitorSignal s@State{..} (ProcessMonitorNotification _ p _) =
let bs = bindings
bs' = Map.foldlWithKey' (\a k v -> Map.insert k (Set.delete p v) a) bs bs
in return $ s { bindings = bs' }