module Network.Mom.Stompl.Patterns.Basic (
ClientA, clName, withClient, request, checkRequest,
ServerA, srvName, withServer, reply,
register, unRegister,
heartbeat, HB, mkHB,
withServerThread, RegistryDesc,
PusherA, pushName, withPusher, push,
withTaskThread,
Registry, withRegistry,
mapR, getProvider, showRegistry,
Provider, prvQ, JobType(..),
PubA, pubName, withPub, publish, withPubThread,
SubA, subName, withSub, checkIssue,
withSubThread, withSubMVar,
withPubProxy,
PatternsException(..), OnError,
StatusCode(..), readStatusCode,
JobName, QName,
nobody, ignorebody,
bytesOut, bytesIn,
getJobName, getJobType, getQueue, getChannel,
getHB,
getSC,
getHeader)
where
import Registry
import Types
import Network.Mom.Stompl.Client.Queue
import qualified Network.Mom.Stompl.Frame as F
import System.Timeout
import Codec.MIME.Type (Type)
import Control.Exception (throwIO, catches, finally)
import Control.Concurrent
import Control.Monad (forever, unless, when, void)
import Data.Time
import qualified Data.ByteString.Char8 as B
-- | Client endpoint of the request/reply pattern:
--   a reader for incoming replies paired with a writer
--   for outgoing requests.
data ClientA i o = Cl
  { clName :: String    -- ^ Name of the client
  , clChn  :: QName     -- ^ Queue name sent as the @__channel__@ header
                        --   of every request
  , clJob  :: JobName   -- ^ Job name sent as the @__job__@ header
                        --   of every request
  , clIn   :: Reader i  -- ^ Reader from which replies are received
  , clOut  :: Writer o  -- ^ Writer through which requests are sent
  }
-- | Builds a 'ClientA' on top of a reader/writer pair (see 'withPair')
--   and hands it to the application-defined action.
--   The lifetime of the client is the scope of that action.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the client
--
--   * 'JobName': job requested by this client
--
--   * 'ReaderDesc': description of the reply queue; its queue name
--     is also stored as the client's channel
--
--   * 'WriterDesc': description of the request queue
--
--   * the action to run with the client
withClient :: Con -> String ->
                     JobName ->
                     ReaderDesc i ->
                     WriterDesc o ->
                     (ClientA i o -> IO r) -> IO r
withClient c n jn rd@(replyQ, _, _, _) wd act =
  withPair c n rd wd run
  where
    -- The reply queue name doubles as the channel advertised to the server.
    run (rdr, wrt) = act (Cl { clName = n
                             , clChn  = replyQ
                             , clJob  = jn
                             , clIn   = rdr
                             , clOut  = wrt
                             })
-- | Sends a request and waits for the reply.
--   The headers @__channel__@ (the client's reply queue) and
--   @__job__@ (the client's job name) are prepended to the
--   caller-supplied headers.
--
--   * 'Int': timeout in microseconds (passed to 'timeout')
--
--   * 'Type': MIME type of the request
--
--   * headers to add to the request
--
--   * the request payload
--
--   Returns 'Nothing' if no reply arrived before the timeout expired.
request :: ClientA i o ->
           Int -> Type -> [F.Header] -> o -> IO (Maybe (Message i))
request c tmo t hs r = do
  writeQ (clOut c) t routed r
  timeout tmo (readQ (clIn c))
  where
    routed = ("__channel__", clChn c) : ("__job__", clJob c) : hs
-- | Waits for a reply without sending a request first,
--   e.g. to pick up the reply to an earlier 'request' that timed out.
--
--   * 'Int': timeout in microseconds (passed to 'timeout')
--
--   Returns 'Nothing' if nothing arrived before the timeout expired.
checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
checkRequest client tmo = timeout tmo . readQ . clIn $ client
-- | Server endpoint of the request/reply pattern:
--   a reader for incoming requests paired with a writer
--   for outgoing replies.
data ServerA i o = Srv
  { srvName :: String    -- ^ Name of the server
  , srvIn   :: Reader i  -- ^ Reader from which requests are received
  , srvOut  :: Writer o  -- ^ Writer through which replies are sent
  }
-- | Builds a 'ServerA' on top of a reader/writer pair (see 'withPair')
--   and hands it to the application-defined action.
--   The lifetime of the server is the scope of that action.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the server
--
--   * 'ReaderDesc': description of the request queue
--
--   * 'WriterDesc': description of the reply writer
--
--   * the action to run with the server
withServer :: Con -> String ->
                     ReaderDesc i ->
                     WriterDesc o ->
                     (ServerA i o -> IO r) -> IO r
withServer c n rd wd act =
  withPair c n rd wd $ \(rdr, wrt) ->
    act (Srv { srvName = n, srvIn = rdr, srvOut = wrt })
-- | Waits for one request and answers it.
--   If no request arrives before the timeout expires, nothing is done.
--   Otherwise the reply channel is read from the request
--   (see 'getChannel'), the request is transformed into a reply and
--   the reply is written to that channel.
--
--   * 'Int': timeout in microseconds (passed to 'timeout')
--
--   * 'Type': MIME type of the reply
--
--   * headers to add to the reply
--
--   * function producing the reply from the request
reply :: ServerA i o -> Int -> Type -> [F.Header] ->
         (Message i -> IO o) -> IO ()
reply s tmo t hs transform =
  timeout tmo (readQ (srvIn s)) >>= maybe (return ()) answer
  where
    -- The channel is resolved before the transformation runs.
    answer msg = do
      chn <- getChannel msg
      out <- transform msg
      writeAdHoc (srvOut s) chn t hs out
-- | Runs a server in a background thread for the duration of
--   the given action.
--
--   If the registry queue in the 'RegistryDesc' is non-empty,
--   the server first registers as 'Service' provider for the job;
--   the heartbeat period returned by the registry must lie within
--   the accepted range, otherwise the registration is removed again
--   and 'UnacceptableHbX' is thrown. A status code other than 'OK'
--   raises 'NotOKX'.
--
--   The background thread loops forever: it waits for a request
--   (at most one heartbeat period, or indefinitely if the period
--   is not positive), replies to it and then calls 'heartbeat'.
--   Errors in the loop are passed to the handlers built by
--   'ignoreHandler'. When the thread ends, the registration is removed.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the server
--
--   * 'JobName': the job provided by this server
--
--   * 'Type' and headers of the replies
--
--   * function producing the reply from the request
--
--   * 'ReaderDesc' / 'WriterDesc': request queue and reply writer
--
--   * 'RegistryDesc': registry queue, timeout and
--     (preferred, minimum, maximum) heartbeat
--
--   * 'OnError': error handler
--
--   * the action that defines the lifetime of the server
withServerThread :: Con -> String -> JobName ->
                    Type -> [F.Header] -> (Message i -> IO o) ->
                    ReaderDesc i ->
                    WriterDesc o ->
                    RegistryDesc ->
                    OnError -> IO r -> IO r
withServerThread c n jn t hs transform
                 rd@(rn, _, _, _)
                 wd
                 (reg, tmo, (best, mn, mx))
                 onErr action =
  withServer c n rd wd $ \s -> do
    (sc, me) <- if null reg
                  then return (OK, 0)
                  else register c jn Service reg rn tmo best
    launch s sc me
  where
    -- Removes the registration (does nothing without a registry).
    unreg = finalise c jn reg rn tmo

    launch s OK me
      | me < mn || me > mx = unreg >> throwIO (UnacceptableHbX me)
      | otherwise          = do
          m <- mkHB me >>= newMVar
          -- A non-positive period means: wait for requests without timeout.
          let p | me <= 0   = -1
                | otherwise = 1000 * me
          withThread (srv m p s `finally` unreg) action
    launch _ e _ = throwIO (NotOKX e "on register")

    -- Serve one request (or time out), then take care of the heartbeat.
    srv m p s =
      withWriter c "HB" reg [] [] nobody $ \w ->
        forever $
          catches (reply s p t hs transform >> heartbeat m w jn rn)
                  (ignoreHandler (srvName s) onErr)
-- | Removes a registration from a registry.
--   Does nothing if the registry queue name is empty;
--   throws 'NotOKX' if the registry answers with a status
--   code other than 'OK'.
--
--   * 'JobName': the registered job
--
--   * 'QName': queue of the registry (may be empty)
--
--   * 'QName': the queue that was registered
--
--   * 'Int': timeout passed on to 'unRegister'
finalise :: Con -> JobName -> QName -> QName -> Int -> IO ()
finalise c jn wn rn tmo =
  unless (null wn) $ do
    sc <- unRegister c jn wn rn tmo
    when (sc /= OK) $ throwIO (NotOKX sc "on unregister")
-- | Publisher endpoint of the publish/subscribe pattern.
data PubA o = Pub
  { pubName :: String               -- ^ Name of the publisher
  , pubJob  :: JobName              -- ^ Job (topic) that is published
  , pubReg  :: Registry             -- ^ Registry holding the subscribers
  , pubConv :: OutBound o           -- ^ Converter applied once per publication
  , pubOut  :: Writer B.ByteString  -- ^ Writer sending the converted data
  }
-- | Builds a 'PubA' and hands it to the application-defined action.
--   A registry is opened on the given queue (with heartbeat range
--   @(0,0)@) so that subscribers can register. The writer is created
--   with the job name as its name and the placeholder queue
--   @\"unknown\"@; 'publish' addresses each provider queue individually.
--   The queue name in the 'WriterDesc' is ignored; its options,
--   headers and converter are used.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the publisher
--
--   * 'JobName': the topic
--
--   * 'QName': queue on which the registry listens
--
--   * 'OnError': error handler passed to the registry
--
--   * 'WriterDesc': description of the writer
--
--   * the action to run with the publisher
withPub :: Con -> String -> JobName -> QName -> OnError ->
           WriterDesc o -> (PubA o -> IO r) -> IO r
withPub c n jn rn onErr (_, wos, wh, oconv) act =
  withRegistry c n rn (0, 0) onErr $ \registry ->
    withWriter c jn "unknown" wos wh bytesOut $ \wrt ->
      act (Pub { pubName = n
               , pubJob  = jn
               , pubReg  = registry
               , pubConv = oconv
               , pubOut  = wrt
               })
-- | Publishes data of type /o/: the data is converted once with the
--   publisher's converter and the result is written to the queue of
--   every provider that 'mapR' visits for the publisher's job.
--   The result of 'mapR' is discarded.
--
--   * 'Type': MIME type of the message
--
--   * headers to add to the message
--
--   * the data to publish
publish :: PubA o -> Type -> [F.Header] -> o -> IO ()
publish p t hs x = do
  payload <- pubConv p x
  let deliver prv = writeAdHoc (pubOut p) (prvQ prv) t hs payload
  void $ mapR (pubReg p) (pubJob p) deliver
-- | Runs a publisher in a background thread for the duration of
--   the given action. The thread loops forever: it creates a value,
--   publishes it and, if that took less than the period, sleeps for
--   the remainder of the period. Errors in the loop are passed to
--   the handlers built by 'ignoreHandler'.
--
--   * 'Con', name, 'JobName', registry queue: see 'withPub'
--
--   * 'Type' and headers of the published messages
--
--   * action creating the value to publish
--
--   * 'WriterDesc': description of the writer
--
--   * 'Int': publication period, compared against the elapsed time
--     in microseconds and passed to 'threadDelay'
--
--   * 'OnError': error handler
--
--   * the action that defines the lifetime of the publisher
withPubThread :: Con -> String -> JobName -> QName ->
                 Type -> [F.Header] -> IO o ->
                 WriterDesc o -> Int ->
                 OnError -> IO r -> IO r
withPubThread c n jn rn t hs create wd period onErr action =
  withPub c n jn rn onErr wd $ \p ->
    withThread (forever (catches (oneRound p)
                                 (ignoreHandler (pubName p) onErr)))
               action
  where
    -- Publish once and sleep for whatever is left of the period.
    oneRound p = do
      started <- getCurrentTime
      create >>= publish p t hs
      ended <- getCurrentTime
      let spent = nominal2us (ended `diffUTCTime` started)
      when (spent < period) $ threadDelay (period - spent)
-- | Subscriber endpoint of the publish/subscribe pattern.
data SubA i = Sub
  { subName :: String    -- ^ Name of the subscriber
  , subIn   :: Reader i  -- ^ Reader from which published data is received
  }
-- | Builds a 'SubA' and hands it to the application-defined action.
--   A reader is opened on the subscriber's queue; if the publisher's
--   registry queue is non-empty, the subscriber registers there for
--   the job as 'Topic'. Registration failures raise 'TimeoutX'
--   (no answer in time) or 'NotOKX' (status code other than 'OK').
--   When the action ends, the registration is removed again.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the subscriber
--
--   * 'JobName': the topic
--
--   * 'QName': registry queue of the publisher (may be empty)
--
--   * 'Int': timeout, used both for 'timeout' around the registration
--     and as the timeout argument of 'register'
--
--   * 'ReaderDesc': description of the subscriber's queue
--
--   * the action to run with the subscriber
withSub :: Con -> String -> JobName -> QName -> Int ->
           ReaderDesc i -> (SubA i -> IO r) -> IO r
withSub c n jn wn tmo (rn, ros, rh, iconv) act =
  withReader c n rn ros rh iconv $ \r -> do
    outcome <- signUp
    case outcome of
      Nothing      -> throwIO (TimeoutX "on register")
      Just (OK, _) -> act (Sub { subName = n, subIn = r })
                        `finally` finalise c jn wn rn tmo
      Just (sc, _) -> throwIO (NotOKX sc "on register ")
  where
    -- Without a registry queue there is nothing to register.
    signUp
      | null wn   = return (Just (OK, 0))
      | otherwise = timeout tmo (register c jn Topic wn rn tmo 0)
-- | Waits for newly published data.
--
--   * 'Int': timeout in microseconds (passed to 'timeout')
--
--   Returns 'Nothing' if nothing arrived before the timeout expired.
checkIssue :: SubA i -> Int -> IO (Maybe (Message i))
checkIssue sub tmo = timeout tmo . readQ . subIn $ sub
-- | Runs a subscriber in a background thread for the duration of
--   the given action. The thread loops forever: it waits (without
--   timeout) for the next published message and applies the job to it.
--   Errors in the loop are passed to the handlers built by
--   'ignoreHandler'.
--
--   * 'Con', name, 'JobName', registry queue, timeout, 'ReaderDesc':
--     see 'withSub'
--
--   * the job applied to every message
--
--   * 'OnError': error handler
--
--   * the action that defines the lifetime of the subscriber
withSubThread :: Con -> String -> JobName -> QName -> Int ->
                 ReaderDesc i -> (Message i -> IO ()) -> OnError ->
                 IO r -> IO r
withSubThread c n jn wn tmo rd job onErr action =
  withSub c n jn wn tmo rd $ \s ->
    withThread (forever (handleNext s)) action
  where
    handleNext s = catches (nextIssue s >>= job)
                           (ignoreHandler (subName s) onErr)
    -- Poll again if 'checkIssue' comes back empty-handed.
    nextIssue s = checkIssue s (-1) >>= maybe (nextIssue s) return
-- | Like 'withSubThread', but with a fixed job: the content of every
--   received message replaces the current value of the given 'MVar'
--   (via 'modifyMVar_'), so the 'MVar' always holds the most
--   recently published data.
--
--   * 'Con', name, 'JobName', registry queue, timeout, 'ReaderDesc':
--     see 'withSub'
--
--   * 'MVar': the variable to update
--
--   * 'OnError' and the lifetime-defining action: see 'withSubThread'
withSubMVar :: Con -> String -> JobName -> QName -> Int ->
               ReaderDesc i -> MVar i -> OnError ->
               IO r -> IO r
withSubMVar c n jn wn tmo rd v =
  withSubThread c n jn wn tmo rd store
  where
    store msg = modifyMVar_ v (const (return (msgContent msg)))
-- | Pushing endpoint of the pusher/worker pattern:
--   sends jobs without expecting a reply.
data PusherA o = Pusher
  { pushName :: String    -- ^ Name of the pusher
  , pushJob  :: JobName   -- ^ Job name sent as the @__job__@ header
  , pushQ    :: Writer o  -- ^ Writer through which jobs are sent
  }
-- | Builds a 'PusherA' on top of a writer and hands it to the
--   application-defined action.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the pusher
--
--   * 'JobName': the job requested by this pusher
--
--   * 'WriterDesc': description of the queue jobs are written to
--
--   * the action to run with the pusher
withPusher :: Con -> String -> JobName -> WriterDesc o ->
              (PusherA o -> IO r) -> IO r
withPusher c n jn (wq, wos, wh, oconv) action =
  withWriter c n wq wos wh oconv $ \wrt ->
    action (Pusher { pushName = n, pushJob = jn, pushQ = wrt })
-- | Pushes a job: writes the data through the pusher's writer,
--   with the @__job__@ header (the pusher's job name) prepended
--   to the caller-supplied headers.
--
--   * 'Type': MIME type of the message
--
--   * headers to add to the message
--
--   * the data to send
push :: PusherA o -> Type -> [F.Header] -> o -> IO ()
push p t hs m = writeQ (pushQ p) t tagged m
  where
    tagged = ("__job__", pushJob p) : hs
-- | Runs a worker in a background thread for the duration of
--   the given action.
--
--   If the registry queue in the 'RegistryDesc' is non-empty,
--   the worker first registers as 'Task' provider for the job;
--   the heartbeat period returned by the registry must lie within
--   the accepted range, otherwise the registration is removed again
--   and 'UnacceptableHbX' is thrown. A status code other than 'OK'
--   raises 'NotOKX'.
--
--   The background thread loops forever: it waits for a message
--   (at most one heartbeat period, or indefinitely if the period is
--   not positive), applies the task to it if one arrived and then
--   calls 'heartbeat'. Errors in the loop are passed to the handlers
--   built by 'ignoreHandler'. When the thread ends, the registration
--   is removed.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the worker
--
--   * 'JobName': the job provided by this worker
--
--   * the task applied to every message
--
--   * 'ReaderDesc': queue from which jobs are read
--
--   * 'RegistryDesc': registry queue, timeout and
--     (preferred, minimum, maximum) heartbeat
--
--   * 'OnError': error handler
--
--   * the action that defines the lifetime of the worker
withTaskThread :: Con -> String -> JobName ->
                  (Message i -> IO ()) ->
                  ReaderDesc i -> RegistryDesc ->
                  OnError -> IO r -> IO r
withTaskThread c n jn task
               (rn, ros, rh, iconv)
               (reg, tmo, (best, mn, mx))
               onErr action = do
  (sc, me) <- if null reg
                then return (OK, 0)
                else register c jn Task reg rn tmo best
  launch sc me
  where
    -- Removes the registration (does nothing without a registry).
    unreg = finalise c jn reg rn tmo

    launch OK me
      | me < mn || me > mx = unreg >> throwIO (UnacceptableHbX me)
      | otherwise          = do
          m <- mkHB me >>= newMVar
          -- A non-positive period means: wait for jobs without timeout.
          let p | me <= 0   = -1
                | otherwise = 1000 * me
          withReader c n rn ros rh iconv $ \r ->
            withThread (tsk m r p `finally` unreg) action
    launch e _ = throwIO (NotOKX e "on register")

    tsk m r p =
      withWriter c "HB" reg [] [] nobody $ \w ->
        forever $ catches (oneRound m r p w) (ignoreHandler n onErr)

    -- Handle one job (or time out), then take care of the heartbeat.
    oneRound m r p w = do
      timeout p (readQ r) >>= maybe (return ()) task
      heartbeat m w jn rn
-- | Unlike servers and workers, publishers have no interface to
--   send heartbeats to a registry themselves. This proxy subscribes
--   to the publisher (see 'withSub'), registers the publisher's
--   registry queue as 'Topic' provider at another registry and calls
--   'heartbeat' towards that registry whenever data from the
--   publisher arrives.
--
--   The heartbeat period returned by the registry must lie within
--   the accepted range; otherwise the registration is removed again
--   and 'UnacceptableHbX' is thrown. A status code other than 'OK'
--   raises 'NotOKX'. When the background thread ends, the
--   registration is removed.
--
--   If the publisher stays silent for more than ten consecutive
--   heartbeat periods, 'MissingHbX' is raised inside the background
--   loop on every further silent period; like all errors in the loop
--   it is passed to the handlers built by 'ignoreHandler'.
--   With a non-positive heartbeat period the proxy waits for the
--   publisher without timeout.
--
--   * 'Con': the connection to the broker
--
--   * 'String': name of the proxy
--
--   * 'JobName': the topic
--
--   * 'QName': registry queue of the publisher
--
--   * 'ReaderDesc': queue on which the proxy receives the publications
--
--   * 'RegistryDesc': the other registry, timeout and
--     (preferred, minimum, maximum) heartbeat
--
--   * 'OnError': error handler
--
--   * the action that defines the lifetime of the proxy
withPubProxy :: Con -> String -> JobName -> QName ->
                ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
withPubProxy c n jn pq rd (reg, tmo, (best, mn, mx)) onErr action =
  -- subscribe at the publisher
  withSub c n jn pq tmo rd $ \s ->
    withWriter c "HB" reg [] [] nobody $ \w -> do
      -- register the publisher at the registry
      (sc, h) <- register c jn Topic reg pq tmo best
      if sc /= OK
        then throwIO $ NotOKX sc "on register proxy"
        else if h < mn || h > mx
          then do
            -- Do not leave a registration behind that nobody will serve.
            finalise c jn reg pq tmo
            throwIO $ UnacceptableHbX h
          else do
            hb <- mkHB h >>= newMVar
            -- 'timeout 0' would return immediately and turn the loop
            -- into a busy spin; a negative value disables the timeout.
            let p = if h <= 0 then (-1) else 1000 * h
            withThread (finally (beat hb p s w)
                                (finalise c jn reg pq tmo))
                       action
  where
    -- Number of silent periods tolerated before complaining.
    maxMissed :: Int
    maxMissed = 10

    -- The counter of silent periods lives in an 'MVar' owned by this
    -- thread alone, so the loop runs in constant stack space and the
    -- counter survives errors handled by 'ignoreHandler'.
    beat :: MVar HB -> Int -> SubA i -> Writer () -> IO ()
    beat hb p s w = do
      missed <- newMVar (0 :: Int)
      forever $ catches (pulse missed) (ignoreHandler n onErr)
      where
        pulse missed = do
          mbM <- checkIssue s p
          case mbM of
            Just _  -> do
              heartbeat hb w jn pq
              void (swapMVar missed 0)
            Nothing -> do
              k <- readMVar missed
              if k >= maxMissed
                then throwIO $ MissingHbX "No input from pub"
                else void (swapMVar missed (k + 1))