| Copyright | (c) 2023 Sean Hess |
|---|---|
| License | BSD3 |
| Maintainer | Sean Hess <seanhess@gmail.com> |
| Stability | experimental |
| Portability | portable |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Network.AMQP.Worker
Description
Type safe and simplified message queues with AMQP
Synopsis
- newtype Key a msg = Key [Binding]
- data Binding
- data Routing
- word :: Text -> Key a msg -> Key a msg
- key :: Text -> Key Routing msg
- any1 :: Key a msg -> Key Binding msg
- many :: Key a msg -> Key Binding msg
- connect :: MonadIO m => ConnectionOpts -> m Connection
- fromURI :: String -> ConnectionOpts
- data Connection
- publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
- queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
- queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg)
- data Queue msg = Queue (Key Binding msg) QueueName
- queueName :: QueuePrefix -> Key a msg -> QueueName
- type QueueName = Text
- newtype QueuePrefix = QueuePrefix Text
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
- data WorkerException e
- data WorkerOptions = WorkerOptions {}
- type Microseconds = Int
- def :: Default a => a
How to use this library
Define keys to identify how messages will be published and what the message type is
import Network.AMQP.Worker as Worker
data Greeting = Greeting
{ message :: Text }
deriving (Generic, Show, Eq)
instance FromJSON Greeting
instance ToJSON Greeting
newGreetings :: Key Routing Greeting
newGreetings = key "messages" & word "greetings" & word "new"Connect to AMQP and publish a message
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.publish conn newMessages $ TestMessage "hello"
To receive messages, first define a queue. You can bind direclty to the Routing Key to ensure it is delivered once
q <- Worker.queue conn def newMessages :: IO (Queue Greeting) -- Loop and print any values received Worker.worker conn def q onError (print . value)
You can also define dynamic Routing Keys to receive many kinds of messages
let newMessages = key "messages" & any1 & word "new" q <- Worker.queue conn def newMessages :: IO (Queue Greeting)
Binding and Routing Keys
Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.
Routing keys have no dynamic component and can be used to publish messages
commentsKey :: Key Routing Comment commentsKey = key "posts" & word "new"
Binding keys can contain wildcards, only used for matching messages
commentsKey :: Key Binding Comment commentsKey = key "posts" & any1 & word "comments" & many
Instances
word :: Text -> Key a msg -> Key a msg Source #
A specific word. Can be used to chain Routing keys or Binding keys
any1 :: Key a msg -> Key Binding msg Source #
Match any one word. Equivalent to *. Converts to a Binding key and can no longer be used to publish messaages
many :: Key a msg -> Key Binding msg Source #
Match zero or more words. Equivalient to #. Converts to a Binding key and can no longer be used to publish messages
Connecting
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
fromURI :: String -> ConnectionOpts #
Parses amqp standard URI of the form amqp://user:passwordhost:port/vhost and returns a ConnectionOpts for use with openConnection''
| Any of these fields may be empty and will be replaced with defaults from amqp:/guest:guest@localhost:5672@
data Connection Source #
Sending Messages
publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #
send a message to a queue. Enforces that the message type and queue name are correct at the type level
let key = key "users" :: Key Routing User publish conn key (User "username")
Publishing to a Binding Key results in an error
-- Compiler error! This doesn't make sense let key = key "users" & many :: Key Binding User publish conn key (User "username")
Initializing queues
queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #
queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #
Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the messsage. Workers operating on the same queue, or on queues with the same name will load balance
A queue is an inbox for messages to be delivered
queueName :: QueuePrefix -> Key a msg -> QueueName Source #
Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages
-- "main messages.new" queueName "main" (key "messages" & word "new")
newtype QueuePrefix Source #
Constructors
| QueuePrefix Text |
Instances
| IsString QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods fromString :: String -> QueuePrefix # | |
| Show QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods showsPrec :: Int -> QueuePrefix -> ShowS # show :: QueuePrefix -> String # showList :: [QueuePrefix] -> ShowS # | |
| Default QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue Methods def :: QueuePrefix # | |
| Eq QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue | |
Messages
data ParseError Source #
Constructors
| ParseError String ByteString |
a parsed message from the queue
Constructors
| Message | |
Fields
| |
Worker
worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #
Create a worker which loops, checks for messages, and handles errors
startWorker conn queue = do
Worker.worker conn def queue onError onMessage
where
onMessage :: Message User
onMessage m = do
putStrLn "handle user message"
print (value m)
onError :: WorkerException SomeException -> IO ()
onError e = do
putStrLn "Do something with errors"data WorkerException e Source #
Exceptions created while processing
Constructors
| MessageParseError ByteString String | |
| OtherException ByteString e |
Instances
| Exception e => Exception (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods toException :: WorkerException e -> SomeException # fromException :: SomeException -> Maybe (WorkerException e) # displayException :: WorkerException e -> String # | |
| Show e => Show (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods showsPrec :: Int -> WorkerException e -> ShowS # show :: WorkerException e -> String # showList :: [WorkerException e] -> ShowS # | |
| Eq e => Eq (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods (==) :: WorkerException e -> WorkerException e -> Bool # (/=) :: WorkerException e -> WorkerException e -> Bool # | |
data WorkerOptions Source #
Options for worker
Constructors
| WorkerOptions | |
Fields
| |
Instances
| Show WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods showsPrec :: Int -> WorkerOptions -> ShowS # show :: WorkerOptions -> String # showList :: [WorkerOptions] -> ShowS # | |
| Default WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods def :: WorkerOptions # | |
| Eq WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods (==) :: WorkerOptions -> WorkerOptions -> Bool # (/=) :: WorkerOptions -> WorkerOptions -> Bool # | |
type Microseconds = Int Source #