| Safe Haskell | Safe-Inferred |
|---|---|
| Language | Haskell2010 |
Network.AMQP.Worker
Description
High level functions for working with message queues. Built on top of Network.AMQP. See https://hackage.haskell.org/package/amqp, which only works with RabbitMQ: https://www.rabbitmq.com/
Example:
Connect to a server, initialize a queue, publish a message, and create a worker to process them.
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text, pack)
import GHC.Generics (Generic)
import Network.AMQP.Worker (Connection, Message (..),
WorkerException, def, fromURI)
import qualified Network.AMQP.Worker as Worker
import Network.AMQP.Worker.Key
import System.IO (BufferMode (..), hSetBuffering,
stderr, stdout)
data TestMessage = TestMessage
{ greeting :: Text }
deriving (Generic, Show, Eq)
instance FromJSON TestMessage
instance ToJSON TestMessage
newMessages :: Key Routing TestMessage
newMessages = key "messages" & word "new"
results :: Key Routing Text
results = key "results"
anyMessages :: Key Binding TestMessage
anyMessages = key "messages" & star
example :: IO ()
example = do
-- connect
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
let handleAnyMessages = Worker.topic anyMessages "handleAnyMessage"
-- initialize the queues
Worker.bindQueue conn (Worker.direct newMessages)
Worker.bindQueue conn (Worker.direct results)
-- topic queue!
Worker.bindQueue conn handleAnyMessages
putStrLn "Enter a message"
msg <- getLine
-- publish a message
putStrLn "Publishing a message"
Worker.publish conn newMessages (TestMessage $ pack msg)
-- create a worker, the program loops here
_ <- forkIO $ Worker.worker conn def (Worker.direct newMessages) onError (onMessage conn)
_ <- forkIO $ Worker.worker conn def (handleAnyMessages) onError (onMessage conn)
putStrLn "Press any key to exit"
_ <- getLine
return ()
onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn m = do
let testMessage = value m
putStrLn "Got Message"
print testMessage
Worker.publish conn results (greeting testMessage)
onError :: WorkerException SomeException -> IO ()
onError e = do
putStrLn "Do something with errors"
print e
Synopsis
- newtype Key a msg = Key [a]
- data Routing
- data Binding
- word :: KeySegment a => Text -> Key a msg -> Key a msg
- key :: Text -> Key Routing msg
- star :: KeySegment a => Key a msg -> Key Binding msg
- hash :: KeySegment a => Key a msg -> Key Binding msg
- data Connection
- connect :: MonadIO m => ConnectionOpts -> m Connection
- disconnect :: MonadIO m => Connection -> m ()
- exchange :: Connection -> ExchangeName
- fromURI :: String -> ConnectionOpts
- data Queue msg = Queue (Key Binding msg) QueueName
- direct :: Key Routing msg -> Queue msg
- topic :: KeySegment a => Key a msg -> QueueName -> Queue msg
- bindQueue :: MonadIO m => Connection -> Queue msg -> m ()
- publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
- publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
- consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
- consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
- data ConsumeResult a
- = Parsed (Message a)
- | Error ParseError
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
- data WorkerException e
- data WorkerOptions = WorkerOptions {}
- type Microseconds = Int
- def :: Default a => a
Routing Keys
Keys describe routing and binding info for a message
Constructors
| Key [a] |
Every message is sent with a specific routing key
newCommentKey :: Key Routing Comment newCommentKey = key "posts" & word "1" & word "comments" & word "new"
A dynamic binding address for topic queues
commentsKey :: Key Binding Comment commentsKey = key "posts" & star & word "comments" & hash
Connecting
data Connection Source #
Internal connection details
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
>>>conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
disconnect :: MonadIO m => Connection -> m () Source #
exchange :: Connection -> ExchangeName Source #
fromURI :: String -> ConnectionOpts #
Parses amqp standard URI of the form amqp://user:passwordhost:port/vhost and returns a ConnectionOpts for use with openConnection''
| Any of these fields may be empty and will be replaced with defaults from amqp:/guest:guest@localhost:5672@
Initializing queues
direct :: Key Routing msg -> Queue msg Source #
Declare a direct queue, which will receive messages published with the exact same routing key
newUsers :: Queue User newUsers = Worker.direct (key "users" & word "new")
topic :: KeySegment a => Key a msg -> QueueName -> Queue msg Source #
Declare a topic queue, which will receive messages that match using wildcards
anyUsers :: Queue User anyUsers = Worker.topic "anyUsers" (key "users" & star)
bindQueue :: MonadIO m => Connection -> Queue msg -> m () Source #
Queues must be bound before you publish messages to them, or the messages will not be saved.
let queue = Worker.direct (key "users" & word "new") :: Queue User conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.bindQueue conn queue
Sending Messages
publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #
send a message to a queue. Enforces that the message type and queue name are correct at the type level
publish conn (key "users" :: Key Routing User) (User "username")
publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #
publish a message to a routing key, without making sure a queue exists to handle it or if it is the right type of message body
publishToExchange conn key (User "username")
Reading Messages
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg)) Source #
Check for a message once and attempt to parse it
res <- consume conn queue case res of Just (Parsed m) -> print m Just (Error e) -> putStrLn "could not parse message" Nothing -> putStrLn "No messages on the queue"
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg) Source #
Block while checking for messages every N microseconds. Return once you find one.
res <- consumeNext conn queue case res of (Parsed m) -> print m (Error e) -> putStrLn "could not parse message"
data ConsumeResult a Source #
Constructors
| Parsed (Message a) | |
| Error ParseError |
data ParseError Source #
Constructors
| ParseError String ByteString |
a parsed message from the queue
Constructors
| Message | |
Fields
| |
Worker
worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #
Create a worker which loops, checks for messages, and handles errors
startWorker conn queue = do
Worker.worker conn def queue onError onMessage
where
onMessage :: Message User
onMessage m = do
putStrLn "handle user message"
print (value m)
onError :: WorkerException SomeException -> IO ()
onError e = do
putStrLn "Do something with errors"data WorkerException e Source #
Exceptions created while processing
Constructors
| MessageParseError ByteString String | |
| OtherException ByteString e |
Instances
| Exception e => Exception (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods toException :: WorkerException e -> SomeException # fromException :: SomeException -> Maybe (WorkerException e) # displayException :: WorkerException e -> String # | |
| Show e => Show (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods showsPrec :: Int -> WorkerException e -> ShowS # show :: WorkerException e -> String # showList :: [WorkerException e] -> ShowS # | |
| Eq e => Eq (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker Methods (==) :: WorkerException e -> WorkerException e -> Bool # (/=) :: WorkerException e -> WorkerException e -> Bool # | |
data WorkerOptions Source #
Options for worker
Constructors
| WorkerOptions | |
Fields
| |
Instances
| Show WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods showsPrec :: Int -> WorkerOptions -> ShowS # show :: WorkerOptions -> String # showList :: [WorkerOptions] -> ShowS # | |
| Default WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods def :: WorkerOptions # | |
| Eq WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker Methods (==) :: WorkerOptions -> WorkerOptions -> Bool # (/=) :: WorkerOptions -> WorkerOptions -> Bool # | |
type Microseconds = Int Source #