{-# Language BangPatterns #-}
module Network.Mom.Stompl.Patterns.Bridge (
withForwarder,
withTaskBridge,
withServiceBridge)
where
import Types
import Network.Mom.Stompl.Patterns.Basic
import Network.Mom.Stompl.Client.Queue
import Codec.MIME.Type (nullType)
import Control.Exception (throwIO)
withForwarder :: Con -> Con -> String -> JobName ->
QName -> QName -> QName -> Int ->
OnError -> IO r -> IO r
withForwarder :: Con
-> Con
-> String
-> String
-> String
-> String
-> String
-> Int
-> OnError
-> IO r
-> IO r
withForwarder Con
src Con
trg String
n String
jn String
srq String
ssq String
trq Int
tmo OnError
onErr IO r
action =
Con
-> String
-> String
-> String
-> OnError
-> WriterDesc ByteString
-> (PubA ByteString -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> String
-> OnError
-> WriterDesc o
-> (PubA o -> IO r)
-> IO r
withPub Con
trg String
n String
jn String
trq OnError
onErr
(String
"unknown", [], [], OutBound ByteString
bytesOut) ((PubA ByteString -> IO r) -> IO r)
-> (PubA ByteString -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \PubA ByteString
p ->
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc ByteString
-> (Message ByteString -> IO ())
-> OnError
-> IO r
-> IO r
forall i r.
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (Message i -> IO ())
-> OnError
-> IO r
-> IO r
withSubThread Con
src String
n String
jn String
srq Int
tmo
(String
ssq, [], [], InBound ByteString
bytesIn) (PubA ByteString -> Message ByteString -> IO ()
forall o. PubA o -> Message o -> IO ()
pub PubA ByteString
p) OnError
onErr IO r
action
where pub :: PubA o -> Message o -> IO ()
pub PubA o
p Message o
m = PubA o -> Type -> [Header] -> o -> IO ()
forall o. PubA o -> Type -> [Header] -> o -> IO ()
publish PubA o
p Type
nullType (Message o -> [Header]
forall a. Message a -> [Header]
msgHdrs Message o
m) (o -> IO ()) -> o -> IO ()
forall a b. (a -> b) -> a -> b
$ Message o -> o
forall a. Message a -> a
msgContent Message o
m
withTaskBridge :: Con -> Con -> String -> JobName ->
QName -> QName ->
RegistryDesc -> OnError -> IO r -> IO r
withTaskBridge :: Con
-> Con
-> String
-> String
-> String
-> String
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withTaskBridge Con
src Con
trg String
n String
jn String
srq String
twq RegistryDesc
reg OnError
onErr IO r
action =
Con
-> String
-> String
-> WriterDesc ByteString
-> (PusherA ByteString -> IO r)
-> IO r
forall o r.
Con
-> String -> String -> WriterDesc o -> (PusherA o -> IO r) -> IO r
withPusher Con
trg String
n String
jn (String
twq, [], [], OutBound ByteString
bytesOut) ((PusherA ByteString -> IO r) -> IO r)
-> (PusherA ByteString -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \PusherA ByteString
p ->
Con
-> String
-> String
-> (Message ByteString -> IO ())
-> ReaderDesc ByteString
-> RegistryDesc
-> OnError
-> IO r
-> IO r
forall i r.
Con
-> String
-> String
-> (Message i -> IO ())
-> ReaderDesc i
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withTaskThread Con
src String
n String
jn (PusherA ByteString -> Message ByteString -> IO ()
forall a. PusherA a -> Message a -> IO ()
fwd PusherA ByteString
p)
(String
srq, [], [], InBound ByteString
bytesIn) RegistryDesc
reg OnError
onErr IO r
action
where fwd :: PusherA a -> Message a -> IO ()
fwd PusherA a
p Message a
m = let hs :: [Header]
hs = (Header -> Bool) -> [Header] -> [Header]
forall a. (a -> Bool) -> [a] -> [a]
filter ((String -> String -> Bool
forall a. Eq a => a -> a -> Bool
/= String
"__job__") (String -> Bool) -> (Header -> String) -> Header -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Header -> String
forall a b. (a, b) -> a
fst) ([Header] -> [Header]) -> [Header] -> [Header]
forall a b. (a -> b) -> a -> b
$ Message a -> [Header]
forall a. Message a -> [Header]
msgHdrs Message a
m
in PusherA a -> Type -> [Header] -> a -> IO ()
forall o. PusherA o -> Type -> [Header] -> o -> IO ()
push PusherA a
p Type
nullType [Header]
hs (a -> IO ()) -> a -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> a
forall a. Message a -> a
msgContent Message a
m
withServiceBridge :: Con -> Con -> String -> JobName ->
QName -> QName -> QName ->
RegistryDesc -> OnError -> IO r -> IO r
withServiceBridge :: Con
-> Con
-> String
-> String
-> String
-> String
-> String
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withServiceBridge Con
src Con
trg String
n String
jn String
srq String
trq String
twq
reg :: RegistryDesc
reg@(String
_, Int
tmo, (Int, Int, Int)
_) OnError
onErr IO r
action =
Con
-> String
-> String
-> ReaderDesc ByteString
-> WriterDesc ByteString
-> (ClientA ByteString ByteString -> IO r)
-> IO r
forall i o r.
Con
-> String
-> String
-> ReaderDesc i
-> WriterDesc o
-> (ClientA i o -> IO r)
-> IO r
withClient Con
trg String
n String
jn (String
trq, [], [], InBound ByteString
bytesIn)
(String
twq, [], [], OutBound ByteString
bytesOut) ((ClientA ByteString ByteString -> IO r) -> IO r)
-> (ClientA ByteString ByteString -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \ClientA ByteString ByteString
c ->
Con
-> String
-> String
-> Type
-> [Header]
-> (Message ByteString -> IO ByteString)
-> ReaderDesc ByteString
-> WriterDesc ByteString
-> RegistryDesc
-> OnError
-> IO r
-> IO r
forall i o r.
Con
-> String
-> String
-> Type
-> [Header]
-> (Message i -> IO o)
-> ReaderDesc i
-> WriterDesc o
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withServerThread Con
src String
n String
jn Type
nullType [] (ClientA ByteString ByteString
-> Message ByteString -> IO ByteString
forall b a. ClientA b a -> Message a -> IO b
fwd ClientA ByteString ByteString
c)
(String
srq, [], [], InBound ByteString
bytesIn)
(String
"unknown", [], [], OutBound ByteString
bytesOut)
RegistryDesc
reg OnError
onErr IO r
action
where fwd :: ClientA b a -> Message a -> IO b
fwd ClientA b a
c Message a
m =
let hs :: [Header]
hs = (Header -> Bool) -> [Header] -> [Header]
forall a. (a -> Bool) -> [a] -> [a]
filter (String -> Bool
clFilter (String -> Bool) -> (Header -> String) -> Header -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Header -> String
forall a b. (a, b) -> a
fst) (Message a -> [Header]
forall a. Message a -> [Header]
msgHdrs Message a
m)
in do Maybe (Message b)
mbR <- ClientA b a
-> Int -> Type -> [Header] -> a -> IO (Maybe (Message b))
forall i o.
ClientA i o
-> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
request ClientA b a
c Int
tmo Type
nullType [Header]
hs (a -> IO (Maybe (Message b))) -> a -> IO (Maybe (Message b))
forall a b. (a -> b) -> a -> b
$ Message a -> a
forall a. Message a -> a
msgContent Message a
m
case Maybe (Message b)
mbR of
Maybe (Message b)
Nothing -> PatternsException -> IO b
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO b) -> PatternsException -> IO b
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
TimeoutX String
"on requesting target"
Just Message b
r -> b -> IO b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> IO b) -> b -> IO b
forall a b. (a -> b) -> a -> b
$ Message b -> b
forall a. Message a -> a
msgContent Message b
r
clFilter :: String -> Bool
clFilter = Bool -> Bool
not (Bool -> Bool) -> (String -> Bool) -> String -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> [String] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [String
"__channel__", String
"__job__"])