Versions |
0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 1.0.0, 2.0.0, 2.0.0 |
Change log |
CHANGELOG.md |
Dependencies |
aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.12), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.3), resource-pool (>=0.3 && <0.5), text (>=1.2 && <1.3) [details] |
License |
BSD-3-Clause |
Copyright |
Orbital Labs |
Author |
Sean Hess |
Maintainer |
seanhess@gmail.com |
Category |
Network |
Home page |
https://github.com/seanhess/amqp-worker#readme
|
Bug tracker |
https://github.com/seanhess/amqp-worker/issues
|
Source repo |
head: git clone https://github.com/seanhess/amqp-worker |
Uploaded |
by seanhess at 2023-09-05T17:09:36Z |
AMQP Worker
Type-safe and simplified message queues with AMQP. Compatible with RabbitMQ.
View on Hackage
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent (forkIO)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker
newtype Greeting = Greeting
{message :: Text}
deriving (Generic, Show, Eq)
instance FromJSON Greeting
instance ToJSON Greeting
newGreetings :: Key Route Greeting
newGreetings = key "greetings" & word "new"
anyGreetings :: Key Bind Greeting
anyGreetings = key "greetings" & any1
example :: IO ()
example = do
conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
simple conn
publishing :: Connection -> IO ()
publishing conn = do
Worker.publish conn newGreetings $ Greeting "Hello"
-- | Create a queue to process messages
simple :: Connection -> IO ()
simple conn = do
-- create a queue to receive them
q <- Worker.queue conn "main" newGreetings
-- publish a message (delivered to queue)
Worker.publish conn newGreetings $ Greeting "Hello"
-- wait until we receive the message
m <- Worker.takeMessage conn q
print (value m)
-- | Multiple queues with distinct names will each get copies of published messages
multiple :: Connection -> IO ()
multiple conn = do
-- create two separate queues
one <- Worker.queue conn "one" newGreetings
two <- Worker.queue conn "two" newGreetings
-- publish a message (delivered to both)
Worker.publish conn newGreetings $ Greeting "Hello"
-- Each of these queues will receive the same message
m1 <- Worker.takeMessage conn one
m2 <- Worker.takeMessage conn two
print $ value m1
print $ value m2
-- | Create workers to continually process messages
workers :: Connection -> IO ()
workers conn = do
-- create a single queue
q <- Worker.queue conn "main" newGreetings
-- publish some messages
Worker.publish conn newGreetings $ Greeting "Hello1"
Worker.publish conn newGreetings $ Greeting "Hello2"
Worker.publish conn newGreetings $ Greeting "Hello3"
-- Create a worker to process any messages on the queue
_ <- forkIO $ Worker.worker conn q $ \m -> do
putStrLn "one"
print (value m)
-- Listening to the same queue with N workers will load balance them
_ <- forkIO $ Worker.worker conn q $ \m -> do
putStrLn "two"
print (value m)
putStrLn "Press any key to exit"
_ <- getLine
return ()
-- | You can bind to messages dynamically with wildcards in Binding Keys
dynamic :: Connection -> IO ()
dynamic conn = do
-- anyGreetings matches `greetings.*`
q <- Worker.queue conn "main" anyGreetings
-- You can only publish to a specific Routing Key, like `greetings.new`
Worker.publish conn newGreetings $ Greeting "Hello"
-- We cannot publish to anyGreetings because it is a Binding Key (with wildcards in it)
-- Worker.publish conn anyGreetings $ Greeting "Compiler Error"
m <- Worker.takeMessage conn q
print $ value m