amqp-worker-0.4.0: Type-safe AMQP workers
Safe HaskellSafe-Inferred
LanguageHaskell2010

Network.AMQP.Worker

Description

High level functions for working with message queues. Built on top of Network.AMQP. See https://hackage.haskell.org/package/amqp, which only works with RabbitMQ: https://www.rabbitmq.com/

Example:

Connect to a server, initialize the queues, publish a message, and create workers to process it.

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where

import           Control.Concurrent      (forkIO)
import           Control.Monad.Catch     (SomeException)
import           Data.Aeson              (FromJSON, ToJSON)
import           Data.Function           ((&))
import           Data.Text               (Text, pack)
import           GHC.Generics            (Generic)
import           Network.AMQP.Worker     (Connection, Message (..),
                                          WorkerException, def, fromURI)
import qualified Network.AMQP.Worker     as Worker
import           Network.AMQP.Worker.Key
import           System.IO               (BufferMode (..), hSetBuffering,
                                          stderr, stdout)

-- | Example payload sent through the queues. The JSON instances below use
-- the default Generic-based implementations.
data TestMessage = TestMessage
  { greeting :: Text
  }
  deriving (Eq, Show, Generic)

instance ToJSON TestMessage

instance FromJSON TestMessage


-- | Routing key built from the words "messages" and "new"; carries
-- 'TestMessage' payloads.
newMessages :: Key Routing TestMessage
newMessages = word "new" (key "messages")

-- | Routing key "results"; carries plain 'Text' payloads.
results :: Key Routing Text
results = key "results"

-- | Binding key: the word "messages" followed by a wildcard that matches
-- any one word.
anyMessages :: Key Binding TestMessage
anyMessages = star (key "messages")



-- | End-to-end demo: connect, bind the queues, publish one message typed in
-- by the user, then start two background workers that handle it.
example :: IO ()
example = do

  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  let handleAnyMessages = Worker.topic anyMessages "handleAnyMessage"

  -- initialize the queues; they must be bound before publishing, or the
  -- messages will not be saved
  Worker.bindQueue conn (Worker.direct newMessages)
  Worker.bindQueue conn (Worker.direct results)

  -- topic queue!
  Worker.bindQueue conn handleAnyMessages

  putStrLn "Enter a message"
  msg <- getLine

  -- publish a message
  putStrLn "Publishing a message"
  Worker.publish conn newMessages (TestMessage $ pack msg)

  -- start the workers; each one loops on its own forked thread, so the
  -- main thread carries on to the prompt below
  _ <- forkIO $ Worker.worker conn def (Worker.direct newMessages) onError (onMessage conn)
  _ <- forkIO $ Worker.worker conn def handleAnyMessages onError (onMessage conn)

  -- getLine only returns once a full line is entered, so ask for Enter
  putStrLn "Press Enter to exit"
  _ <- getLine
  return ()

-- | Program entry point; a module named @Main@ must define 'main'.
main :: IO ()
main = example


-- | Message handler: logs the received payload and republishes its
-- greeting under the 'results' routing key.
onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn m = do
  putStrLn "Got Message"
  print payload
  Worker.publish conn results (greeting payload)
  where
    payload = value m


-- | Error handler: prints a placeholder notice followed by the exception.
onError :: WorkerException SomeException -> IO ()
onError e =
  putStrLn "Do something with errors"
    >> print e

Synopsis

Routing Keys

newtype Key a msg Source #

Keys describe routing and binding info for a message

Constructors

Key [a] 

Instances

Instances details
Monoid (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

mempty :: Key a msg #

mappend :: Key a msg -> Key a msg -> Key a msg #

mconcat :: [Key a msg] -> Key a msg #

Semigroup (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(<>) :: Key a msg -> Key a msg -> Key a msg #

sconcat :: NonEmpty (Key a msg) -> Key a msg #

stimes :: Integral b => b -> Key a msg -> Key a msg #

Show a => Show (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

showsPrec :: Int -> Key a msg -> ShowS #

show :: Key a msg -> String #

showList :: [Key a msg] -> ShowS #

Eq a => Eq (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Key a msg -> Key a msg -> Bool #

(/=) :: Key a msg -> Key a msg -> Bool #

data Routing Source #

Every message is sent with a specific routing key

-- Routing key assembled one word at a time.
newCommentKey :: Key Routing Comment
newCommentKey =
  key "posts"
    & word "1"
    & word "comments"
    & word "new"

Instances

Instances details
KeySegment Routing Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Show Routing Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Eq Routing Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Routing -> Routing -> Bool #

(/=) :: Routing -> Routing -> Bool #

data Binding Source #

A dynamic binding address for topic queues

-- Binding key mixing fixed words with wildcards: 'star' matches any one
-- word, 'hash' matches any words.
commentsKey :: Key Binding Comment
commentsKey =
  key "posts"
    & star
    & word "comments"
    & hash

Instances

Instances details
KeySegment Binding Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Show Binding Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Eq Binding Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Binding -> Binding -> Bool #

(/=) :: Binding -> Binding -> Bool #

word :: KeySegment a => Text -> Key a msg -> Key a msg Source #

Match a specific word

key :: Text -> Key Routing msg Source #

Create a new key

star :: KeySegment a => Key a msg -> Key Binding msg Source #

Match any one word

hash :: KeySegment a => Key a msg -> Key Binding msg Source #

Match any words

Connecting

data Connection Source #

Internal connection details

connect :: MonadIO m => ConnectionOpts -> m Connection Source #

Connect to the AMQP server.

>>> conn <- connect (fromURI "amqp://guest:guest@localhost:5672")

exchange :: Connection -> ExchangeName Source #

fromURI :: String -> ConnectionOpts #

Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''. Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/

Initializing queues

data Queue msg Source #

Constructors

Queue (Key Binding msg) QueueName 

Instances

Instances details
Show (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

showsPrec :: Int -> Queue msg -> ShowS #

show :: Queue msg -> String #

showList :: [Queue msg] -> ShowS #

Eq (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

(==) :: Queue msg -> Queue msg -> Bool #

(/=) :: Queue msg -> Queue msg -> Bool #

direct :: Key Routing msg -> Queue msg Source #

Declare a direct queue, which will receive messages published with the exact same routing key

-- Direct queue for the routing key made of "users" and "new".
newUsers :: Queue User
newUsers = Worker.direct newUsersKey
  where
    newUsersKey = key "users" & word "new"

topic :: KeySegment a => Key a msg -> QueueName -> Queue msg Source #

Declare a topic queue, which will receive messages that match using wildcards

-- 'topic' takes the binding key first and the queue name second.
anyUsers :: Queue User
anyUsers = Worker.topic (key "users" & star) "anyUsers"

bindQueue :: MonadIO m => Connection -> Queue msg -> m () Source #

Queues must be bound before you publish messages to them, or the messages will not be saved.

-- Connect, describe the queue, then bind it so published messages are kept.
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
let queue :: Queue User
    queue = Worker.direct (key "users" & word "new")
Worker.bindQueue conn queue

Sending Messages

publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #

Send a message to a queue. Enforces that the message type and queue name are correct at the type level.

-- The key's type fixes which message type may be published with it.
let usersKey = key "users" :: Key Routing User
publish conn usersKey (User "username")

publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #

Publish a message to a routing key without checking that a queue exists to handle it or that the message body is of the right type.

-- Pass a constructed routing key; the bare function 'key' is not a key.
publishToExchange conn (key "users") (User "username")

Reading Messages

consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg)) Source #

Check for a message once and attempt to parse it

-- One-shot check: Nothing means the queue had no message to offer.
res <- consume conn queue
case res of
  Nothing -> putStrLn "No messages on the queue"
  Just (Error _) -> putStrLn "could not parse message"
  Just (Parsed m) -> print m

consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg) Source #

Block while checking for messages every N microseconds. Return once you find one.

-- consumeNext takes the polling interval (in microseconds) as its first
-- argument. NOTE(review): assumes Microseconds accepts an integer literal.
res <- consumeNext 1000 conn queue
case res of
  Parsed m -> print m
  Error _ -> putStrLn "could not parse message"

data Message a Source #

a parsed message from the queue

Constructors

Message 

Fields

Instances

Instances details
Show a => Show (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

showsPrec :: Int -> Message a -> ShowS #

show :: Message a -> String #

showList :: [Message a] -> ShowS #

Eq a => Eq (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

(==) :: Message a -> Message a -> Bool #

(/=) :: Message a -> Message a -> Bool #

Worker

worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #

Create a worker which loops, checks for messages, and handles errors

-- Runs a worker on the given queue with the handlers defined below.
startWorker conn queue = do
  Worker.worker conn def queue onError onMessage

  where
    -- Handler for each parsed message; must be a function into the
    -- worker's monad, matching @Message a -> m ()@.
    onMessage :: Message User -> IO ()
    onMessage m = do
      putStrLn "handle user message"
      print (value m)

    -- Handler for worker errors.
    onError :: WorkerException SomeException -> IO ()
    onError e = do
      putStrLn "Do something with errors"

data WorkerOptions Source #

Options for worker

Constructors

WorkerOptions 

Fields

def :: Default a => a #

The default value for this type.