hats-0.1.0.1: Haskell client for the NATS messaging system

Copyright: (c) 2016 Patrik Sandahl
License: MIT
Maintainer: Patrik Sandahl <patrik.sandahl@gmail.com>
Stability: experimental
Portability: portable
Safe Haskell: None
Language: Haskell2010

Network.Nats

Description

A Haskell client for the NATS messaging system. See https://nats.io for general information and documentation.

Limitations in implementation

1) The current version of this library does not yet support TLS.

2) The current version of this library does not yet support authorization tokens (but it does support user names and passwords in the URI strings).

Simple messaging example.

This section gives a simple messaging example using this library. The example requires a NATS server running on localhost and listening on the default port 4222. If the server runs on another host or port, adapt the example accordingly.

{-# LANGUAGE OverloadedStrings #-}
module Main
   ( main
   ) where

import Network.Nats
import Text.Printf

main :: IO ()
main =
   withNats defaultSettings ["nats://localhost"] $ \nats -> do

       -- Subscribe to the topic "foo".
       (s, q) <- subscribe nats "foo" Nothing

       -- Publish to topic "foo", do not request a reply.
       publish nats "foo" Nothing "Some payload"

       -- Wait for a message, print the message's payload
       msg <- nextMsg q
       printf "Received %s\n" (show $ payload msg)

       -- Unsubscribe from topic "foo".
       unsubscribe nats s Nothing

Asynchronous message handling.

Besides the subscription mode where messages are fetched synchronously from a queue, there is also an asynchronous mode where each message is handled immediately in its own thread.

{-# LANGUAGE OverloadedStrings #-}
module Main
   ( main
   ) where

import Control.Monad
import Data.Maybe
import Network.Nats
import Text.Printf

main :: IO ()
main =
   withNats defaultSettings ["nats://localhost"] $ \nats -> do
      
       -- A simple - asynchronous - help service that will answer
       -- requesters that give a reply topic with "I can help".
       s1 <- subscribeAsync nats "help" Nothing $ \msg -> do
           printf "Help service received: %s\n" (show $ payload msg)
           when (isJust $ replyTo msg) $
               publish nats (fromJust $ replyTo msg) Nothing "I can help"

       -- Subscribe to help replies.
       (s2, q) <- subscribe nats "help.reply" Nothing

       -- Request help.
       publish nats "help" (Just "help.reply") "Please ..."

       -- Wait for reply.
       msg <- nextMsg q
       printf "Received: %s\n" (show $ payload msg)

       -- Unsubscribe.
       unsubscribe nats s1 Nothing
       unsubscribe nats s2 Nothing

Convenience API for the request pattern.

The example above contains a common request pattern: sending a message to a topic and requesting a reply, subscribing to the reply topic, receiving the reply message and then unsubscribing from the reply topic.

This pattern can be handled more simply using the request function.

{-# LANGUAGE OverloadedStrings #-}
module Main
   ( main
   ) where

import Control.Monad
import Data.Maybe
import Network.Nats
import Text.Printf

main :: IO ()
main =
   withNats defaultSettings ["nats://localhost"] $ \nats -> do

       -- A simple - asynchronous - help service that will answer
       -- requesters that give a reply topic with "I can help".
       s <- subscribeAsync nats "help" Nothing $ \msg -> do
           printf "Help service received: %s\n" (show $ payload msg)
           when (isJust $ replyTo msg) $
               publish nats (fromJust $ replyTo msg) Nothing "I can help"

       -- Request help.
       msg <- request nats "help" "Please ..."
       printf "Received: %s\n" (show $ payload msg)

       -- Unsubscribing the help service only.
       unsubscribe nats s Nothing

Topic structure.

The topic structure is tree-like, similar to a file system or the Haskell module hierarchy, and the components in the tree are separated by dots. A subscriber to a topic can use wildcards to specify patterns.

{-# LANGUAGE OverloadedStrings #-}
module Main
   ( main
   ) where

import Control.Monad
import Data.Maybe
import Network.Nats
import Text.Printf

main :: IO ()
main =
   withNats defaultSettings ["nats://localhost"] $ \nats -> do

       -- "*" matches any token, at any level of the subject.
       (_, queue1) <- subscribe nats "foo.*.baz" Nothing
       (_, queue2) <- subscribe nats "foo.bar.*" Nothing

       -- ">" matches any length of the tail of the subject, and can
       -- only be the last token.
       (_, queue3) <- subscribe nats "foo.>" Nothing

       -- This publishing matches all the above.
       publish nats "foo.bar.baz" Nothing "Hello world"

       -- Show that the message showed up on all queues.
       forM_ [queue1, queue2, queue3] $ \queue -> do
           msg <- nextMsg queue
           printf "Received: %s\n" (show $ payload msg)
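
The wildcard rules in the comments above can also be written down as a small pure function. The following is only a sketch of the matching rules, not the server's or the library's implementation. The names tokens and matches are hypothetical, plain Strings are used instead of Topic so that only base is needed, and it assumes that "*" stands for exactly one token and ">" for one or more trailing tokens.

```haskell
module Main (main) where

-- | Split a subject such as "foo.bar.baz" into its dot-separated tokens.
tokens :: String -> [String]
tokens s = case break (== '.') s of
    (tok, [])       -> [tok]
    (tok, _ : rest) -> tok : tokens rest

-- | Hypothetical helper: does a subscription pattern match a subject?
-- Assumes "*" stands for exactly one token and ">" for one or more
-- trailing tokens.
matches :: String -> String -> Bool
matches pat subject = go (tokens pat) (tokens subject)
  where
    go [">"]      (_ : _)    = True              -- ">" swallows the tail
    go ("*" : ps) (_ : ss)   = go ps ss          -- "*" skips one token
    go (p : ps)   (t : ss)   = p == t && go ps ss -- literal tokens must agree
    go []         []         = True              -- both exhausted together
    go _          _          = False             -- length mismatch

main :: IO ()
main = mapM_ report ["foo.*.baz", "foo.bar.*", "foo.>", "foo.*"]
  where
    report pat =
        putStrLn (pat ++ " matches foo.bar.baz: "
                      ++ show (matches pat "foo.bar.baz"))
```

Under these assumptions the first three patterns match foo.bar.baz, while foo.* does not because it covers only two tokens.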

data Nats

The type of the handle used by the API. To the user this type is opaque. The Nats handle is only valid within the scope of the withNats function.

data Msg

A NATS message as received by the user. The message itself is opaque to the user, but its fields can be read with the API functions topic, replyTo, sid, payload, jsonPayload and jsonPayload'.

Instances

Eq Msg

Methods

(==) :: Msg -> Msg -> Bool

(/=) :: Msg -> Msg -> Bool

Show Msg

Methods

showsPrec :: Int -> Msg -> ShowS

show :: Msg -> String

showList :: [Msg] -> ShowS

type Sid = Int64

The numeric id for a subscription. An id shall be unique within a NATS client. The value of the id will be generated automatically by the API. Type alias for Int64.

type Payload = ByteString

The type of a message payload. Type alias for ByteString.

type Topic = ByteString

The type of a topic to publish to or to subscribe to. Type alias for ByteString.

type QueueGroup = ByteString

A Topic subscriber can be part of a queue group, an entity for load balancing in NATS. Type alias for ByteString.

data MsgQueue

A message queue, a queue of Msgs handled by a TQueue.
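
To illustrate what a TQueue-backed queue means for the caller, here is a minimal sketch using the stm package directly. It is not the library's implementation: the producer thread merely stands in for the connection delivering messages, and a plain String stands in for Msg.

```haskell
module Main (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
    ( atomically, newTQueueIO, readTQueue, writeTQueue )

main :: IO ()
main = do
    queue <- newTQueueIO

    -- Producer thread: writes one item after 100 ms. It is a stand-in
    -- for whatever delivers incoming messages to the queue.
    _ <- forkIO $ do
        threadDelay 100000
        atomically $ writeTQueue queue "Some payload"

    -- readTQueue retries until the queue is non-empty, so this line
    -- blocks until the producer has written.
    msg <- atomically $ readTQueue queue
    putStrLn ("Received " ++ msg)
```

The blocking read is the reason a fetch from such a queue waits until a message is available.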

data ManagerSettings

A set of parameters to guide the behavior of the connection manager. A default set of parameters can be obtained by calling defaultSettings.

Constructors

ManagerSettings 

Fields

  • reconnectionAttempts :: !Int

    The number of times the connection manager shall try to connect to a server before giving up.

  • maxWaitTimeMS :: !Int

    Maximum time to wait between a connection being made and a CONNECT message being received from the NATS server. If the time is exceeded, the connection is terminated and a new server selection is made. The time is given in milliseconds.

  • serverSelect :: ([URI], Int) -> IO (URI, Int)

    A function to select one of the servers from the server pool. The arguments to the selector are the list of server URIs and the current index. The reply is the chosen server and its index.

  • connectedTo :: SockAddr -> IO ()

    Callback to inform that a connection has been made to the NATS server, with the SockAddr for the server. This callback is made in the connection manager's thread, so the callback's execution time must be kept short. Use forkIO if longer execution times are needed.

  • disconnectedFrom :: SockAddr -> IO ()

    Callback to inform that a disconnection from the NATS server has happened, with the SockAddr for the server. This callback is made in the connection manager's thread, so the callback's execution time must be kept short. Use forkIO if longer execution times are needed.
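
As an illustration of the serverSelect field, the following sketch shows a round-robin selector. The name roundRobin is hypothetical. It is polymorphic in the server type so that it compiles with base alone; in ManagerSettings the element type would be URI. It assumes a non-empty server list and that the "current index" is the index of the previously chosen server.

```haskell
module Main (main) where

-- | Hypothetical round-robin selector with the same shape as the
-- serverSelect field: pick the server after the current one and wrap
-- around at the end of the list. Assumes a non-empty list.
roundRobin :: ([a], Int) -> IO (a, Int)
roundRobin (servers, current) = do
    let next = (current + 1) `mod` length servers
    return (servers !! next, next)

main :: IO ()
main = do
    let pool = ["nats://host-a", "nats://host-b", "nats://host-c"]
    -- Start from index 2 to show the wrap-around to the first server.
    (first, i)  <- roundRobin (pool, 2)
    (second, _) <- roundRobin (pool, i)
    putStrLn first
    putStrLn second
```

Assuming the record fields can be used for record update, such a selector could be installed with defaultSettings { serverSelect = roundRobin }.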

data NatsException

Exceptions generated from within this library.

Constructors

ConnectionGiveUpException

An exception thrown when all the configured connection attempts are consumed and the connection manager has given up.

AuthorizationException

The NATS server currently connected to has reported an authorization violation. The library does not try to recover from this; the error is passed on to the user.

URIError !String

An exception caused by invalid URI strings given to the withNats function.
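
A sketch of how these exceptions might be handled around withNats. It assumes that NatsException has an Exception instance (it is thrown, so it should) and that the exceptions reach the thread calling withNats. The helper name publishOnce is hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Control.Exception (try)
import Network.Nats

-- | Hypothetical helper: publish one message and describe the outcome.
-- The exception type for 'try' is fixed to NatsException by the
-- constructor patterns below.
publishOnce :: [String] -> IO String
publishOnce uris = do
    result <- try $ withNats defaultSettings uris $ \nats ->
        publish nats "foo" Nothing "Some payload"
    return $ case result of
        Right ()                       -> "Published"
        Left ConnectionGiveUpException -> "Gave up connecting"
        Left AuthorizationException    -> "Authorization violation"
        Left (URIError reason)         -> "Malformed URI: " ++ reason

main :: IO ()
main = publishOnce ["nats://localhost"] >>= putStrLn
```

Other exception types, for example from the user action itself, are not caught by this sketch and propagate as usual.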

data SockAddr :: *

The existence of a constructor does not necessarily imply that that socket address type is supported on your system: see isSupportedSockAddr.

withNats

Arguments

:: ManagerSettings

Settings for the connection manager. Default ManagerSettings can be obtained by defaultSettings.

-> [String]

A list of URI strings to specify the NATS servers available. If any URI string is malformed, a URIError exception is thrown. Parsing of URIs is performed using the parseAbsoluteURI function.

-> (Nats -> IO a)

The user provided action. Once the action is terminated the connection will close.

-> IO a 

Run an IO action while a connection to NATS is maintained. If the NATS connection is lost, the connection manager will try to reconnect to the same or one of the other NATS servers (as specified by the provided URI strings). Strategies for reconnection are specified in the ManagerSettings. All subscriptions will be automatically replayed once a new connection is made.
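
Since the URI strings are parsed with parseAbsoluteURI, they can be checked up front with the same function from the network-uri package. The sketch below is only an illustration: the helper name describe and the sample strings (including the placeholder credentials) are hypothetical, and how the library itself uses the parsed parts is not shown here.

```haskell
module Main (main) where

import Network.URI (URI (..), URIAuth (..), parseAbsoluteURI)

-- | Hypothetical helper: show which parts of a server URI string were
-- recognised, or report that it cannot be used.
describe :: String -> String
describe str = case parseAbsoluteURI str >>= uriAuthority of
    Nothing   -> str ++ ": malformed or without a host part"
    Just auth -> str ++ ": host=" ++ uriRegName auth
                     ++ " port=" ++ uriPort auth
                     ++ " userinfo=" ++ uriUserInfo auth

main :: IO ()
main = mapM_ (putStrLn . describe)
    [ "nats://localhost"
    , "nats://user:secret@localhost:4222"  -- placeholder credentials
    , "not a uri"
    ]
```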

publish :: Nats -> Topic -> Maybe Topic -> Payload -> IO ()

Publish some Payload message to a Topic. The NATS server will distribute the message to subscribers of the Topic.

publish nats "GREETINGS" Nothing "Hello, there!"

This publishes the string Hello, there! to subscribers of GREETINGS. No reply-to Topic is provided. To request a reply, provide a Topic where the subscriber can publish a reply.

publish nats "GREETINGS" (Just "THANKS") "Hello, there!"

publishJson :: ToJSON a => Nats -> Topic -> Maybe Topic -> a -> IO ()

As publish, but with a JSON payload.

subscribe :: Nats -> Topic -> Maybe QueueGroup -> IO (Sid, MsgQueue)

Subscribe to a Topic. Optionally, a subscription can be part of a QueueGroup. The function returns immediately with a tuple of a Sid for the subscription and a MsgQueue from which messages can be fetched using nextMsg.

(sid, queue) <- subscribe nats "do.stuff" Nothing

Or

(sid, queue) <- subscribe nats "do.stuff" (Just "stuffworkers")

subscribeAsync :: Nats -> Topic -> Maybe QueueGroup -> (Msg -> IO ()) -> IO Sid

Subscribe to a Topic. Optionally a subscription can be part of a QueueGroup.

Subscriptions using this function will be asynchronous, and each message will be handled in its own thread. A message handler is an IO action taking a Msg as its argument. The function returns the Sid for the subscription.

sid <- subscribeAsync nats "do.stuff" Nothing $ \msg -> do
    -- Do stuff with the msg

Or

sid <- subscribeAsync nats "do.stuff" Nothing messageHandler

messageHandler :: Msg -> IO ()
messageHandler msg = do
   -- Do stuff with the msg

request :: Nats -> Topic -> Payload -> IO Msg

A request publishes a Payload to a Topic and waits for a reply Msg. Request is a blocking operation, but it can be interrupted by timeout.

msg <- request nats "do.stuff" "A little payload"

Or

maybeMsg <- timeout tmo $ request nats "do.stuff" "A little payload"

requestJson :: ToJSON a => Nats -> Topic -> a -> IO Msg

As request, but with a JSON payload.

unsubscribe :: Nats -> Sid -> Maybe Int -> IO ()

Unsubscribe from a subscription using its Sid. Optionally, a limit for automatic unsubscription can be given. Unsubscription will then happen once the number of messages given by the limit has been reached.

unsubscribe nats sid Nothing

Or

unsubscribe nats sid (Just 100)

nextMsg :: MsgQueue -> IO Msg

Fetch a new Msg from the MsgQueue. Fetching a message is a blocking operation, but it can be interrupted by timeout.

msg <- nextMsg queue

Or

maybeMsg <- timeout tmo $ nextMsg queue
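
The timeout in these snippets is presumably System.Timeout.timeout from base, which takes its limit in microseconds and yields Nothing when the limit expires. A minimal sketch of that behaviour, with a hypothetical slowFetch standing in for a blocking call such as nextMsg so that it runs without a NATS server:

```haskell
module Main (main) where

import Control.Concurrent (threadDelay)
import System.Timeout (timeout)

-- | Stand-in for a blocking fetch: waits the given number of
-- microseconds, then returns a fixed string.
slowFetch :: Int -> IO String
slowFetch delay = threadDelay delay >> return "a message"

main :: IO ()
main = do
    -- 50 ms budget, 500 ms operation: the timeout fires first.
    late <- timeout 50000 (slowFetch 500000)
    -- 500 ms budget, 10 ms operation: the result arrives in time.
    inTime <- timeout 500000 (slowFetch 10000)
    putStrLn (maybe "timed out" ("got " ++) late)
    putStrLn (maybe "timed out" ("got " ++) inTime)
```

The caller therefore has to handle both the Nothing case (no message within the limit) and the Just case.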

topic :: Msg -> Topic

Read the complete topic on which a message was received.

replyTo :: Msg -> Maybe Topic

Read the reply-to topic from a received message.

sid :: Msg -> Sid

Read the subscription id for the subscription on which this message was received.

payload :: Msg -> Payload

Read the raw payload from a received message.

jsonPayload :: FromJSON a => Msg -> Maybe a

Decode a message's payload as JSON. Uses decode for the decoding.

jsonPayload' :: FromJSON a => Msg -> Maybe a

Decode a message's payload as JSON. Uses decode' for the decoding.
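
To show what the ToJSON and FromJSON constraints ask of a payload type, here is a sketch that uses aeson only. The Greeting type and its fields are hypothetical, and aeson's encode and decode stand in for the trip through the NATS server, so the example runs without a connection.

```haskell
{-# LANGUAGE DeriveGeneric #-}
module Main (main) where

import Data.Aeson (FromJSON, ToJSON, decode, encode)
import GHC.Generics (Generic)

-- | Hypothetical message type; the name and fields are illustrative only.
data Greeting = Greeting
    { text  :: String
    , times :: Int
    } deriving (Eq, Show, Generic)

-- Generic-derived instances: ToJSON is the constraint on publishJson and
-- requestJson, FromJSON the constraint on jsonPayload and jsonPayload'.
instance ToJSON Greeting
instance FromJSON Greeting

main :: IO ()
main = do
    let sent     = Greeting "Hello, there!" 2
        received = decode (encode sent) :: Maybe Greeting
    -- The round trip gives back the value that was sent.
    print (received == Just sent)
    -- A payload with the wrong shape decodes to Nothing instead of failing.
    print (decode (encode [1, 2, 3 :: Int]) :: Maybe Greeting)
```

With the library, the same type could be sent with publishJson nats "GREETINGS" Nothing (Greeting "Hello, there!" 2) and read back with jsonPayload msg :: Maybe Greeting, where the type annotation tells the decoder which type to produce.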