{-# Language BangPatterns #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Patterns/Bridge.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- Bridges link providers connected to one broker
-- to consumers connected to another broker.
--
-- For publishers and workers, this is quite trivial:
-- the bridge implements 
--            the corresponding consumer on one broker
--            and the corresponding provider on the other.
--
-- For servers, the task is somewhat more complicated:
-- since servers use the client's reply queue to send the result
-- back to the client and this queue only exists on the broker
-- to which the client is connected, the bridge has to remember 
-- the client's reply queue and use its own queue on the server-side broker
-- to finally route the reply back to the original client.
-- With many broker connected by a service bridge,
-- this can result in long chains of clients and servers 
-- sending requests and waiting for replies.
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Patterns.Bridge (
                             -- * Forwarder
                             withForwarder, 
                             -- * TaskBridge
                             withTaskBridge,
                             -- * ServiceBridge
                             withServiceBridge)
where

  import           Types
  import           Network.Mom.Stompl.Patterns.Basic 
  import           Network.Mom.Stompl.Client.Queue 
  import           Codec.MIME.Type (nullType)
  import           Prelude hiding (catch)
  import           Control.Exception (throwIO) 

  -----------------------------------------------------------------------
  -- | Create a forwarder with the lifetime of the application-defined
  --   action passed in and start it in a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one where the original publisher is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one where the target subscribers are connected);
  --
  --   * String: Name of the forwarder used for error handling;
  --
  --   * 'JobName': Name of the Topic that is bridged;
  --
  --   * 'QName': Registration queue of the source publisher;
  --
  --   * 'QName': Queue through which the internal subscriber
  --              will receive topic data from the source publisher;
  --
  --   * 'QName': Registration queue of the target publisher;
  --
  --   * Int: Timeout on registering to the source publisher
  --          in microseconds;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the forwarder;
  --           its result /r/ is also the result of /withForwarder/.
  --
  --   Note the remarkable similarity to the router pattern ('withRouter').
  --   In fact, a router is but a forwarder where source and target broker
  --   are the same.
  -------------------------------------------------------------------------
  withForwarder :: Con     -> Con   -> String  -> JobName -> 
                   QName   -> QName -> QName   -> Int     -> 
                   OnError -> IO r  -> IO r
  withForwarder src trg n jn srq ssq trq tmo onErr action = 
     withPub trg n jn trq onErr 
                     ("unknown", [], [], bytesOut) $ \p ->
       withSubThread src n jn srq tmo 
                     (ssq,       [], [], bytesIn) (pub p) onErr action
    where pub p m = publish p nullType (msgHdrs m) $ msgContent m

  -----------------------------------------------------------------------
  -- | Create a TaskBridge with the lifetime of the action passed in
  --   and start it on a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one to which the pusher is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one to which the worker is connected);
  --
  --   * String: Name of the bridge used for error handling;
  --
  --   * 'JobName': Name of the Task that is bridged;
  --
  --   * 'QName': Queue of the worker on the source side;
  --              (if the worker is connected to a balancer
  --               on the source side, this is an internal queue
  --               only visible in the bridge and in the balancer);
  --
  --   * 'QName': Queue of the worker on the target side
  --              (which may be a balancer's request queue);
  --
  --   * 'RegistryDesc': 'Registry' (/i.e./ balancer)
  --                     to which the bridge is connected
  --                     on the source side;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the bridge;
  --           its result /r/ is also the result of /withTaskBridge/.
  -----------------------------------------------------------------------
  withTaskBridge :: Con   -> Con   -> String  -> JobName ->
                    QName -> QName ->
                    RegistryDesc   -> OnError -> IO r    -> IO r
  withTaskBridge src trg n jn srq twq reg onErr action =
    withPusher trg n jn (twq, [], [], bytesOut) $ \p ->
      withTaskThread src n jn (fwd p) 
                        (srq, [], [], bytesIn) reg onErr action
    where fwd p m = let hs = filter ((/= "__job__") . fst) $ msgHdrs m
                     in push p nullType hs $ msgContent m

  -----------------------------------------------------------------------
  -- | Create a ServiceBridge with the lifetime of the action passed in
  --   and start it on a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one to which the client is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one to which the server is connected);
  --
  --   * String: Name of the bridge used for error handling;
  --
  --   * 'JobName': Name of the Service that is bridged;
  --
  --   * 'QName': Queue of the server on the source side;
  --              (if the server is connected to a balancer
  --               on the source side, this is an internal queue
  --               only visible in the bridge and in the balancer);
  --
  --   * 'QName': Reader queue of the internal client on the target side;
  --
  --   * 'QName': Queue of the server on the target side
  --              (which may be a balancer's request queue);
  --
  --   * 'RegistryDesc': 'Registry' (/i.e./ balancer)
  --                     to which the bridge is connected
  --                     on the source side;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the bridge;
  --           its result /r/ is also the result of /withServiceBridge/.
  -----------------------------------------------------------------------
  withServiceBridge :: Con   -> Con   -> String  ->  JobName -> 
                       QName -> QName -> QName   ->
                       RegistryDesc   -> OnError -> IO r     -> IO r
  withServiceBridge src trg n jn srq trq twq
                    reg@(_, tmo, _) onErr action =
    withClient trg n jn (trq, [], [], bytesIn)
                        (twq, [], [], bytesOut) $ \c ->
      withServerThread src n jn nullType [] (fwd c)
                       (srq,         [], [], bytesIn) 
                       ("unknown",   [], [], bytesOut)
                       reg onErr action
    where fwd c m  = 
            let hs = filter (clFilter . fst) (msgHdrs m)
             in do mbR <- request c tmo nullType hs $ msgContent m
                   case mbR of
                     Nothing -> throwIO $ TimeoutX "on requesting target"
                     Just r  -> return $ msgContent r
          clFilter = not . (`elem` ["__channel__", "__job__"])