amqp-worker-2.0.0: Type-safe AMQP workers
Copyright(c) 2023 Sean Hess
LicenseBSD3
MaintainerSean Hess <seanhess@gmail.com>
Stabilityexperimental
Portabilityportable
Safe HaskellSafe-Inferred
LanguageHaskell2010

Network.AMQP.Worker

Description

Type safe and simplified message queues with AMQP. Compatible with RabbitMQ

Synopsis

How to use this library

Define keys to identify how messages will be published and what the message type is

import Network.AMQP.Worker as Worker

data Greeting = Greeting
  { message :: Text }
  deriving (Generic, Show, Eq)

instance FromJSON Greeting
instance ToJSON Greeting

newGreetings :: Key Routing Greeting
newGreetings = key "greetings" & word "new"

Connect to AMQP and publish a message

conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
Worker.publish conn newGreetings $ Greeting "hello"

Create a queue to receive messages. You can bind direclty to the Routing Key to ensure it is delivered once

q <- Worker.queue conn "new" newMessages :: IO (Queue Greeting)
m <- Worker.takeMessage conn q
print (value m)

Define dynamic Routing Keys to receive many kinds of messages

let anyMessages = key "messages" & any1
q <- Worker.queue conn "main" anyMessages
m <- Worker.takeMessage conn q
print (value m)

Create a worker to conintually process messages

forkIO $ Worker.worker conn q $ \m -> do
    print (value m)

Connecting

connect :: MonadIO m => ConnectionOpts -> m Connection Source #

Connect to the AMQP server using simple defaults

conn <- connect (fromURI "amqp://guest:guest@localhost:5672")

fromURI :: String -> ConnectionOpts #

Parses amqp standard URI of the form amqp://user:passwordhost:port/vhost and returns a ConnectionOpts for use with openConnection'' | Any of these fields may be empty and will be replaced with defaults from amqp:/guest:guest@localhost:5672@

Binding and Routing Keys

newtype Key a msg Source #

Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.

Routing keys have no dynamic component and can be used to publish messages

commentsKey :: Key Route Comment
commentsKey = key "posts" & word "new"

Binding keys can contain wildcards, only used for matching messages

commentsKey :: Key Bind Comment
commentsKey = key "posts" & any1 & word "comments" & many

Constructors

Key [Bind] 

Instances

Instances details
Monoid (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

mempty :: Key a msg #

mappend :: Key a msg -> Key a msg -> Key a msg #

mconcat :: [Key a msg] -> Key a msg #

Semigroup (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(<>) :: Key a msg -> Key a msg -> Key a msg #

sconcat :: NonEmpty (Key a msg) -> Key a msg #

stimes :: Integral b => b -> Key a msg -> Key a msg #

Show (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

showsPrec :: Int -> Key a msg -> ShowS #

show :: Key a msg -> String #

showList :: [Key a msg] -> ShowS #

Eq (Key a msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Key a msg -> Key a msg -> Bool #

(/=) :: Key a msg -> Key a msg -> Bool #

data Bind Source #

Instances

Instances details
Show Bind Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

showsPrec :: Int -> Bind -> ShowS #

show :: Bind -> String #

showList :: [Bind] -> ShowS #

Eq Bind Source # 
Instance details

Defined in Network.AMQP.Worker.Key

Methods

(==) :: Bind -> Bind -> Bool #

(/=) :: Bind -> Bind -> Bool #

key :: Text -> Key Route msg Source #

Start a new routing key (can also be used for bindings)

key "messages"
-- matches "messages"

word :: Text -> Key a msg -> Key a msg Source #

A specific word. Can be used to chain Routing keys or Binding keys

key "messages" & word "new"
-- matches "messages.new"

any1 :: Key a msg -> Key Bind msg Source #

Match any one word. Equivalent to *. Converts to a Binding key and can no longer be used to publish messaages

key "messages" & any1
-- matches "messages.new"
-- matches "messages.update"

many :: Key a msg -> Key Bind msg Source #

Match zero or more words. Equivalient to #. Converts to a Binding key and can no longer be used to publish messages

key "messages" & many
-- matches "messages"
-- matches "messages.new"
-- matches "messages.1234.update"

Sending Messages

publish :: (RequireRoute a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #

send a message to a queue. Enforces that the message type and queue name are correct at the type level

let newUsers = key "users" & word "new" :: Key Route User
publish conn newUsers (User "username")

Publishing to a Binding Key results in an error

-- Compiler error! This doesn't make sense
let users = key "users" & many :: Key Binding User
publish conn users (User "username")

Initializing queues

queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #

Create a queue to receive messages matching the Key with a name prefixed via queueName.

q <- Worker.queue conn "main" $ key "messages" & any1
Worker.worker conn def q onError onMessage

queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #

Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the messsage. Workers will load balance if operating on the same queue, or on queues with the same name

data Queue msg Source #

A queue is an inbox for messages to be delivered

Constructors

Queue (Key Bind msg) QueueName 

Instances

Instances details
Show (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

showsPrec :: Int -> Queue msg -> ShowS #

show :: Queue msg -> String #

showList :: [Queue msg] -> ShowS #

Eq (Queue msg) Source # 
Instance details

Defined in Network.AMQP.Worker.Queue

Methods

(==) :: Queue msg -> Queue msg -> Bool #

(/=) :: Queue msg -> Queue msg -> Bool #

queueName :: QueuePrefix -> Key a msg -> QueueName Source #

Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages

-- "main messages.new"
queueName "main" (key "messages" & word "new")

Messages

takeMessage :: (MonadIO m, FromJSON a) => Connection -> Queue a -> m (Message a) Source #

Wait until a message is read from the queue. Throws an exception if the incoming message doesn't match the key type

m <- Worker.takeMessage conn queue
print (value m)

data Message a Source #

a parsed message from the queue

Constructors

Message 

Fields

Instances

Instances details
Show a => Show (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

showsPrec :: Int -> Message a -> ShowS #

show :: Message a -> String #

showList :: [Message a] -> ShowS #

Eq a => Eq (Message a) Source # 
Instance details

Defined in Network.AMQP.Worker.Message

Methods

(==) :: Message a -> Message a -> Bool #

(/=) :: Message a -> Message a -> Bool #

Worker

worker :: (FromJSON a, MonadIO m) => Connection -> Queue a -> (Message a -> m ()) -> m () Source #

Create a worker which loops and handles messages. Throws an exception if the incoming message doesn't match the key type. It is recommended that you catch errors in your handler and allow message parsing errors to crash your program.

Worker.worker conn queue $ \m -> do
  print (value m)