{-# Language BangPatterns #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Patterns/Bridge.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- Bridges link providers connected to one broker
-- to consumers connected to another broker.
--
-- For publishers and workers, this is quite trivial:
-- the bridge implements 
--            the corresponding consumer on one broker
--            and the corresponding provider on the other.
--
-- For servers, the task is somewhat more complicated:
-- since servers use the client's reply queue to send the result
-- back to the client and this queue only exists on the broker
-- to which the client is connected, the bridge has to remember 
-- the client's reply queue and use its own queue on the server-side broker
-- to finally route the reply back to the original client.
-- With many brokers connected by service bridges,
-- this can result in long chains of clients and servers 
-- sending requests and waiting for replies.
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Patterns.Bridge (
                             -- * Forwarder
                             withForwarder, 
                             -- * TaskBridge
                             withTaskBridge,
                             -- * ServiceBridge
                             withServiceBridge)
where

  import           Types
  import           Network.Mom.Stompl.Patterns.Basic 
  import           Network.Mom.Stompl.Client.Queue 
  import           Codec.MIME.Type (nullType)
  import           Control.Exception (throwIO) 

  -----------------------------------------------------------------------
  -- | Create a forwarder with the lifetime of the application-defined
  --   action passed in and start it in a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one where the original publisher is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one where the target subscribers are connected);
  --
  --   * String: Name of the forwarder used for error handling;
  --
  --   * 'JobName': Name of the Topic that is bridged;
  --
  --   * 'QName': Registration queue of the source publisher;
  --
  --   * 'QName': Queue through which the internal subscriber
  --              will receive topic data from the source publisher;
  --
  --   * 'QName': Registration queue of the target publisher;
  --
  --   * Int: Timeout on registering to the source publisher
  --          in microseconds;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the forwarder;
  --           its result /r/ is also the result of /withForwarder/.
  --
  --   Note the remarkable similarity to the router pattern ('withRouter').
  --   In fact, a router is but a forwarder where source and target broker
  --   are the same.
  -------------------------------------------------------------------------
  withForwarder :: Con     -> Con   -> String  -> JobName ->
                   QName   -> QName -> QName   -> Int     ->
                   OnError -> IO r  -> IO r
  withForwarder src trg n jn srq ssq trq tmo onErr action =
     -- Outer layer: a publisher for the job on the target broker,
     -- registering through 'trq'. The writer queue name is the literal
     -- "unknown" (presumably because the real destinations come from
     -- the registered subscribers -- TODO confirm against 'withPub').
     withPub trg n jn trq onErr
                     ("unknown", [], [], bytesOut) $ \p ->
       -- Inner layer: a subscriber thread on the source broker that
       -- registers at 'srq' (timeout 'tmo'), reads from 'ssq' and hands
       -- every incoming message to 'pub p'. The user action runs inside
       -- both layers and its result is returned.
       withSubThread src n jn srq tmo
                     (ssq,       [], [], bytesIn) (pub p) onErr action
    where -- pub :: PubA o -> Message o -> IO ()
          -- Re-publish a received message through the target publisher,
          -- passing its headers and content on unchanged; the MIME type
          -- is always 'nullType'.
          pub p m = publish p nullType (msgHdrs m) $ msgContent m

  -----------------------------------------------------------------------
  -- | Create a TaskBridge with the lifetime of the action passed in
  --   and start it on a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one to which the pusher is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one to which the worker is connected);
  --
  --   * String: Name of the bridge used for error handling;
  --
  --   * 'JobName': Name of the Task that is bridged;
  --
  --   * 'QName': Queue of the worker on the source side
  --              (if the worker is connected to a balancer
  --               on the source side, this is an internal queue
  --               only visible in the bridge and in the balancer);
  --
  --   * 'QName': Queue of the worker on the target side
  --              (which may be a balancer's request queue);
  --
  --   * 'RegistryDesc': 'Registry' (/i.e./ balancer)
  --                     to which the bridge is connected
  --                     on the source side;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the bridge;
  --           its result /r/ is also the result of /withTaskBridge/.
  -----------------------------------------------------------------------
  withTaskBridge :: Con   -> Con   -> String  -> JobName ->
                    QName -> QName ->
                    RegistryDesc   -> OnError -> IO r    -> IO r
  withTaskBridge src trg n jn srq twq reg onErr action =
    -- Outer layer: a pusher on the target broker writing to 'twq'.
    withPusher trg n jn (twq, [], [], bytesOut) $ \p ->
      -- Inner layer: a task thread on the source broker reading from
      -- 'srq' and connected to the registry 'reg'; every incoming
      -- message is handed to 'fwd p'. The user action runs inside both
      -- layers and its result is returned.
      withTaskThread src n jn (fwd p)
                        (srq, [], [], bytesIn) reg onErr action
    where -- fwd :: PusherA a -> Message a -> IO ()
          -- Push the message content to the target with the original
          -- headers, except that every "__job__" header is dropped
          -- (presumably 'push' sets it again for the target side --
          -- TODO confirm). The MIME type is always 'nullType'.
          fwd p m = let hs = filter ((/= "__job__") . fst) $ msgHdrs m
                     in push p nullType hs $ msgContent m

  -----------------------------------------------------------------------
  -- | Create a ServiceBridge with the lifetime of the action passed in
  --   and start it on a background thread:
  --
  --   * 'Con': Connection to the source broker
  --            (the one to which the client is connected);
  --
  --   * 'Con': Connection to the target broker
  --            (the one to which the server is connected);
  --
  --   * String: Name of the bridge used for error handling;
  --
  --   * 'JobName': Name of the Service that is bridged;
  --
  --   * 'QName': Queue of the server on the source side
  --              (if the server is connected to a balancer
  --               on the source side, this is an internal queue
  --               only visible in the bridge and in the balancer);
  --
  --   * 'QName': Reader queue of the internal client on the target side;
  --
  --   * 'QName': Queue of the server on the target side
  --              (which may be a balancer's request queue);
  --
  --   * 'RegistryDesc': 'Registry' (/i.e./ balancer)
  --                     to which the bridge is connected
  --                     on the source side;
  --
  --   * 'OnError': Error handler;
  --
  --   * IO r: Action that defines the lifetime of the bridge;
  --           its result /r/ is also the result of /withServiceBridge/.
  -----------------------------------------------------------------------
  withServiceBridge :: Con   -> Con   -> String  ->  JobName ->
                       QName -> QName -> QName   ->
                       RegistryDesc   -> OnError -> IO r     -> IO r
  withServiceBridge src trg n jn srq trq twq
                    reg@(_, tmo, _) onErr action =
    -- Outer layer: a client on the target broker that writes requests
    -- to 'twq' and reads replies from 'trq'.
    withClient trg n jn (trq, [], [], bytesIn)
                        (twq, [], [], bytesOut) $ \c ->
      -- Inner layer: a server thread on the source broker reading
      -- requests from 'srq' and connected to the registry 'reg'.
      -- Each request is answered with the result of 'fwd c'.
      -- The writer queue name is the literal "unknown" (presumably
      -- because replies go to the requester's reply queue --
      -- TODO confirm against 'withServerThread').
      withServerThread src n jn nullType [] (fwd c)
                       (srq,         [], [], bytesIn)
                       ("unknown",   [], [], bytesOut)
                       reg onErr action
    where -- fwd :: ClientA b a -> Message a -> IO b
          -- Relay one request to the target through the internal client
          -- and return the content of the reply. 'tmo', the second
          -- component of the registry description, is passed to
          -- 'request' as its timeout argument. If 'request' yields
          -- Nothing, a 'TimeoutX' exception is thrown.
          fwd c m  =
            let hs = filter (clFilter . fst) (msgHdrs m)
             in do mbR <- request c tmo nullType hs $ msgContent m
                   case mbR of
                     Nothing -> throwIO $ TimeoutX "on requesting target"
                     Just r  -> return $ msgContent r
          -- clFilter :: String -> Bool
          -- True for every header name except "__channel__" and
          -- "__job__"; these two are removed before relaying.
          clFilter = not . (`elem` ["__channel__", "__job__"])