Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
High level functions for working with message queues. Built on top of Network.AMQP. See https://hackage.haskell.org/package/amqp, which only works with RabbitMQ: https://www.rabbitmq.com/
Example:
Connect to a server, initialize the queues, publish a message, and create workers to process incoming messages.
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE OverloadedStrings #-} module Main where import Control.Concurrent (forkIO) import Control.Monad.Catch (SomeException) import Data.Aeson (FromJSON, ToJSON) import Data.Function ((&)) import Data.Text (Text, pack) import GHC.Generics (Generic) import Network.AMQP.Worker (Connection, Message (..), WorkerException, def, fromURI) import qualified Network.AMQP.Worker as Worker import Network.AMQP.Worker.Key import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) data TestMessage = TestMessage { greeting :: Text } deriving (Generic, Show, Eq) instance FromJSON TestMessage instance ToJSON TestMessage newMessages :: Key Routing TestMessage newMessages = key "messages" & word "new" results :: Key Routing Text results = key "results" anyMessages :: Key Binding TestMessage anyMessages = key "messages" & star example :: IO () example = do -- connect conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") let handleAnyMessages = Worker.topic anyMessages "handleAnyMessage" -- initialize the queues Worker.bindQueue conn (Worker.direct newMessages) Worker.bindQueue conn (Worker.direct results) -- topic queue! Worker.bindQueue conn handleAnyMessages putStrLn "Enter a message" msg <- getLine -- publish a message putStrLn "Publishing a message" Worker.publish conn newMessages (TestMessage $ pack msg) -- create a worker, the program loops here _ <- forkIO $ Worker.worker conn def (Worker.direct newMessages) onError (onMessage conn) _ <- forkIO $ Worker.worker conn def (handleAnyMessages) onError (onMessage conn) putStrLn "Press any key to exit" _ <- getLine return () onMessage :: Connection -> Message TestMessage -> IO () onMessage conn m = do let testMessage = value m putStrLn "Got Message" print testMessage Worker.publish conn results (greeting testMessage) onError :: WorkerException SomeException -> IO () onError e = do putStrLn "Do something with errors" print e
Synopsis
- newtype Key a msg = Key [a]
- data Routing
- data Binding
- word :: KeySegment a => Text -> Key a msg -> Key a msg
- key :: Text -> Key Routing msg
- star :: KeySegment a => Key a msg -> Key Binding msg
- hash :: KeySegment a => Key a msg -> Key Binding msg
- data Connection
- connect :: MonadIO m => ConnectionOpts -> m Connection
- disconnect :: MonadIO m => Connection -> m ()
- exchange :: Connection -> ExchangeName
- fromURI :: String -> ConnectionOpts
- data Queue msg = Queue (Key Binding msg) QueueName
- direct :: Key Routing msg -> Queue msg
- topic :: KeySegment a => Key a msg -> QueueName -> Queue msg
- bindQueue :: MonadIO m => Connection -> Queue msg -> m ()
- publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
- publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m ()
- consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg))
- consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg)
- data ConsumeResult a
- = Parsed (Message a)
- | Error ParseError
- data ParseError = ParseError String ByteString
- data Message a = Message {
- body :: ByteString
- value :: a
- worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
- data WorkerException e
- data WorkerOptions = WorkerOptions {}
- type Microseconds = Int
- def :: Default a => a
Routing Keys
Keys describe routing and binding info for a message
Key [a] |
Every message is sent with a specific routing key
newCommentKey :: Key Routing Comment newCommentKey = key "posts" & word "1" & word "comments" & word "new"
A dynamic binding address for topic queues
commentsKey :: Key Binding Comment commentsKey = key "posts" & star & word "comments" & hash
Connecting
data Connection Source #
Internal connection details
connect :: MonadIO m => ConnectionOpts -> m Connection Source #
Connect to the AMQP server.
>>>
conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
disconnect :: MonadIO m => Connection -> m () Source #
exchange :: Connection -> ExchangeName Source #
fromURI :: String -> ConnectionOpts #
Parses an AMQP standard URI of the form amqp://user:password@host:port/vhost and returns a ConnectionOpts for use with openConnection''.
Any of these fields may be empty and will be replaced with defaults from amqp://guest:guest@localhost:5672/
Initializing queues
direct :: Key Routing msg -> Queue msg Source #
Declare a direct queue, which will receive messages published with the exact same routing key
newUsers :: Queue User newUsers = Worker.direct (key "users" & word "new")
topic :: KeySegment a => Key a msg -> QueueName -> Queue msg Source #
Declare a topic queue, which will receive messages that match using wildcards
anyUsers :: Queue User anyUsers = Worker.topic (key "users" & star) "anyUsers"
bindQueue :: MonadIO m => Connection -> Queue msg -> m () Source #
Queues must be bound before you publish messages to them, or the messages will not be saved.
let queue = Worker.direct (key "users" & word "new") :: Queue User conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672") Worker.bindQueue conn queue
Sending Messages
publish :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #
Send a message to a queue. Enforces that the message type and queue name are correct at the type level.
publish conn (key "users" :: Key Routing User) (User "username")
publishToExchange :: (ToJSON a, MonadIO m) => Connection -> Key Routing a -> a -> m () Source #
Publish a message to a routing key, without checking that a queue exists to handle it or that the message body is of the right type.
publishToExchange conn key (User "username")
Reading Messages
consume :: (FromJSON msg, MonadIO m) => Connection -> Queue msg -> m (Maybe (ConsumeResult msg)) Source #
Check for a message once and attempt to parse it
res <- consume conn queue case res of Just (Parsed m) -> print m Just (Error e) -> putStrLn "could not parse message" Nothing -> putStrLn "No messages on the queue"
consumeNext :: (FromJSON msg, MonadIO m) => Microseconds -> Connection -> Queue msg -> m (ConsumeResult msg) Source #
Block while checking for messages every N microseconds. Return once you find one.
res <- consumeNext pollInterval conn queue case res of (Parsed m) -> print m (Error e) -> putStrLn "could not parse message"
data ConsumeResult a Source #
Parsed (Message a) | |
Error ParseError |
A parsed message from the queue.
Message | |
|
Worker
worker :: (FromJSON a, MonadIO m, MonadCatch m) => Connection -> WorkerOptions -> Queue a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m () Source #
Create a worker which loops, checks for messages, and handles errors
startWorker conn queue = do Worker.worker conn def queue onError onMessage where onMessage :: Message User -> IO () onMessage m = do putStrLn "handle user message" print (value m) onError :: WorkerException SomeException -> IO () onError e = do putStrLn "Do something with errors"
data WorkerException e Source #
Exceptions created while processing
Instances
Exception e => Exception (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker toException :: WorkerException e -> SomeException # fromException :: SomeException -> Maybe (WorkerException e) # displayException :: WorkerException e -> String # | |
Show e => Show (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker showsPrec :: Int -> WorkerException e -> ShowS # show :: WorkerException e -> String # showList :: [WorkerException e] -> ShowS # | |
Eq e => Eq (WorkerException e) Source # | |
Defined in Network.AMQP.Worker.Worker (==) :: WorkerException e -> WorkerException e -> Bool # (/=) :: WorkerException e -> WorkerException e -> Bool # |
data WorkerOptions Source #
Options for worker
WorkerOptions | |
|
Instances
Show WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker showsPrec :: Int -> WorkerOptions -> ShowS # show :: WorkerOptions -> String # showList :: [WorkerOptions] -> ShowS # | |
Default WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker def :: WorkerOptions # | |
Eq WorkerOptions Source # | |
Defined in Network.AMQP.Worker.Worker (==) :: WorkerOptions -> WorkerOptions -> Bool # (/=) :: WorkerOptions -> WorkerOptions -> Bool # |
type Microseconds = Int Source #