| Versions | 
          0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 1.0.0, 2.0.0, 2.0.1, 2.0.1 | 
        
        
                    | Change log | 
          CHANGELOG.md | 
          
        
        
          | Dependencies | 
          aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.13), data-default (>=0.7 && <0.9), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.4), resource-pool (>=0.4 && <0.5), text (>=2 && <3) [details] | 
        
        
          | License | 
          BSD-3-Clause | 
        
                
          | Copyright | 
          Orbital Labs | 
        
        
        
          | Author | 
          Sean Hess | 
        
        
          | Maintainer | 
          seanhess@gmail.com | 
        
                
          | Category | 
          Network | 
        
        
                
          | Home page | 
          
            
              https://github.com/seanhess/amqp-worker#readme
            
           | 
        
        
                
          | Bug tracker | 
          
            
              https://github.com/seanhess/amqp-worker/issues
            
           | 
        
        
                
          | Source repo | 
          head: git clone https://github.com/seanhess/amqp-worker | 
        
        
        
          | Uploaded | 
          by seanhess at 2025-05-21T17:26:41Z | 
        
    
    AMQP Worker
Type-safe AMQP workers. Compatible with RabbitMQ
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
newtype Greeting = Greeting
    {message :: Text}
    deriving (Generic, Show, Eq)
instance FromJSON Greeting
instance ToJSON Greeting
newGreetings :: Key Bind Greeting
newGreetings = key "greetings" & word "new"
anyGreetings :: Key Bind Greeting
anyGreetings = key "greetings" & any1
example :: IO ()
example = do
    conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
    simple conn
publishing :: Connection -> IO ()
publishing conn = do
    Worker.publish conn newGreetings $ Greeting "Hello"
-- | Create a queue to process messages
simple :: Connection -> IO ()
simple conn = do
    -- create a queue to receive them
    q <- Worker.queue conn def newGreetings
    -- publish a message (delivered to queue)
    Worker.publish conn newGreetings $ Greeting "Hello"
    -- We cannot publish to anyGreetings because it is a binding key (with wildcards in it)
    -- Worker.publish conn anyGreetings $ TestMessage "Compiler Error"
    -- Loop and print any values received
    Worker.worker conn def q onError (print . value)
-- | Multiple queues with distinct names will each get copies of published messages
multiple :: Connection -> IO ()
multiple conn = do
    -- create two separate queues
    one <- Worker.queue conn "one" newGreetings
    two <- Worker.queue conn "two" newGreetings
    -- publish a message (delivered to both)
    Worker.publish conn newGreetings $ Greeting "Hello"
    -- Each of these workers will receive the same message
    _ <- forkIO $ Worker.worker conn def one onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def two onError $ \m -> putStrLn "two" >> print (value m)
    putStrLn "Press any key to exit"
    _ <- getLine
    return ()
-- | Create multiple workers on the same queue to load balance between them
balance :: Connection -> IO ()
balance conn = do
    -- create a single queue
    q <- Worker.queue conn def newGreetings
    -- publish two messages
    Worker.publish conn newGreetings $ Greeting "Hello1"
    Worker.publish conn newGreetings $ Greeting "Hello2"
    -- Each worker will receive one of the messages
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "two" >> print (value m)
    putStrLn "Press any key to exit"
    _ <- getLine
    return ()
-- | You can bind to messages dynamically with wildcards in Binding Keys
dynamic :: Connection -> IO ()
dynamic conn = do
    -- \| anyGreetings matches `greetings.*`
    q <- Worker.queue conn def anyGreetings
    -- You can only publish to a Routing Key. Publishing to anyGreetings will give a compile error
    Worker.publish conn newGreetings $ Greeting "Hello"
    -- This queue listens for anything under `greetings.`
    Worker.worker conn def q onError $ \m -> putStrLn "Got: " >> print (value m)
onError :: WorkerException SomeException -> IO ()
onError e = do
    putStrLn "Do something with errors"
    print e
test :: (Connection -> IO ()) -> IO ()
test action = do
    hSetBuffering stdout LineBuffering
    hSetBuffering stderr LineBuffering
    conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
    action conn
main :: IO ()
main = example