-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Patterns/Basic.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- This module provides the /basic/ patterns
-- client\/server, publish and subscribe and pusher\/worker
-- (a.k.a. pipeline) as well as a /registry/,
-- a means to support patterns
-- where one application uses or serves a set of other applications. 
-- 
-- Basic patterns can be distinguished by the data exchange
-- defined by their protocol:
--
--  * client\/server: The client, first, sends data to the server
--                    (the request) and, then, 
--                    the server sends data to this client
--                    (the reply);
--
--  * publish and subscribe: The publisher sends data to all subscribers
--                           (the topic);
--
--  * pusher\/worker: The pusher sends data to the worker (the request)
--                   without receiving a reply.
--
--  We call the processing performed by an application on behalf of 
--  another application a /job/.
--  There are three different job types:
--
--  * service: must be requested explicitly and includes a message
--             sent from the application that perfoms the service (server)
--             to the application that requests the service (client).
--
--  * task: must be requested explicitly, but does not include a reply.
--
--  * topic: is sent to registered subscribers without being requested
--           explicitly.
--
--  Applications providing a job are generically called providers, 
--  applications requesting a job are called consumers.
--  Providers, hence, are servers, workers and publishers, and
--  consumers are clients, pushers and subscribers.
--  Note that this is somewhat different from the 
--  data-centric terminology \"producer\" and \"consumer\".
--  It is not very useful
--  to distinugish servers from clients or
--  pushers from workers by referring to the distinction
--  of producing or not producing data.
--  It is in fact the pusher that produces data,
--  not the worker. The pusher, however, is the one
--  that requests something from the worker. The task, in this case,
--  is the \"good\" that is provided by one side and consumed
--  by the other.
--
--  This distinction is relevant when we start to think
--  about /reliability/.
--  Reliability is a relation between a provider and a consumer:
--  The consumer relies on the producer,
--  not the other way round, /e.g./
--  a pusher relies on a woker
--  and a client on a server to get the job done.
--
--  The interfaces in this library 
--  give some guarantees related to reliability, 
--  but there are also some pitfalls:
--  
--  * A client using timeouts can be sure that 
--      the requested service has been performed
--      when the reply arrives before the timeout expires.
--      If no reply arrives before timeout expiration,
--      no such claim can be made (in particular not
--      that the service has not been performed).
--      The service may have been performed, but 
--      it may have taken more time than expected or
--      it may have been performed, 
--      but the reply message has failed to arrive.
--      If the service is /idempotent/ -
--      /i.e./ calling the service twice has 
--      the same effect as calling it once -
--      the client, when the timeout has expired,
--      can just send the request once again;
--      otherwise, it has to use other means to recover 
--      from this situation.
--
--  * A pusher will never know if the task has been performed
--      correctly, since there is no response from the worker.
--      This is one of the reasons that the pipeline pattern 
--      should usually not be used alone, but in the context of
--      a balancer. (You usually want to push 
--      a request to a worker through a balancer --
--      one of the ideas behind pusher/woker is work balancing.)
--      A balancer may request providers to send /heartbeats/
--      and, this way, minimise the risk of failure.
--      The worker still may fail between a heartbeat
--      and a request and even the fact that it does send hearbeats
--      does not necessarily mean that it is operating correctly.
--      If it is essential for the client to know
--      that all tasks have been performed correctly,
--      other verification means are required.
--      
--  * Finally, a subscriber will never know
--             whether a publisher is still working correctly
--             or not, if the publisher does not send data
--             periodically. A reliable design
--             would use periodic publishers, /i.e./
--             publishers that send data at a constant rate,
--             even if no new data are available.
--             The data update, in this case,
--             would have the effect of a heartbeat.
--
--  The library uses a set of headers that must not be used by applications.
--  All internal header keys start and end with two underscores.
--  By avoiding this naming of header keys, application code easily avoids
--  naming conflicts. The headers used in basic patterns are:
--
--  * __channel__: Reply queue (client\/server)
--
--  * __job__: The requested /job/ (client\/server, pusher\/worker and registry)
--
--  * __type__: Request type (registry)
--
--  * __job-type__: Type of job (registry)
--
--  * __queue__: Queue to register (registry)
--
--  * __hb__: Heartbeat specification (registry)
--
--  * __sc__: Status Code (registry) 
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Patterns.Basic (
                          -- * Client
                          ClientA, clName, withClient, request, checkRequest,

                          -- * Server
                          ServerA, srvName, withServer, reply,

                          -- * Registry
                          -- $registry_intro

                          register, unRegister, 
                          heartbeat, HB, mkHB,

                          -- $registry_howto

                          -- * withServerThread
                          withServerThread, RegistryDesc,

                          -- * Pusher
                          PusherA, pushName, withPusher, push,

                          -- * Worker
                          withTaskThread,

                          -- * More about Registries
                          -- $registry_core

                          Registry, withRegistry, 
                          mapR, getProvider, showRegistry,
                          Provider, prvQ, JobType(..),

                          -- $registry_usage

                          -- * Publisher
                          PubA, pubName, withPub, publish, withPubThread,

                          -- * Subscriber
                          SubA, subName, withSub, checkIssue,
                          withSubThread, withSubMVar,

                          -- * Heartbeats for Pub
                          withPubProxy,

                          -- * Exceptions and Error Handling
                          PatternsException(..), OnError,
                          StatusCode(..), readStatusCode,

                          -- * Useful Types and Helpers
                          JobName, QName,
                          nobody, ignorebody,
                          bytesOut, bytesIn,
                          getJobName, getJobType, getQueue, getChannel, 
                          getHB,
                          getSC,
                          getHeader)
where

  import           Registry
  import           Types

  import           Network.Mom.Stompl.Client.Queue 
  import qualified Network.Mom.Stompl.Frame as F
  import           System.Timeout
  import           Codec.MIME.Type (Type)
  import           Control.Exception (throwIO, catches, finally)
  import           Control.Concurrent 
  import           Control.Monad (forever, unless, when, void)
  import           Data.Time
  import qualified Data.ByteString.Char8 as B

  {- $registry_intro
      Before we continue the survey of basic patterns,
      we have to introduce registries.
      Registries are used by some patterns, advanced patterns,
      but also publishers,
      to use a set of providers that register beforehand
      or, in the case of publishers, 
      to serve a set of consumers that have registered to
      the publisher.
      A typical example is balancers (/majordomo pattern/):
      Servers or tasks register to a balancer, which 
      on receiving a request from a client for a certain job,
      forwards the request to one of the registered providers
      of this job. Internally, the register balances the request,
      such that, with more than one provider currently registered,
      two consecutive requests will not be served
      by the same provider.
      Note that registers provide different modes of using providers:
      a load balancer will send a request to only one of its providers,
      whereas publishers will send the data they produce to all
      currently registered consumers.
      The difference is defined by the 'JobType' of a given 'Job'.
      Of course, only providers of the same type may register
      for the same job. 

      Registers provide a queue through which services can register;
      patterns using registers would provide 
      another queue through which they receive requests. 
      Registers allow for some level of reliability,
      /i.e./ registers can ensure with certain probability 
             that providers are available at the time,
             when a request is made.
     Therefore, registries may request heartbeats from providers.
     Heartbeats are negotiated on registration.
     Note that registries do not send heartbeats back to providers.
     Providers have to use other strategies to make sure
     that the registry to which they have registered is actually
     available. 
  -}

  {- $registry_howto
     The following example shows
     how to use the registration functions and heartbeats
     together with a server:

     > -- The definition of the variables
     > -- reg, jn, rn, tmo
     > -- is out of the scope of this listing;
     > -- their data type and meaning 
     > -- can be inferred from the context.
     >
     >  withConnection "127.0.0.1" 61613 [] [] $ \c -> 
     >    withServer c "Test" (q,            [], [], iconv)
     >                        ("unknown",    [], [], oconv) $ \s -> do
     >      (sc,me) <- if null reg -- if parameter reg is null
     >                   then return (OK, 0)
     >                   else register c jn Service reg rn tmo 500
     >      case sc of
     >        -- ok ------------------------------
     >        OK -> 
     >          if me < 0 || me > 5000 -- accept heartbeat  from 
     >                                 -- 0 (no heartbeats) to
     >                                 -- 5 seconds
     >            then do void $ unRegister c jn wn rn tmo
     >                    throwIO $ UnacceptableHbX me
     >            else do hb <- mkHB me
     >                    m  <- newMVar hb 
     >                    let p = if me <= 0 then (-1) else 1000 * me 
     >                    withWriter c "HB" reg [] [] nobody $ \w -> 
     >                      finally (forever $
     >                        reply s p t hs transform >> heartbeat m w jn rn) (do
     >                        -- "don't forget to unregister!" (Frank Zappa)
     >                        sc <- unRegister c jn wn rn tmo
     >                              unless (sc == OK) $ 
     >                                throwIO $ NotOKX sc "on unregister")
     >        -- not ok ---------------------------
     >        e -> throwIO $ NotOKX e "on register"

     There is, however, a function that does all of this internally: 
     'withServerThread'.
  -}

  {- $registry_core
     Until now, we have only looked at how to connect to a registry,
     not at how to use it and what a registry actually is in terms of
     data types.
     Well, answering the second question is simple:
     a registry, from the perspective of the user application,
     is an opaque data type with a set of functions:
  -}

  {- $registry_usage
     A typical example of how to use a registry in practice 
     is the balancer pattern, which is shown (without error handling)
     below:

     > -- The definition of the variables
     > -- c, n qn, mn, mx, onErr, rq
     > -- is out of the scope of this listing;
     > -- their data type and meaning 
     > -- can be inferred from the context.
     >
     > withRegistry c n qn (mn, mx) onErr $ \reg ->
     >   withPair c n (rq,        [], [], bytesIn) 
     >                ("unknown", [], [], bytesOut) $ \(r,w) -> 
     >     forever $ do
     >       m  <- readQ r        -- receive a request
     >       jn <- getJobName m   -- get the job name from the request 
     >       t  <- mapR reg jn (send2Prov w m)   -- apply job
     >       unless t $ throwIO $ NoProviderX jn -- throw exception
     >                                           -- when job is not provided
     > where send2Prov w m p = writeAdHoc w (prvQ p) nullType 
     >                                      (msgHdrs m) $ msgContent m

     User applications, usually, do not need to use registries directly.
     Registries are used in patterns, namely in Desks, Balancers
     and in /Pub/s.
  -}

  ------------------------------------------------------------------------
  -- | The client data type, which implements the client side
  --   of the client\/server protocol.
  ------------------------------------------------------------------------
  data ClientA i o = Cl {
                      -- | Access to the client name
                      ClientA i o -> String
clName :: String,
                      ClientA i o -> String
clChn  :: QName,
                      ClientA i o -> String
clJob  :: JobName,
                      ClientA i o -> Reader i
clIn   :: Reader i,
                      ClientA i o -> Writer o
clOut  :: Writer o}
  
  ------------------------------------------------------------------------
  -- | The function creates a client that lives within its scope.
  --
  --   Parameters:
  --
  --   * 'Con': Connection to a Stomp broker
  --
  --   * 'String': Name of the Client, which can be used for error reporting.
  --
  --   * 'JobName': Name of the 'Service' the client will request
  --
  --   * 'ReaderDesc' i: Description of a reader queue;
  --                     this is the queue through which the server
  --                     will send its response.
  --
  --   * 'WriterDesc' o: Description of a writer queue;
  --                     this is the queue through which the server
  --                     is expecting requests.
  --
  --   * 'ClientA' i o -> IO r: An application-defined action
  --                            whose scope defines the client's lifetime
  ------------------------------------------------------------------------
  withClient :: Con -> String  ->
                       JobName ->
                       ReaderDesc i ->
                       WriterDesc o ->
                       (ClientA i o  -> IO r) -> IO r
  withClient :: Con
-> String
-> String
-> ReaderDesc i
-> WriterDesc o
-> (ClientA i o -> IO r)
-> IO r
withClient Con
c String
n String
jn rd :: ReaderDesc i
rd@(String
rn, [Qopt]
_, [Header]
_, InBound i
_) WriterDesc o
wd ClientA i o -> IO r
act =
    Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
forall i o r.
Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair Con
c String
n ReaderDesc i
rd WriterDesc o
wd (((Reader i, Writer o) -> IO r) -> IO r)
-> ((Reader i, Writer o) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \(Reader i
r,Writer o
w) -> ClientA i o -> IO r
act (ClientA i o -> IO r) -> ClientA i o -> IO r
forall a b. (a -> b) -> a -> b
$ String -> String -> String -> Reader i -> Writer o -> ClientA i o
forall i o.
String -> String -> String -> Reader i -> Writer o -> ClientA i o
Cl String
n String
rn String
jn Reader i
r Writer o
w

  ------------------------------------------------------------------------
  -- | The client will send the request of type /o/
  --   and wait for the reply until the timeout exprires.
  --   The reply is of type /i/ and is returned as 'Message' /i/.
  --   If the timeout expires before the reply has been received,
  --   the function returns 'Nothing'.
  --
  --   Since servers do not know the clients they are serving,
  --   'request' sends the name of its reader queue (the /reply queue/)
  --   as message header to the server.
  --
  --   Parameters:
  --
  --   * 'ClientA' i o: The client; note that i is the type of the reply,
  --                                          o is the type of the request.
  --
  --   * 'Int': The timeout in microseconds.
  --
  --   * 'Type': The /MIME/ type of the request.
  --
  --   * ['F.Header']: List of additional headers 
  --                   to be sent with the request.
  --
  --  * /o/: The request 
  ------------------------------------------------------------------------
  request :: ClientA i o -> 
             Int -> Type -> [F.Header] -> o -> IO (Maybe (Message i))
  request :: ClientA i o
-> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
request ClientA i o
c Int
tmo Type
t [Header]
hs o
r = 
    let hs' :: [Header]
hs' = [(String
"__channel__", ClientA i o -> String
forall i o. ClientA i o -> String
clChn ClientA i o
c),
               (String
"__job__", ClientA i o -> String
forall i o. ClientA i o -> String
clJob ClientA i o
c)] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs
     in Writer o -> Type -> [Header] -> o -> IO ()
forall a. Writer a -> Type -> [Header] -> a -> IO ()
writeQ (ClientA i o -> Writer o
forall i o. ClientA i o -> Writer o
clOut ClientA i o
c) Type
t [Header]
hs' o
r IO () -> IO (Maybe (Message i)) -> IO (Maybe (Message i))
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Int -> IO (Message i) -> IO (Maybe (Message i))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ (ClientA i o -> Reader i
forall i o. ClientA i o -> Reader i
clIn ClientA i o
c))

  ------------------------------------------------------------------------
  -- | This function serves as a \"delayed\" receiver for the case
  --   that the timeout of a request has expired.
  --   When using this function, it is assumed
  --   that a request has been made, but no response has been received.
  --   It can be used in time-critical applications,
  --   where the client may use the time between request and reply
  --   productively, instead of passively blocking on the reply queue.
  --
  --   Use this function with care! It can be easily abused
  --   to break the client\/server pattern, when it is called
  --   without a request having been made before.
  --   If, in this case, /timout/ is /-1/,
  --   the application will block forever.
  --
  --   The function receives those parameters from 'request'
  --   that are related to receiving the reply, /i.e./
  --   'Type', ['F.Header'] and /o/ are not passed to /checkRequest/.
  ------------------------------------------------------------------------
  checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
  checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
checkRequest ClientA i o
c Int
tmo = Int -> IO (Message i) -> IO (Maybe (Message i))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (IO (Message i) -> IO (Maybe (Message i)))
-> IO (Message i) -> IO (Maybe (Message i))
forall a b. (a -> b) -> a -> b
$ Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ (ClientA i o -> Reader i
forall i o. ClientA i o -> Reader i
clIn ClientA i o
c)

  ------------------------------------------------------------------------
  -- | The server data type, which implements the server side
  --   of the client\/server protocol.
  ------------------------------------------------------------------------
  data ServerA i o = Srv {
                      -- | Access to the server name
                      ServerA i o -> String
srvName :: String,
                      ServerA i o -> Reader i
srvIn   :: Reader i,
                      ServerA i o -> Writer o
srvOut  :: Writer o}
  
  ------------------------------------------------------------------------
  -- | The function creates a server
  --   that lives within the scope of the application-defined action
  --   passed into it.
  --
  --   Parameters:
  --
  --   * 'Con': Connection to a Stomp broker
  --
  --   * 'String': Name of the Server, which can be used for error reporting.
  --
  --   * 'ReaderDesc' i: Description of a reader queue;
  --                     this is the queue through which clients
  --                     are expected to send requests.
  --
  --   * 'WriterDesc' o: Description of a writer queue;
  --                     this is the queue through which
  --                     a specific client will expect the reply.
  --                     Note that the server will overwrite
  --                     the destination of this queue
  --                     using 'writeAdHoc'; 
  --                     the destination of this queue, hence,
  --                     is irrelevant.
  --
  --   * 'ServerA' i o -> IO r: An application-defined action
  --                            whose scope defines the server's lifetime
  ------------------------------------------------------------------------
  withServer :: Con -> String        ->
                       ReaderDesc i  ->
                       WriterDesc o  ->
                       (ServerA i o  -> IO r) -> IO r
  withServer :: Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> (ServerA i o -> IO r)
-> IO r
withServer Con
c String
n ReaderDesc i
rd WriterDesc o
wd ServerA i o -> IO r
act =
    Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
forall i o r.
Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair Con
c String
n ReaderDesc i
rd WriterDesc o
wd (((Reader i, Writer o) -> IO r) -> IO r)
-> ((Reader i, Writer o) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \(Reader i
r,Writer o
w) -> ServerA i o -> IO r
act (ServerA i o -> IO r) -> ServerA i o -> IO r
forall a b. (a -> b) -> a -> b
$ String -> Reader i -> Writer o -> ServerA i o
forall i o. String -> Reader i -> Writer o -> ServerA i o
Srv String
n Reader i
r Writer o
w

  ------------------------------------------------------------------------
  -- | Waits for a client request, 
  --   calls the application-defined transformer to generate a reply
  --   and sends this reply through the reply queue
  --   whose name is indicated by a header in the request.
  --   The time a server waits for a request may be restricted
  --   by the timeout. Typically, you would call reply with 
  --   timeout set to /-1/ (/wait eternally/).
  --   There may be situations, however, where it actually
  --   makes sense to restrict the waiting time,
  --   /i.e./ to perform some housekeeping in between.
  --
  --   Typically, you call reply in a loop like
  --
  --   > forever $ reply srv (-1) nullType [] f
  --
  --   where /f/ is a function of type 
  --
  --   > Message i -> IO o.
  --
  --   Parameters:
  --
  --   * 'ServerA' i o: The server; note that i is the request queue
  --                                     and  o the reply queue.
  --
  --   * 'Int': The timeout in microseconds.
  --
  --   * 'Type': The /MIME/ type of the reply.
  --
  --   * ['F.Header']: Additional headers to be sent with the reply.
  --
  --   * 'Message' i -> IO o: Transforms the request into a reply -
  --                          this defines the service provided by this
  --                          application.
  ------------------------------------------------------------------------
  reply :: ServerA i o -> Int -> Type -> [F.Header] -> 
           (Message i -> IO o) -> IO ()
  reply :: ServerA i o
-> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
reply ServerA i o
s Int
tmo Type
t [Header]
hs Message i -> IO o
transform = do
    Maybe (Message i)
mbM <- Int -> IO (Message i) -> IO (Maybe (Message i))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (IO (Message i) -> IO (Maybe (Message i)))
-> IO (Message i) -> IO (Maybe (Message i))
forall a b. (a -> b) -> a -> b
$ Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ (ServerA i o -> Reader i
forall i o. ServerA i o -> Reader i
srvIn ServerA i o
s)
    case Maybe (Message i)
mbM of
      Maybe (Message i)
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just Message i
m  -> do
        String
c <- Message i -> IO String
forall m. Message m -> IO String
getChannel Message i
m
        o
o <- Message i -> IO o
transform Message i
m
        Writer o -> String -> Type -> [Header] -> o -> IO ()
forall a. Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc (ServerA i o -> Writer o
forall i o. ServerA i o -> Writer o
srvOut ServerA i o
s) String
c Type
t [Header]
hs o
o

  ------------------------------------------------------------------------
  -- | Create a server that works in a background thread:
  --   The background thread (and with it the server)
  --   is running until the action passed in to the function (IO r)
  --   terminates; when it terminates, the background thread is
  --   terminated as well.
  --   /withServerThread/ may connect to a registry
  --   (to serve as a provider of a balancer for instance),
  --   which is automatically handled internally
  --   when a RegistryDesc is passed in with a 'QName'
  --   that is not null. 
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * 'String': The name of the server, used for error reporting;
  --
  --   * 'JobName': The job provided by this server
  --
  --   * 'Type': The MIME Type (passed to 'reply')
  --
  --   * ['F.Header']: Additional headers (passed to 'reply')
  --
  --   * 'Message' i -> IO o: The core of the reply function:
  --                          transforming a request of type /i/
  --                          into a reply of type /o/
  --
  --   * 'ReaderDesc' i: The reader through which requests are expected;
  -- 
  --   * 'WriterDesc' o: The writer through which replies are sent;
  --
  --   * 'RegistryDesc': Describes whether and how to connect to a registry:
  --                     if the queue name of the registry description 
  --                     is null,
  --                     the function will not connect to a registry;
  --                     otherwise it will connect to the registry
  --                     proposing the best value of the 'RegistryDesc'
  --                     as its preferred heartbeat rate;
  --                     should the heartbeat rate returned by the registry
  --                     be outside the scope of min and max,
  --                     /withServerThread/ will terminate 
  --                     with 'UnacceptableHbX'.
  --
  --   * 'OnError': Error handler
  --
  --   * IO r: The function starts a new thread on which the 
  --           the server is working; 
  --           the thread from which the function was called
  --           continues in this action.
  --           Its return value is also the result of /withServerThread/.
  --           When the action terminates,
  --           the new thread is terminated internally.
  ------------------------------------------------------------------------
  withServerThread :: Con  -> String     -> JobName ->
                      Type -> [F.Header] -> (Message i -> IO o) ->
                      ReaderDesc i -> 
                      WriterDesc o ->
                      RegistryDesc -> 
                      OnError      -> IO r -> IO r
  withServerThread :: Con
-> String
-> String
-> Type
-> [Header]
-> (Message i -> IO o)
-> ReaderDesc i
-> WriterDesc o
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withServerThread Con
c String
n String
jn Type
t [Header]
hs Message i -> IO o
transform
                     rd :: ReaderDesc i
rd@(String
rn, [Qopt]
_, [Header]
_, InBound i
_)
                     WriterDesc o
wd
                     (String
reg, Int
tmo, (Int
best, Int
mn, Int
mx))
                     OnError
onErr IO r
action =
    Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> (ServerA i o -> IO r)
-> IO r
forall i o r.
Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> (ServerA i o -> IO r)
-> IO r
withServer Con
c String
n ReaderDesc i
rd WriterDesc o
wd ((ServerA i o -> IO r) -> IO r) -> (ServerA i o -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \ServerA i o
s -> do
      (StatusCode
sc,Int
me) <- if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
reg 
                   then (StatusCode, Int) -> IO (StatusCode, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
OK, Int
0)
                   else Con
-> String
-> JobType
-> String
-> String
-> Int
-> Int
-> IO (StatusCode, Int)
register Con
c String
jn JobType
Service String
reg String
rn Int
tmo Int
best
      case StatusCode
sc of
        StatusCode
OK -> 
          if Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
mn Bool -> Bool -> Bool
|| Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mx
            then do Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
reg String
rn Int
tmo
                    PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ Int -> PatternsException
UnacceptableHbX Int
me

            else do HB
hb <- Int -> IO HB
mkHB Int
me
                    MVar HB
m  <- HB -> IO (MVar HB)
forall a. a -> IO (MVar a)
newMVar HB
hb 
                    let p :: Int
p = if Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then (-Int
1) else Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
me 
                    IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (MVar HB -> Int -> ServerA i o -> IO ()
forall r. MVar HB -> Int -> ServerA i o -> IO r
srv MVar HB
m Int
p ServerA i o
s) 
                                        (Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
reg String
rn Int
tmo)) IO r
action
        StatusCode
e -> PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ StatusCode -> String -> PatternsException
NotOKX StatusCode
e String
"on register"
      where srv :: MVar HB -> Int -> ServerA i o -> IO r
srv MVar HB
m Int
p ServerA i o
s = Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound ()
-> (Writer () -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
"HB" String
reg [] [] OutBound ()
nobody ((Writer () -> IO r) -> IO r) -> (Writer () -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> 
                          IO () -> IO r
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO r) -> IO () -> IO r
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (
                            ServerA i o
-> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
forall i o.
ServerA i o
-> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
reply ServerA i o
s Int
p Type
t [Header]
hs Message i -> IO o
transform IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar HB -> Writer () -> String -> String -> IO ()
heartbeat MVar HB
m Writer ()
w String
jn String
rn) (
                           String -> OnError -> [Handler ()]
ignoreHandler (ServerA i o -> String
forall i o. ServerA i o -> String
srvName ServerA i o
s) OnError
onErr)

  ------------------------------------------------------------------------
  -- Finaliser for the registry
  ------------------------------------------------------------------------
  finalise :: Con -> JobName -> QName -> QName -> Int -> IO ()
  finalise :: Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
wn String
rn Int
tmo | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
wn   = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                          | Bool
otherwise = do 
                               StatusCode
sc <- Con -> String -> String -> String -> Int -> IO StatusCode
unRegister Con
c String
jn String
wn String
rn Int
tmo
                               Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StatusCode
sc StatusCode -> StatusCode -> Bool
forall a. Eq a => a -> a -> Bool
== StatusCode
OK) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ 
                                     PatternsException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO ()) -> PatternsException -> IO ()
forall a b. (a -> b) -> a -> b
$ StatusCode -> String -> PatternsException
NotOKX StatusCode
sc String
"on unregister"

  ------------------------------------------------------------------------
  -- | The publisher data type
  ------------------------------------------------------------------------
  data PubA o = Pub {
                  -- | Access to the name of the publisher
                  PubA o -> String
pubName :: String,
                  PubA o -> String
pubJob  :: JobName,
                  PubA o -> Registry
pubReg  :: Registry,
                  PubA o -> OutBound o
pubConv :: OutBound o,
                  PubA o -> Writer ByteString
pubOut  :: Writer B.ByteString}

  ------------------------------------------------------------------------
  -- | Create a publisher with the lifetime of the scope
  --   of the user action passed in.
  --   The publisher, internally, creates a registry
  --   to which subscribers will connect to obtain the topic data.
  --   The registry will not expect heartbeats from subscribers,
  --   since the dependability relation is the other way round:
  --   the publisher does not depend on subscribers,
  --   but subscribers depend on a publisher.
  --   The publisher, usually, does not send heartbeats either.
  --   For exceptions to this rule, see 'withPubProxy'.
  --
  --   * 'Con': Connect to a Stomp broker;
  --
  --   * String: Name of the publisher used for error reporting;
  --
  --   * 'JobName': The name of the topic;
  --
  --   * 'QName': Name of the registration queue (see 'withRegistry');
  --
  --   * 'OnError': Error Handler passed to the registry;
  --
  --   * 'WriterDesc': Queue through which data are published;
  --                   note that the queue name is irrelevant.
  --                   The publisher will send data to the queues
  --                   of registered subscribers (see 'mapR');
  --
  --   * 'PubA' -> IO r: Action that defines the lifetime
  --                     of the publisher; the result (/r/)
  --                     is also the result of /withPub/.
  ------------------------------------------------------------------------
  withPub :: Con -> String -> JobName -> QName -> OnError -> 
             WriterDesc o  -> (PubA o -> IO r) -> IO r
  withPub :: Con
-> String
-> String
-> String
-> OnError
-> WriterDesc o
-> (PubA o -> IO r)
-> IO r
withPub Con
c String
n String
jn String
rn OnError
onErr (String
_, [Qopt]
wos, [Header]
wh, OutBound o
oconv) PubA o -> IO r
act = 
    Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
forall r.
Con
-> String
-> String
-> (Int, Int)
-> OnError
-> (Registry -> IO r)
-> IO r
withRegistry Con
c  String
n String
rn (Int
0,Int
0) OnError
onErr ((Registry -> IO r) -> IO r) -> (Registry -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Registry
r ->
      Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound ByteString
-> (Writer ByteString -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
jn String
"unknown" [Qopt]
wos [Header]
wh OutBound ByteString
bytesOut ((Writer ByteString -> IO r) -> IO r)
-> (Writer ByteString -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer ByteString
w -> 
        PubA o -> IO r
act (PubA o -> IO r) -> PubA o -> IO r
forall a b. (a -> b) -> a -> b
$ String
-> String -> Registry -> OutBound o -> Writer ByteString -> PubA o
forall o.
String
-> String -> Registry -> OutBound o -> Writer ByteString -> PubA o
Pub String
n String
jn Registry
r OutBound o
oconv Writer ByteString
w

  ------------------------------------------------------------------------
  -- | Publish data of type /o/:
  --
  --   * 'PubA' o: Publisher to use;
  --
  --   * 'Type': MIME Type of the message to be sent;
  --
  --   * ['F.Header']: Additional headers to be sent with the message;
  --
  --   * /o/: The message content.
  ------------------------------------------------------------------------
  publish :: PubA o -> Type -> [F.Header] -> o -> IO ()
  publish :: PubA o -> Type -> [Header] -> o -> IO ()
publish PubA o
p Type
t [Header]
hs o
x = let oc :: OutBound o
oc = PubA o -> OutBound o
forall o. PubA o -> OutBound o
pubConv PubA o
p
                      in OutBound o
oc o
x IO ByteString -> (ByteString -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ByteString
m ->
                         IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ Registry -> String -> (Provider -> IO ()) -> IO Bool
mapR  (PubA o -> Registry
forall o. PubA o -> Registry
pubReg PubA o
p) (PubA o -> String
forall o. PubA o -> String
pubJob PubA o
p) ((Provider -> IO ()) -> IO Bool) -> (Provider -> IO ()) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \Provider
prv -> 
                           Writer ByteString
-> String -> Type -> [Header] -> ByteString -> IO ()
forall a. Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc (PubA o -> Writer ByteString
forall o. PubA o -> Writer ByteString
pubOut PubA o
p) (Provider -> String
prvQ Provider
prv) Type
t [Header]
hs ByteString
m

  ------------------------------------------------------------------------
  -- | Create a publisher that works in a background thread
  --   publishing periodically at a monotonic rate,
  --   /i.e./ it creates data and publishes them,
  --          computes the difference 
  --             of the publication rate minus the time needed
  --                to create and publish the data 
  --          and will then suspend the thread for this period.
  --          For a publication rate of /p/ microseconds,
  --              the thread will be delayed for /p - x/ microseconds,
  --              if /x/ corresponds to the time that was spent
  --                 on creating and publishing the data.
  --
  --  The precision depends of course on your system and
  --  its current workload.
  --  For most cases, this will be equal to just suspending the thread
  --  for the publication rate.
  --
  --  Parameters:
  --
  --  * 'Con': Connection to a Stomp broker;
  --
  --  * String: Name of the publisher used for error reporting;
  --
  --  * 'JobName': Name of the topic;
  --
  --  * 'QName': Registration queue;
  --
  --  * Type: MIME Type of the published message;
  --
  --  * ['F.Header']: Additional headers to be sent
  --                  with the message;
  --
  --  * IO o: Action to create the message content;
  --
  --  * 'WriterDesc' o: Queue through which the message
  --                    will be published (remember, however,
  --                    that the queue name is irrelevant);
  --
  --  * Int: Publication rate in microseconds;
  --
  --  * 'OnError': Error handler for the registry
  --               and the publisher;
  --
  --  * IO r: Action that defines the lifetime of the publisher;
  --          The result /r/ is also the result of /withPubThread/.   
  ------------------------------------------------------------------------
  withPubThread :: Con -> String -> JobName -> QName ->
                   Type -> [F.Header] -> IO o ->
                   WriterDesc o       -> Int  -> 
                   OnError -> IO r    -> IO r
  withPubThread :: Con
-> String
-> String
-> String
-> Type
-> [Header]
-> IO o
-> WriterDesc o
-> Int
-> OnError
-> IO r
-> IO r
withPubThread Con
c String
n String
jn String
rn Type
t [Header]
hs IO o
create WriterDesc o
wd Int
period OnError
onErr IO r
action = 
    Con
-> String
-> String
-> String
-> OnError
-> WriterDesc o
-> (PubA o -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> String
-> OnError
-> WriterDesc o
-> (PubA o -> IO r)
-> IO r
withPub Con
c String
n String
jn String
rn OnError
onErr WriterDesc o
wd ((PubA o -> IO r) -> IO r) -> (PubA o -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \PubA o
p -> IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (PubA o -> IO ()
forall b. PubA o -> IO b
doPub PubA o
p) IO r
action
    where doPub :: PubA o -> IO b
doPub PubA o
p = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (do
                      UTCTime
n1 <- IO UTCTime
getCurrentTime
                      IO o
create IO o -> (o -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= PubA o -> Type -> [Header] -> o -> IO ()
forall o. PubA o -> Type -> [Header] -> o -> IO ()
publish PubA o
p Type
t [Header]
hs
                      UTCTime
n2 <- IO UTCTime
getCurrentTime
                      let d :: Int
d = NominalDiffTime -> Int
nominal2us (UTCTime
n2 UTCTime -> UTCTime -> NominalDiffTime
`diffUTCTime` UTCTime
n1)
                      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
d Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
period) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int
period Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
d)) (
                    String -> OnError -> [Handler ()]
ignoreHandler (PubA o -> String
forall o. PubA o -> String
pubName PubA o
p) OnError
onErr)

  ------------------------------------------------------------------------
  -- | Subscriber data type
  ------------------------------------------------------------------------
  data SubA i = Sub {
                 -- | Access to the subscriber name
                 SubA i -> String
subName :: String,
                 SubA i -> Reader i
subIn   :: Reader i
                }

  ------------------------------------------------------------------------
  -- | Create a subscriber with the lifetime 
  --   of the user action passed in.
  --   The subscriber will internally connect to a publisher's
  --   registry and receive data as long as it stays connected.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Subscriber name useful for error reporting;
  --
  --   * 'JobName': Subscribed topic;
  --
  --   * 'QName': Queue of a registry to connect to
  --              (the 'Pub's registration queue!)
  --
  --   * Int: Registration timeout in microseconds;
  --
  --   * 'ReaderDesc': This is the queue through which
  --                   the subscriber will receive data.
  --
  --   * 'SubA' i -> IO r: Action that defines the lifetime
  --                       of the subscriber. Its result /r/
  --                       is also the result of /withSub/.
  ------------------------------------------------------------------------
  withSub :: Con -> String -> JobName -> QName -> Int ->
             ReaderDesc i  -> (SubA i -> IO r) -> IO r
  withSub :: Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (SubA i -> IO r)
-> IO r
withSub Con
c String
n String
jn String
wn Int
tmo (String
rn, [Qopt]
ros, [Header]
rh, InBound i
iconv) SubA i -> IO r
act = 
    Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
c String
n String
rn [Qopt]
ros [Header]
rh InBound i
iconv ((Reader i -> IO r) -> IO r) -> (Reader i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Reader i
r -> do
      Maybe (StatusCode, Int)
mbR <- if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
wn 
               then Maybe (StatusCode, Int) -> IO (Maybe (StatusCode, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (StatusCode, Int) -> IO (Maybe (StatusCode, Int)))
-> Maybe (StatusCode, Int) -> IO (Maybe (StatusCode, Int))
forall a b. (a -> b) -> a -> b
$ (StatusCode, Int) -> Maybe (StatusCode, Int)
forall a. a -> Maybe a
Just (StatusCode
OK,Int
0)
               else Int -> IO (StatusCode, Int) -> IO (Maybe (StatusCode, Int))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (IO (StatusCode, Int) -> IO (Maybe (StatusCode, Int)))
-> IO (StatusCode, Int) -> IO (Maybe (StatusCode, Int))
forall a b. (a -> b) -> a -> b
$ Con
-> String
-> JobType
-> String
-> String
-> Int
-> Int
-> IO (StatusCode, Int)
register Con
c String
jn JobType
Topic String
wn String
rn Int
tmo Int
0
      case Maybe (StatusCode, Int)
mbR of
        Maybe (StatusCode, Int)
Nothing     -> PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
TimeoutX String
"on register"
        Just (StatusCode
OK,Int
_) -> IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
finally (SubA i -> IO r
act (SubA i -> IO r) -> SubA i -> IO r
forall a b. (a -> b) -> a -> b
$ String -> Reader i -> SubA i
forall i. String -> Reader i -> SubA i
Sub String
n Reader i
r) 
                               (Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
wn String
rn Int
tmo)
        Just (StatusCode
sc,Int
_) -> PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ StatusCode -> String -> PatternsException
NotOKX StatusCode
sc String
"on register "

  ------------------------------------------------------------------------
  -- | Check if data have been arrived for this subscriber;
  --   if data are available before the timeout expires,
  --   the function results in 'Just' ('Message' i);
  --   if the timeout expires first, the result is 'Nothing'.
  --
  --   * 'SubA' i: The subscriber to check 
  --
  --   * Int: Timeout in microseconds
  ------------------------------------------------------------------------
  checkIssue :: SubA i -> Int -> IO (Maybe (Message i))
  checkIssue :: SubA i -> Int -> IO (Maybe (Message i))
checkIssue SubA i
s Int
tmo = Int -> IO (Message i) -> IO (Maybe (Message i))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
tmo (IO (Message i) -> IO (Maybe (Message i)))
-> IO (Message i) -> IO (Maybe (Message i))
forall a b. (a -> b) -> a -> b
$ Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ (SubA i -> Reader i
forall i. SubA i -> Reader i
subIn SubA i
s)

  ------------------------------------------------------------------------
  -- | Create a subscriber that works in a background thread;
  --   Whenever data are available, an application callback passed in
  --   to the function is called with the message that has arrived.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Subscriber name used for error reporting;
  --
  --   * 'JobName': Subscribed topic;
  --
  --   * 'QName': The publisher's registration queue;
  --
  --   * Int: Registration timeout in microseconds;
  --
  --   * 'ReaderDesc' i: Queue through which the subscriber
  --                     shall receive data;
  --
  --   * 'Message' i -> IO (): Application callback;
  --
  --   * 'OnError': Error handler; 
  --
  --   * IO r: Action that defines the lifetime of the subscriber;
  --           the result /r/ is also the result of /withSubThread/.
  ------------------------------------------------------------------------
  withSubThread :: Con -> String -> JobName    -> QName  -> Int     ->
                   ReaderDesc i  -> (Message i -> IO ()) -> OnError -> 
                   IO r -> IO r
  withSubThread :: Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (Message i -> IO ())
-> OnError
-> IO r
-> IO r
withSubThread Con
c String
n String
jn String
wn Int
tmo ReaderDesc i
rd Message i -> IO ()
job OnError
onErr IO r
action = 
     Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (SubA i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (SubA i -> IO r)
-> IO r
withSub Con
c String
n String
jn String
wn Int
tmo ReaderDesc i
rd ((SubA i -> IO r) -> IO r) -> (SubA i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \SubA i
s -> IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (SubA i -> IO ()
forall b. SubA i -> IO b
go SubA i
s) IO r
action
    where go :: SubA i -> IO b
go SubA i
s = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (SubA i -> IO (Message i)
forall i. SubA i -> IO (Message i)
chk SubA i
s IO (Message i) -> (Message i -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Message i -> IO ()
job)
                                   (String -> OnError -> [Handler ()]
ignoreHandler (SubA i -> String
forall i. SubA i -> String
subName SubA i
s) OnError
onErr)
          chk :: SubA i -> IO (Message i)
chk SubA i
s = SubA i -> Int -> IO (Maybe (Message i))
forall i. SubA i -> Int -> IO (Maybe (Message i))
checkIssue SubA i
s (-Int
1) IO (Maybe (Message i))
-> (Maybe (Message i) -> IO (Message i)) -> IO (Message i)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Maybe (Message i)
mbX ->
                  case Maybe (Message i)
mbX of
                    Maybe (Message i)
Nothing -> SubA i -> IO (Message i)
chk SubA i
s
                    Just Message i
m  -> Message i -> IO (Message i)
forall (m :: * -> *) a. Monad m => a -> m a
return Message i
m

  ------------------------------------------------------------------------
  -- | Create a subscriber that works in a background thread 
  --   and updates an MVar, whenever new data are available;
  --   the function is in fact a special case of 'withSubThread',
  --   where the application callback updates an MVar.
  --   Note that the MVar must not be empty when the function
  --   is called, otherwise, it will block on modifying the MVar.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Subscriber name used for error reporting;
  --
  --   * 'JobName': Subscribed topic;
  --
  --   * 'QName': The publisher's registration queue;
  --
  --   * Int: Registration timeout in microseconds;
  --
  --   * 'ReaderDesc' i: Queue through which the subscriber
  --                     shall receive data;
  --
  --   * 'MVar' i: MVar to update;
  --
  --   * 'OnError': Error handler; 
  --
  --   * IO r: Action that defines the lifetime of the subscriber;
  --           the result /r/ is also the result of /withSubMVar/.
  ------------------------------------------------------------------------
  withSubMVar :: Con -> String -> JobName -> QName   -> Int ->
                 ReaderDesc i  -> MVar i  -> OnError -> 
                 IO r -> IO r
  withSubMVar :: Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> MVar i
-> OnError
-> IO r
-> IO r
withSubMVar Con
c String
n String
jn String
wn Int
tmo ReaderDesc i
rd MVar i
v = 
    Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (Message i -> IO ())
-> OnError
-> IO r
-> IO r
forall i r.
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (Message i -> IO ())
-> OnError
-> IO r
-> IO r
withSubThread Con
c String
n String
jn String
wn Int
tmo ReaderDesc i
rd Message i -> IO ()
job 
    where job :: Message i -> IO ()
job Message i
m = MVar i -> (i -> IO i) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar i
v ((i -> IO i) -> IO ()) -> (i -> IO i) -> IO ()
forall a b. (a -> b) -> a -> b
$ \i
_ -> i -> IO i
forall (m :: * -> *) a. Monad m => a -> m a
return (i -> IO i) -> i -> IO i
forall a b. (a -> b) -> a -> b
$ Message i -> i
forall a. Message a -> a
msgContent Message i
m 

  ------------------------------------------------------------------------
  -- | The Pusher data type, which implements
  --   the consumer side of the pipeline protocol.
  --   Note that, when we say "consumer" here,
  --   the pusher is actually a data producer,
  --   but consumes the effect of having a task done.
  --   The pusher can be seen as a client
  --   that does not expect a reply.
  ------------------------------------------------------------------------
  data PusherA o = Pusher {
                    -- | Access to the pusher's name
                    PusherA o -> String
pushName :: String,
                    PusherA o -> String
pushJob  :: JobName,
                    PusherA o -> Writer o
pushQ    :: Writer o
                  }

  ------------------------------------------------------------------------
  -- | Create a 'Pusher' with the lifetime of the action passed in:
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the pusher, which may be used for error reporting;
  --
  --   * 'JobName': Name of the job requested by this pusher;
  --
  --   * 'WriterDesc' o: 'Writer' queue through which 
  --                              the job request is pushed;
  --
  --   * ('PusherA' o -> IO r): Action that defines the lifetime of
  --                            the pusher; the result /r/ is also
  --                            the result of 'withPusher'.
  ------------------------------------------------------------------------
  withPusher :: Con -> String -> JobName -> WriterDesc o -> 
                (PusherA o -> IO r) -> IO r
  withPusher :: Con
-> String -> String -> WriterDesc o -> (PusherA o -> IO r) -> IO r
withPusher Con
c String
n String
jn (String
wq, [Qopt]
wos, [Header]
wh, OutBound o
oconv) PusherA o -> IO r
action = 
    Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
n String
wq [Qopt]
wos [Header]
wh OutBound o
oconv ((Writer o -> IO r) -> IO r) -> (Writer o -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer o
w -> PusherA o -> IO r
action (PusherA o -> IO r) -> PusherA o -> IO r
forall a b. (a -> b) -> a -> b
$ String -> String -> Writer o -> PusherA o
forall o. String -> String -> Writer o -> PusherA o
Pusher String
n String
jn Writer o
w

  ------------------------------------------------------------------------
  -- | Push a 'Job':
  --
  --     * 'PusherA' o: The pusher to be used;
  --
  --     * 'Type': The MIME Type of the message to be sent;
  --
  --     * ['F.Header']: The headers to be sent with the message;
  --
  --     * /o/: The message contents.
  ------------------------------------------------------------------------
  push :: PusherA o -> Type -> [F.Header] -> o -> IO ()
  push :: PusherA o -> Type -> [Header] -> o -> IO ()
push PusherA o
p Type
t [Header]
hs o
m = let hs' :: [Header]
hs' = (String
"__job__", PusherA o -> String
forall o. PusherA o -> String
pushJob PusherA o
p) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hs
                   in Writer o -> Type -> [Header] -> o -> IO ()
forall a. Writer a -> Type -> [Header] -> a -> IO ()
writeQ (PusherA o -> Writer o
forall o. PusherA o -> Writer o
pushQ PusherA o
p) Type
t [Header]
hs' o
m

  ------------------------------------------------------------------------
  -- | On the other side of the pipeline,
  --   there sits a worker waiting for requests.
  --   Note that no /Worker/ data type is defined.
  --   Instead, there is only a /withTaskThread/ function
  --   that, internally, creates a worker acting in a background thread.
  --   The rationale is that it does not make too much sense
  --   to have a pipeline with only one worker. 
  --   It is in fact part of the idea of the pipeline pattern 
  --   that several workers are used through a balancer.
  --   /withTaskThread/ implements the interaction with the registry
  --   internally and frees the programmer from concerns related
  --   to registration. If you really need a single worker,
  --   you can call the function with an empty RegistryDesc, 
  --   /i.e./ with an empty queue name.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the worker used for error reporting;
  --
  --   * 'JobName': Name of the job, the worker provides;
  --
  --   * ('Message' i  -> IO ()): The job provided by the worker.
  --                              Note that the function does not
  --                              return a value: Since workers do
  --                              not produce a reply, no result
  --                              is necessary;
  --
  --   * 'ReaderDesc' i: Queue through which the worker receives
  --                     requests;
  --
  --   * 'RegistryDesc': The registry to which the worker connects;
  --
  --   * OnError: Error handler;
  --
  --   * IO r: Action that defines the worker's lifetime.
  --
  ------------------------------------------------------------------------
  withTaskThread :: Con -> String -> JobName     ->
                    (Message i -> IO ())         -> 
                    ReaderDesc i -> RegistryDesc -> 
                    OnError      -> IO r         -> IO r
  withTaskThread :: Con
-> String
-> String
-> (Message i -> IO ())
-> ReaderDesc i
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withTaskThread Con
c String
n String
jn Message i -> IO ()
task
                     (String
rn, [Qopt]
ros, [Header]
rh, InBound i
iconv)
                     (String
reg, Int
tmo, (Int
best, Int
mn, Int
mx))
                     OnError
onErr IO r
action = do
      (StatusCode
sc,Int
me) <- if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
reg
                   then (StatusCode, Int) -> IO (StatusCode, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StatusCode
OK,Int
0)
                   else Con
-> String
-> JobType
-> String
-> String
-> Int
-> Int
-> IO (StatusCode, Int)
register Con
c String
jn JobType
Task String
reg String
rn Int
tmo Int
best
      case StatusCode
sc of
        StatusCode
OK -> 
          if Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
mn Bool -> Bool -> Bool
|| Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mx
            then do Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
reg String
rn Int
tmo
                    PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ Int -> PatternsException
UnacceptableHbX Int
me

            else do HB
hb  <- Int -> IO HB
mkHB Int
me
                    MVar HB
m   <- HB -> IO (MVar HB)
forall a. a -> IO (MVar a)
newMVar HB
hb 
                    let p :: Int
p = if Int
me Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then (-Int
1) else Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
me 
                    Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
c String
n String
rn [Qopt]
ros [Header]
rh InBound i
iconv ((Reader i -> IO r) -> IO r) -> (Reader i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Reader i
r -> 
                      IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (MVar HB -> Reader i -> Int -> IO ()
forall r. MVar HB -> Reader i -> Int -> IO r
tsk MVar HB
m Reader i
r Int
p) 
                                          (Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
reg String
rn Int
tmo)) IO r
action
        StatusCode
e -> PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ StatusCode -> String -> PatternsException
NotOKX StatusCode
e String
"on register"
      where tsk :: MVar HB -> Reader i -> Int -> IO r
tsk MVar HB
m Reader i
r Int
p = Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound ()
-> (Writer () -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter   Con
c String
"HB" String
reg [] [] OutBound ()
nobody ((Writer () -> IO r) -> IO r) -> (Writer () -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> 
                          IO () -> IO r
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO r) -> IO () -> IO r
forall a b. (a -> b) -> a -> b
$ IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
catches (do
                            Maybe (Message i)
mbM <- Int -> IO (Message i) -> IO (Maybe (Message i))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
p (IO (Message i) -> IO (Maybe (Message i)))
-> IO (Message i) -> IO (Maybe (Message i))
forall a b. (a -> b) -> a -> b
$ Reader i -> IO (Message i)
forall a. Reader a -> IO (Message a)
readQ Reader i
r 
                            case Maybe (Message i)
mbM of
                              Maybe (Message i)
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                              Just Message i
x  -> Message i -> IO ()
task Message i
x 
                            MVar HB -> Writer () -> String -> String -> IO ()
heartbeat MVar HB
m Writer ()
w String
jn String
rn)
                           (String -> OnError -> [Handler ()]
ignoreHandler String
n OnError
onErr)

  ------------------------------------------------------------------------
  -- | Unlike servers and workers,
  --   publishers have no interface to connect 
  --   internally to a registry.
  --   The rationale for this is that
  --   publishers do not need load balancers or similar means
  --   that would require registration.
  --   As a consequence, there is no means to send heartbeats internally.
  --   Sometimes, however, the need to connect to a registry may arise.
  --   The Desk pattern is an example where it makes sense 
  --   to register a publisher.
  --   But then, there is no means to internally send heartbeats
  --   proving that the publisher is still alive.
  --   For this case, a simple solution 
  --   for periodic publishers is available:
  --   a heartbeat proxy that is implemented as a subscriber
  --   receiving data from the publisher and 
  --   sending a heartbeat on every dataset that arrives.
  --   
  --   This function provides a proxy that internally
  --   connects to a registry on behalf of a publisher
  --   and sends heartbeats.
  --
  --   * 'Con': Connection to a Stomp broker;
  --
  --   * String: Name of the proxy used for error reporting;
  --
  --   * 'JobName': Name of the topic, the publisher provides;
  --
  --   * 'QName': Registration queue of the publisher -
  --              this is the queue 
  --              to which the internal subscriber connects;
  --
  --   * 'ReaderDesc' i: The queue through which the internal
  --                     subscriber receives data;
  --
  --   * 'RegistryDesc': The other registry - 
  --                     it is this registry to which the 
  --                     proxy will send heartbeats;
  --
  --   * 'OnError': Error Handler;
  --
  --   * IO r: Action that definex the proxy's lifetime;
  --           its result /r/ is also the result of /withPubProxy/.
  ------------------------------------------------------------------------
  withPubProxy :: Con -> String -> JobName      -> QName   ->
                  ReaderDesc i  -> RegistryDesc -> OnError -> IO r -> IO r
  withPubProxy :: Con
-> String
-> String
-> String
-> ReaderDesc i
-> RegistryDesc
-> OnError
-> IO r
-> IO r
withPubProxy Con
c String
n String
jn String
pq ReaderDesc i
rd (String
reg, Int
tmo, (Int
best, Int
mn, Int
mx)) OnError
onErr IO r
action =
    Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (SubA i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> String
-> Int
-> ReaderDesc i
-> (SubA i -> IO r)
-> IO r
withSub Con
c String
n String
jn String
pq Int
tmo ReaderDesc i
rd ((SubA i -> IO r) -> IO r) -> (SubA i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \SubA i
s -> 
      Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound ()
-> (Writer () -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
c String
"HB" String
reg [] [] OutBound ()
nobody ((Writer () -> IO r) -> IO r) -> (Writer () -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer ()
w -> do
        (StatusCode
sc, Int
h) <- Con
-> String
-> JobType
-> String
-> String
-> Int
-> Int
-> IO (StatusCode, Int)
register Con
c String
jn JobType
Topic String
reg String
pq Int
tmo Int
best
        if StatusCode
sc StatusCode -> StatusCode -> Bool
forall a. Eq a => a -> a -> Bool
/= StatusCode
OK
          then PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ StatusCode -> String -> PatternsException
NotOKX StatusCode
sc String
"on register proxy"
          else if Int
h Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
mn Bool -> Bool -> Bool
|| Int
h Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mx
                 then PatternsException -> IO r
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO r) -> PatternsException -> IO r
forall a b. (a -> b) -> a -> b
$ Int -> PatternsException
UnacceptableHbX Int
h
                 else do MVar HB
hb <- Int -> IO HB
mkHB Int
h IO HB -> (HB -> IO (MVar HB)) -> IO (MVar HB)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= HB -> IO (MVar HB)
forall a. a -> IO (MVar a)
newMVar
                         IO () -> IO r -> IO r
forall r. IO () -> IO r -> IO r
withThread (IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
forall i. MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
beat MVar HB
hb (Int
h Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000) SubA i
s Writer ()
w Int
0)
                                             (Con -> String -> String -> String -> Int -> IO ()
finalise Con
c String
jn String
reg String
pq Int
tmo)) 
                                    IO r
action
    where beat :: MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
          beat :: MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
beat MVar HB
hb Int
h SubA i
s Writer ()
w Int
i = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do -- forever continues 
                                         -- in case of exception
            Maybe (Message i)
mbM  <- SubA i -> Int -> IO (Maybe (Message i))
forall i. SubA i -> Int -> IO (Maybe (Message i))
checkIssue SubA i
s Int
h
            case Maybe (Message i)
mbM of
              Maybe (Message i)
Nothing -> if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
10 
                           then PatternsException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (PatternsException -> IO ()) -> PatternsException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> PatternsException
MissingHbX String
"No input from pub"
                           else MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
forall i. MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
beat MVar HB
hb Int
h SubA i
s Writer ()
w (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
              Just Message i
_  -> MVar HB -> Writer () -> String -> String -> IO ()
heartbeat MVar HB
hb Writer ()
w String
jn String
pq IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
forall i. MVar HB -> Int -> SubA i -> Writer () -> Int -> IO ()
beat MVar HB
hb Int
h SubA i
s Writer ()
w Int
0 
            IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
`catches` (String -> OnError -> [Handler ()]
ignoreHandler String
n OnError
onErr)