amqp-worker: Type-safe AMQP workers

This is a package candidate release! Here you can preview how this package release will appear once published to the main package index (which can be accomplished via the 'maintain' link below). Please note that once a package has been published to the main package index it cannot be undone! Please consult the package uploading documentation for more information.

[maintain] [Publish]

Please see the README on GitHub at https://github.com/seanhess/amqp-worker#readme


[Skip to Readme]

Properties

Versions 0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 1.0.0, 1.0.0, 2.0.0
Change log CHANGELOG.md
Dependencies aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.12), data-default (>=0.7 && <0.8), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.3), resource-pool (>=0.3 && <0.5), text (>=1.2 && <1.3) [details]
License BSD-3-Clause
Copyright Orbital Labs
Author Sean Hess
Maintainer seanhess@gmail.com
Category Network
Home page https://github.com/seanhess/amqp-worker#readme
Bug tracker https://github.com/seanhess/amqp-worker/issues
Source repo head: git clone https://github.com/seanhess/amqp-worker
Uploaded by seanhess at 2023-08-25T23:08:03Z

Modules

[Index] [Quick Jump]

Downloads

Maintainer's Corner

Package maintainers

For package maintainers and hackage trustees


Readme for amqp-worker-1.0.0

[back to package description]

AMQP Worker

Type-safe AMQP workers, compatible with RabbitMQ.

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)

-- | Example message payload: a single piece of greeting text.
--
-- The 'ToJSON' and 'FromJSON' instances below have empty bodies, so they use
-- the classes' default methods, which are driven by the derived 'Generic'
-- instance.
newtype Greeting = Greeting {message :: Text}
    deriving (Eq, Show, Generic)

instance ToJSON Greeting

instance FromJSON Greeting

-- | Routing key for 'Greeting' messages, built from the word @"greetings"@
-- followed by the word @"new"@. Being a 'Routing' key, it can be published to.
newGreetings :: Key Routing Greeting
newGreetings = word "new" (key "greetings")

-- | Binding key for 'Greeting' messages: the word @"greetings"@ followed by a
-- wildcard ('any1'). Used below to bind a queue; it is not a 'Routing' key.
anyGreetings :: Key Binding Greeting
anyGreetings = any1 (key "greetings")

-- | Connect to the broker at the hard-coded local URI and run the 'simple'
-- example on that connection.
example :: IO ()
example =
    Worker.connect (fromURI "amqp://guest:guest@localhost:5672") >>= simple

-- | Publish a single @Greeting "Hello"@ to the 'newGreetings' routing key.
publishing :: Connection -> IO ()
publishing conn =
    Worker.publish conn newGreetings (Greeting "Hello")

-- | Create a queue, publish one message to it, then process messages
simple :: Connection -> IO ()
simple conn = do
    -- Queue bound to the routing key, so it receives what we publish below
    greetingQueue <- Worker.queue conn def newGreetings

    -- This message is delivered to the queue created above
    Worker.publish conn newGreetings (Greeting "Hello")

    -- anyGreetings is a binding key (it contains a wildcard), so it cannot be published to:
    -- Worker.publish conn anyGreetings $ Greeting "Compiler Error"

    -- Loop, printing the value of every message received
    Worker.worker conn def greetingQueue onError $ \received ->
        print (value received)

-- | Multiple queues with distinct names will each get copies of published messages
--
-- Declares two queues (@"one"@ and @"two"@) on the same routing key, publishes
-- a single 'Greeting', and forks one worker per queue. The calling thread then
-- blocks until a line is read from stdin.
multiple :: Connection -> IO ()
multiple conn = do
    -- create two separate queues
    one <- Worker.queue conn "one" newGreetings
    two <- Worker.queue conn "two" newGreetings

    -- publish a message (delivered to both)
    Worker.publish conn newGreetings $ Greeting "Hello"

    -- Each of these workers will receive the same message
    _ <- forkIO $ Worker.worker conn def one onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def two onError $ \m -> putStrLn "two" >> print (value m)

    -- 'getLine' only returns once a whole line has been entered, so the prompt
    -- asks for Enter rather than "any key".
    putStrLn "Press Enter to exit"
    () <$ getLine

-- | Create multiple workers on the same queue to load balance between them
--
-- Declares a single queue, publishes two 'Greeting's, and forks two workers
-- that both consume from that queue. The calling thread then blocks until a
-- line is read from stdin.
balance :: Connection -> IO ()
balance conn = do
    -- create a single queue
    q <- Worker.queue conn def newGreetings

    -- publish two messages
    Worker.publish conn newGreetings $ Greeting "Hello1"
    Worker.publish conn newGreetings $ Greeting "Hello2"

    -- Each worker will receive one of the messages
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "one" >> print (value m)
    _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "two" >> print (value m)

    -- 'getLine' only returns once a whole line has been entered, so the prompt
    -- asks for Enter rather than "any key".
    putStrLn "Press Enter to exit"
    () <$ getLine

-- | You can bind to messages dynamically with wildcards in Binding Keys
dynamic :: Connection -> IO ()
dynamic conn = do
    -- anyGreetings matches `greetings.*`
    wildcardQueue <- Worker.queue conn def anyGreetings

    -- Only a Routing Key can be published to; using anyGreetings here is a compile error
    Worker.publish conn newGreetings (Greeting "Hello")

    -- The queue listens for anything under `greetings.`
    Worker.worker conn def wildcardQueue onError $ \received -> do
        putStrLn "Got: "
        print (value received)

-- | Error callback passed to every worker in this file: prints a placeholder
-- line and then the exception itself.
onError :: WorkerException SomeException -> IO ()
onError e =
    putStrLn "Do something with errors" >> print e

-- | Run one of the example actions against the hard-coded local broker.
--
-- Switches stdout and stderr (in that order) to line buffering, opens a
-- connection, and hands it to the given action.
test :: (Connection -> IO ()) -> IO ()
test action = do
    mapM_ (`hSetBuffering` LineBuffering) [stdout, stderr]
    Worker.connect (fromURI "amqp://guest:guest@localhost:5672") >>= action

-- | Program entry point: runs 'example'.
main :: IO ()
main = example