{-# Language BangPatterns #-} ------------------------------------------------------------------------------- -- | -- Module : Network/Mom/Stompl/Patterns/Balancer.hs -- Copyright : (c) Tobias Schoofs -- License : LGPL -- Stability : experimental -- Portability: portable -- -- This module provides a balancer for services and tasks -- and a topic router. -- Balancers for services and tasks improve scalability and reliability -- of servers and workers. Workers should always be used with a balancer -- (since balancing workload is the main idea of workers); -- servers can very well be used without a balancer, but won't scale -- with increasing numbers of clients. -- -- A balancer consists of a registry to which -- servers and workers connect; -- servers and workers are maintained in lists -- according to the job they provide. -- Clients and pushers send requests to the balancer, -- which then forwards the request to a server or worker. -- The client will receive the reply not through the balancer, -- but directly from the server (to which the reply queue -- was forwarded as part of the request message -- -- see 'ClientA' for details). -- -- With servers and workers sending heartbeats, -- a balancer also improves reliability -- in contrast to a topology -- where a task is pushed to a single worker or -- a request is sent to only one server. -- -- A router is a forwarder of a topic. -- A router is very similar to a publisher ('PubA') -- with the difference that the router -- does not create new topic data, -- but uses topic data received from a publisher -- (a router, hence, is a subscriber and a publisher). -- Routers can be used to balance the workload of publishers: -- Instead of one publisher serving thousands of subscribers, -- the initial publisher would serve thousands of routers, -- which, in their turn, serve thousands of subscribers -- (or even other routers). ------------------------------------------------------------------------------- module Network.Mom.Stompl.Patterns.Balancer ( -- * Balancer withBalancer, -- * Router withRouter) where import Registry import Types import Network.Mom.Stompl.Client.Queue import Network.Mom.Stompl.Patterns.Basic import Codec.MIME.Type (nullType) import Control.Exception (throwIO, catches) import Control.Monad (forever, unless) ----------------------------------------------------------------------- -- | Create a Service and Task Balancer with the lifetime -- of the application-defined action passed in -- and start it in a background thread: -- -- * 'Con': Connection to a Stomp broker; -- -- * String: Name of the balancer, used for error handling; -- -- * 'QName': Registration queue -- this queue is used -- by providers to connect to the registry, -- it is not used for consumer requests; -- -- * (Int, Int): Heartbeat range of the 'Registry' -- (see 'withRegistry' for details); -- -- * 'QName': Request queue -- this queue is used -- for consumer requests; -- -- * 'OnError': Error handling; -- -- * IO r: Action that defines the lifetime of the balancer; -- the result /r/ is also the result of /withBalancer/. ----------------------------------------------------------------------- withBalancer :: Con -> String -> QName -> (Int, Int) -> QName -> OnError -> IO r -> IO r withBalancer c n qn (mn,mx) rq onErr action = withRegistry c n qn (mn,mx) onErr $ \reg -> withPair c n (rq, [], [], bytesIn) ("unknown", [], [], bytesOut) $ \(r,w) -> withThread (balance reg r w) action where balance reg r w = forever $ catches (do m <- readQ r jn <- getJobName m t <- mapR reg jn (send2Prov w m) unless t $ throwIO $ NoProviderX jn) (ignoreHandler n onErr) send2Prov w m p = writeAdHoc w (prvQ p) nullType (msgHdrs m) $ msgContent m ----------------------------------------------------------------------- -- | Create a router with the lifetime of the -- application-defined action passed in -- and start it in a background thread: -- -- * 'Con': Connection to a Stomp broker; -- -- * String: Name of the router, used for error handling; -- -- * 'JobName': Routed topic; -- -- * 'QName': Registration queue of the source publisher; -- -- * 'QName': Queue through which the internal subscriber -- will receive the topic data from the source publisher; -- -- * 'QName': Registration queue of the target publisher -- to which subscribers will connect; -- -- * Int: Registration timeout -- (timeout to register at the source publisher); -- -- * 'QName': Request queue -- this queue is used -- for consumer requests; -- -- * 'OnError': Error handling; -- -- * IO r: Action that defines the lifetime of the router; -- the result /r/ is also the result of /withRouter/. ----------------------------------------------------------------------- withRouter :: Con -> String -> JobName -> QName -> QName -> QName -> Int -> OnError -> IO r -> IO r withRouter c n jn srq ssq trq tmo onErr action = withPub c n jn trq onErr ("unknown", [], [], bytesOut) $ \p -> withSubThread c n jn srq tmo (ssq, [], [], bytesIn) (pub p) onErr action where pub p m = publish p nullType (msgHdrs m) $ msgContent m