{-# Language BangPatterns #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Patterns/Desk.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- A Desk is a server that supplies information
-- about providers. A client requests providers
-- for a specific job (service, task or topic)
-- and the desk will reply with a list of queue names
-- of providers of the enquired job.
-- 
-- The desk is not statically configured,
-- but uses a registry to which providers connect.
-- Providers that cease to work can disconnect or,
-- if heartbeats are required, will be removed from the
-- list of available providers internally when no more heartbeats
-- are sent. This way, the information provided by a desk is 
-- always up-to-date.
--
-- Desk balances providers, /i.e./ providers rotate in a list
-- from which always the first /n/ providers are handed out
-- to requesting consumers (where /n/ corresponds to the number of 
-- providers requested by the consumer.) 
--
-- Since providers are managed dynamically, 
-- the result of two consecutive calls is probably not the same.
-- Desk is thus not idempotent in the strict sense.
-- But, since the call itself does only cause 
-- a change of the order of providers
-- (and since it should be irrelevant for the consumer
--  which provider is actually used),
-- two consecutive calls will have the same effect
-- -- if not all providers disconnect between the two calls.
--
-- Internally, the Desk protocol uses the following headers:
--
-- * __jobs__: Comma-separated list of providers;
--
-- * __redundancy__: Requested number of providers.
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Patterns.Desk (
                            withDesk, requestProvider)
where

  import           Registry
  import           Types
  import           Network.Mom.Stompl.Patterns.Basic 
  import           Network.Mom.Stompl.Client.Queue 
  import           Data.Char (isDigit)
  import           Data.List (intercalate)
  import           Data.List.Split (endBy)
  import           Codec.MIME.Type (nullType)
  import           Control.Exception (throwIO, catches)
  import           Control.Monad (forever)
  import           Control.Applicative ((<$>))

  -----------------------------------------------------------------------
  -- | Creates a desk with the lifetime of the application-defined
  --   action:
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the desk, used for error handling;
  --
  --   * 'QName': Registration queue -- this queue is used
  --              by providers to connect to the registry,
  --              it is not used for consumer requests;
  --
  --   * (Int, Int): Heartbeat range of the 'Registry' 
  --                 (see 'withRegistry' for details);
  --
  --   * 'OnError': Error handling;
  --
  --   * 'QName': Request queue -- this queue is used
  --              by consumers to request information about
  --              available providers;
  --
  --   * IO r: Action that defines the lifetime of the desk;
  --           the result is also the result of /withDesk/.
  -----------------------------------------------------------------------
  withDesk :: Con -> String -> QName -> (Int, Int) -> OnError -> 
              QName -> IO r -> IO r
  withDesk :: Con
-> String
-> String
-> (Int, Int)
-> OnError
-> String
-> IO r
-> IO r
withDesk Con
c String
n String
qn (Int
mn,Int
mx) OnError
onErr String
rq IO r
action =
    Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
forall r.
Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
withRegistry Con
c String
n String
qn (Int
mn,Int
mx) OnError
onErr ((Registry -> IO r) -> IO r) -> (Registry -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Registry
reg -> 
      Con
-> String
-> ReaderDesc ()
-> WriterDesc ()
-> ((Reader (), Writer ()) -> IO r)
-> IO r
forall i o r.
Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair   Con
c String
n (String
rq,        [], [], InBound ()
ignorebody)
                     (String
"unknown", [], [],     OutBound ()
nobody) (((Reader (), Writer ()) -> IO r) -> IO r)
-> ((Reader (), Writer ()) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \(Reader ()
r,Writer ()
w) -> 
        IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (Registry -> Reader () -> Writer () -> IO ()
forall i b. Registry -> Reader i -> Writer () -> IO b
doDesk Registry
reg Reader ()
r Writer ()
w) IO r
action
    where doDesk :: Registry -> Reader i -> Writer () -> IO b
doDesk Registry
reg Reader i
r Writer ()
w = 
            IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (do
              Message i
m  <- Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ Reader i
r
              String
j  <- Message i -> IO String
forall m. Message m -> IO String
getJobName Message i
m
              String
q  <- Message i -> IO String
forall m. Message m -> IO String
getChannel Message i
m
              Int
i  <- Message i -> IO Int
forall i. Message i -> IO Int
getRedundancy Message i
m
              String
ps <- (String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"," ([String] -> String)
-> ([Provider] -> [String]) -> [Provider] -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Provider -> String) -> [Provider] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map Provider -> String
prvQ) ([Provider] -> String) -> IO [Provider] -> IO String
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Registry -> String -> Int -> IO [Provider]
getProvider Registry
reg String
j Int
i
              let hs :: [Header]
hs = case String -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length String
ps of
                         Int
0 -> [(String
"__sc__", StatusCode -> String
forall a. Show a => a -> String
show StatusCode
NotFound),
                               (String
"__jobs__",          String
""),
                               (String
"__redundancy__",   String
"0")]
                         Int
x -> [(String
"__sc__",        StatusCode -> String
forall a. Show a => a -> String
show StatusCode
OK),
                               (String
"__redundancy__", Int -> String
forall a. Show a => a -> String
show Int
x),
                               (String
"__jobs__",           String
ps)]
               in Writer () -> String -> Type -> [Header] -> () -> IO ()
forall a. Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc Writer ()
w String
q Type
nullType [Header]
hs ())
            (String -> OnError -> [Handler ()]
ignoreHandler String
n OnError
onErr)

  -----------------------------------------------------------------------
  -- | Function used by consumer to request provider information 
  --   from a desk:
  --
  --   * 'ClientA' () (): The request to the desk is sent
  --                      through a client of type () ().
  --                      This client must be created by 
  --                      the application beforehand
  --                      (/e.g./: the client could be created
  --                               once during initialisation
  --                               and then be used repeatedly
  --                               to obtain or update information 
  --                               on providers according to the
  --                               application needs);
  --
  --   * Int: Timeout in microseconds;
  --
  --   * 'JobName': Name of the job for which the consumer
  --                needs providers;
  --
  --   * Int: Number of providers needed by the consumer.
  --          This can be used for redundancy:
  --          if one provider fails, 
  --          the consumer passes to the next.
  --          Be aware, however, that the information, 
  --          at the point in time, when a provider fails, 
  --          may already be outdated.
  --          Therefore, the redundant providers should be used
  --          immediately and, when the main provider fails later,
  --          the information should be updated by requesting
  --          new providers from the desk.
  --
  --  The result is a tuple of ('StatusCode', ['QName']).
  --  If the 'StatusCode' is not 'OK', 
  --  the list of 'QName' will be empty;
  --  otherwise, it will contain at least one provider
  --  and maximum /n/ providers (where /n/ is the number of providers
  --  requested). If fewer providers than requested are available,
  --  the list will contain less than /n/ providers. 
  --  But note that this, as long as there is at least one provider,
  --  does not count as an error, /i.e./ the 'StatusCode' is still 'OK'.
  -----------------------------------------------------------------------
  requestProvider :: ClientA () ()  ->  Int -> 
                     JobName        ->  Int -> IO (StatusCode, [QName])
  requestProvider :: ClientA () () -> Int -> String -> Int -> IO (StatusCode, [String])
requestProvider ClientA () ()
c Int
tmo String
jn Int
r = do
    Maybe (Message ())
mbR <- ClientA () ()
-> Int -> Type -> [Header] -> () -> IO (Maybe (Message ()))
forall i o.
ClientA i o
-> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
request ClientA () ()
c Int
tmo Type
nullType [(String
"__job__",            String
jn),  
                                   (String
"__redundancy__", Int -> String
forall a. Show a => a -> String
show Int
r)] ()
    case Maybe (Message ())
mbR of
      Maybe (Message ())
Nothing -> (StatusCode, [String]) -> IO (StatusCode, [String])
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
Timeout, [])
      Just Message ()
m  -> do
        Either String StatusCode
eiSC <- Message () -> IO (Either String StatusCode)
forall m. Message m -> IO (Either String StatusCode)
getSC   Message ()
m
        case Either String StatusCode
eiSC of
          Left  String
sc -> PatternsException -> IO (StatusCode, [String])
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO (StatusCode, [String]))
-> PatternsException -> IO (StatusCode, [String])
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
BadStatusCodeX String
sc
          Right StatusCode
OK -> do [String]
qs <- Message () -> IO [String]
forall i. Message i -> IO [String]
getJobs Message ()
m
                         (StatusCode, [String]) -> IO (StatusCode, [String])
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
OK, [String]
qs)
          Right StatusCode
sc -> (StatusCode, [String]) -> IO (StatusCode, [String])
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
sc, [])
    
  getJobs :: Message i -> IO [QName]
  getJobs :: Message i -> IO [String]
getJobs Message i
m = do
    String
x <- String -> String -> Message i -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__jobs__"
                   String
"no jobs in header" Message i
m
    [String] -> IO [String]
forall (m :: * -> *) a. Monad m => a -> m a
return ([String] -> IO [String]) -> [String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ String -> String -> [String]
forall a. Eq a => [a] -> [a] -> [[a]]
endBy String
"," String
x

  getRedundancy :: Message i -> IO Int
  getRedundancy :: Message i -> IO Int
getRedundancy Message i
m = do
    String
x <- String -> String -> Message i -> IO String
forall m. String -> String -> Message m -> IO String
getHeader String
"__redundancy__" 
                   String
"No redundancy level in headers" Message i
m
    if (Char -> Bool) -> String -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all Char -> Bool
isDigit String
x then Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO Int) -> Int -> IO Int
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read String
x 
                     else PatternsException -> IO Int
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO Int) -> PatternsException -> IO Int
forall a b. (a -> b) -> a -> b
$ String -> String -> PatternsException
HeaderX String
"__redundancy" (String -> PatternsException) -> String -> PatternsException
forall a b. (a -> b) -> a -> b
$
                                      String
"Redundancy level not numberic: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
x