Copyright | (c) 2023 Sean Hess |
---|---|
License | BSD3 |
Maintainer | Sean Hess <seanhess@gmail.com> |
Stability | experimental |
Portability | portable |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Type safe and simplified message queues with AMQP
Synopsis
- newtype Key a msg = Key [Binding]
- data Binding
- data Routing
- word :: Text -> Key a msg -> Key a msg
- key :: Text -> Key Routing msg
- any1 :: Key a msg -> Key Binding msg
- many :: Key a msg -> Key Binding msg
- connect :: MonadIO m => ConnectionOpts -> m Connection
- fromURI :: String -> ConnectionOpts
- data Connection
- publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m ()
- queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg)
- queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg)
- data Queue msg = Queue (Key Binding msg) QueueName
- queueName :: QueuePrefix -> Key a msg -> QueueName
- type QueueName = Text
- newtype QueuePrefix = QueuePrefix Text
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
- data WorkerException e
- data WorkerOptions = WorkerOptions {}
- type Microseconds = Int
- def :: Default a => a
How to use this library
Define keys to identify how messages will be published and what the message type is
import Network.AMQP.Worker as Worker data Greeting = Greeting { message :: Text } deriving (Generic, Show, Eq) instance FromJSON Greeting instance ToJSON Greeting newGreetings :: Key Routing Greeting newGreetings = key "messages" & word "greetings" & word "new"
Connect to AMQP and publish a message
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.publish conn newMessages $ TestMessage "hello"
To receive messages, first define a queue. You can bind direclty to the Routing Key to ensure it is delivered once
q <- Worker.queue conn def newMessages :: IO (Queue Greeting) -- Loop and print any values received Worker.worker conn def q onError (print . value)
You can also define dynamic Routing Keys to receive many kinds of messages
let newMessages = key "messages" & any1 & word "new" q <- Worker.queue conn def newMessages :: IO (Queue Greeting)
Binding and Routing Keys
Messages are published with a specific identifier called a Routing key. Queues can use Binding Keys to control which messages are delivered to them.
Routing keys have no dynamic component and can be used to publish messages
commentsKey :: Key Routing Comment commentsKey = key "posts" & word "new"
Binding keys can contain wildcards, only used for matching messages
commentsKey :: Key Binding Comment commentsKey = key "posts" & any1 & word "comments" & many
word :: Text -> Key a msg -> Key a msg Source #
A specific word. Can be used to chain Routing keys or Binding keys
any1 :: Key a msg -> Key Binding msg Source #
Match any one word. Equivalent to *
. Converts to a Binding key and can no longer be used to publish messaages
many :: Key a msg -> Key Binding msg Source #
Match zero or more words. Equivalient to #
. Converts to a Binding key and can no longer be used to publish messages
Connecting
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
fromURI :: String -> ConnectionOpts #
Parses amqp standard URI of the form amqp://user:password
host:port/vhost and returns a
ConnectionOpts for use with
openConnection''
| Any of these fields may be empty and will be replaced with defaults from
amqp:/guest:guest@localhost:5672@
data Connection Source #
Sending Messages
publish :: (RequireRouting a, ToJSON msg, MonadIO m) => Connection -> Key a msg -> msg -> m () Source #
send a message to a queue. Enforces that the message type and queue name are correct at the type level
let key = key "users" :: Key Routing User publish conn key (User "username")
Publishing to a Binding Key results in an error
-- Compiler error! This doesn't make sense let key = key "users" & many :: Key Binding User publish conn key (User "username")
Initializing queues
queue :: MonadIO m => Connection -> QueuePrefix -> Key a msg -> m (Queue msg) Source #
queueNamed :: MonadIO m => Connection -> QueueName -> Key a msg -> m (Queue msg) Source #
Create a queue to receive messages matching the binding key. Each queue with a unique name will be delivered a separate copy of the messsage. Workers operating on the same queue, or on queues with the same name will load balance
A queue is an inbox for messages to be delivered
queueName :: QueuePrefix -> Key a msg -> QueueName Source #
Name a queue with a prefix and the binding key name. Useful for seeing at a glance which queues are receiving which messages
-- "main messages.new" queueName "main" (key "messages" & word "new")
newtype QueuePrefix Source #
Instances
IsString QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue fromString :: String -> QueuePrefix # | |
Show QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue showsPrec :: Int -> QueuePrefix -> ShowS # show :: QueuePrefix -> String # showList :: [QueuePrefix] -> ShowS # | |
Default QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue def :: QueuePrefix # | |
Eq QueuePrefix Source # | |
Defined in Network.AMQP.Worker.Queue (==) :: QueuePrefix -> QueuePrefix -> Bool # (/=) :: QueuePrefix -> QueuePrefix -> Bool # |
Messages
a parsed message from the queue
Message | |
|
Worker
worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #
Create a worker which loops, checks for messages, and handles errors
startWorker conn queue = do Worker.worker conn def queue onError onMessage where onMessage :: Message User onMessage m = do putStrLn "handle user message" print (value m) onError :: WorkerException SomeException -> IO () onError e = do putStrLn "Do something with errors"
data WorkerException e Source #
Exceptions created while processing
Instances
Exception e => Exception (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker toException :: WorkerException e -> SomeException # fromException :: SomeException -> Maybe (WorkerException e) # displayException :: WorkerException e -> String # | |
Show e => Show (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker showsPrec :: Int -> WorkerException e -> ShowS # show :: WorkerException e -> String # showList :: [WorkerException e] -> ShowS # | |
Eq e => Eq (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker (==) :: WorkerException e -> WorkerException e -> Bool # (/=) :: WorkerException e -> WorkerException e -> Bool # |
data WorkerOptions Source #
Options for worker
WorkerOptions | |
|
Instances
Show WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker showsPrec :: Int -> WorkerOptions -> ShowS # show :: WorkerOptions -> String # showList :: [WorkerOptions] -> ShowS # | |
Default WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker def :: WorkerOptions # | |
Eq WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker (==) :: WorkerOptions -> WorkerOptions -> Bool # (/=) :: WorkerOptions -> WorkerOptions -> Bool # |
type Microseconds = Int Source #