amqp-worker: Type-safe AMQP workers

This is a package candidate release! Here you can preview how this package release will appear once published to the main package index (which can be accomplished via the 'maintain' link below). Please note that publishing a package to the main package index cannot be undone! Please consult the package uploading documentation for more information.

[maintain] [Publish]

Please see the README on GitHub at https://github.com/seanhess/amqp-worker#readme


[Skip to Readme]

Properties

Versions 0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 0.4.0, 1.0.0, 2.0.0
Change log CHANGELOG.md
Dependencies aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.12), data-default (>=0.7 && <0.8), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.3), resource-pool (>=0.3 && <0.5), text (>=1.2 && <1.3), transformers-base (>=0.4 && <0.5) [details]
License BSD-3-Clause
Copyright Orbital Labs
Author Sean Hess
Maintainer seanhess@gmail.com
Category Network
Home page https://github.com/seanhess/amqp-worker#readme
Bug tracker https://github.com/seanhess/amqp-worker/issues
Source repo head: git clone https://github.com/seanhess/amqp-worker
Uploaded by seanhess at 2023-08-24T19:07:21Z

Modules

[Index] [Quick Jump]

Downloads

Maintainer's Corner

Package maintainers

For package maintainers and hackage trustees


Readme for amqp-worker-0.4.0

[back to package description]

AMQP Worker

Type-safe AMQP workers. Compatible with RabbitMQ.

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where

import           Control.Concurrent      (forkIO)
import           Control.Monad.Catch     (SomeException)
import           Data.Aeson              (FromJSON, ToJSON)
import           Data.Function           ((&))
import           Data.Text               (Text, pack)
import           GHC.Generics            (Generic)
import           Network.AMQP.Worker     (Connection, Message (..),
                                          WorkerException, def, fromURI)
import qualified Network.AMQP.Worker     as Worker
import           Network.AMQP.Worker.Key
import           System.IO               (BufferMode (..), hSetBuffering,
                                          stderr, stdout)

-- | Payload exchanged by this example: a single piece of text.
data TestMessage = TestMessage
  { greeting :: Text
    -- ^ The text carried by the message.
  }
  deriving (Eq, Show, Generic)

-- Both instances have empty bodies, so the class default methods are used
-- (the type derives 'Generic' for that purpose).
instance ToJSON TestMessage
instance FromJSON TestMessage


-- | Routing key for 'TestMessage' values, built from the segment
-- @"messages"@ followed by the word @"new"@.
newMessages :: Key Routing TestMessage
newMessages = word "new" (key "messages")

-- | Routing key for 'Text' results, made of the single segment @"results"@.
-- 'onMessage' publishes each received greeting under this key.
results :: Key Routing Text
results = key "results"

-- | Binding key for 'TestMessage' values: the segment @"messages"@ followed
-- by a 'star' segment.
-- NOTE(review): presumably 'star' is the AMQP single-word wildcard, so this
-- would also match 'newMessages' -- confirm against the library docs.
anyMessages :: Key Binding TestMessage
anyMessages = star (key "messages")



-- | Walk through the library end to end: connect, bind the queues, publish
-- one message read from stdin, start two workers on background threads, and
-- then wait for one more line of input before returning.
example :: IO ()
example = do
  -- open the connection described by the URI
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")

  -- the two queues the workers below consume from
  let newQueue = Worker.direct newMessages
      anyQueue = Worker.topic anyMessages "handleAnyMessage"

  -- bind the direct queues first, then the topic queue
  Worker.bindQueue conn newQueue
  Worker.bindQueue conn (Worker.direct results)
  Worker.bindQueue conn anyQueue

  putStrLn "Enter a message"
  input <- getLine

  -- publish what the user typed, wrapped in a TestMessage
  putStrLn "Publishing a message"
  Worker.publish conn newMessages (TestMessage (pack input))

  -- each worker runs on its own thread; forkIO returns immediately, so
  -- control continues to the prompt below
  _ <- forkIO (Worker.worker conn def newQueue onError (onMessage conn))
  _ <- forkIO (Worker.worker conn def anyQueue onError (onMessage conn))

  -- getLine waits for a whole line, so Enter is what ends the wait
  putStrLn "Press any key to exit"
  () <$ getLine




-- | Handler for a received 'TestMessage': announce it, print the decoded
-- payload, then publish its 'greeting' text under the 'results' key on the
-- given connection.
onMessage :: Connection -> Message TestMessage -> IO ()
onMessage conn incoming = do
  putStrLn "Got Message"
  print payload
  Worker.publish conn results (greeting payload)
  where
    -- the payload stored in the message's 'value' field
    payload = value incoming


-- | Error callback passed to the workers: prints a placeholder line followed
-- by the exception itself. It takes no other action.
onError :: WorkerException SomeException -> IO ()
onError err =
  putStrLn "Do something with errors" >> print err



-- | Program entry point: switch stdout and stderr to line buffering (in that
-- order), then run 'example'.
main :: IO ()
main =
  mapM_ (`hSetBuffering` LineBuffering) [stdout, stderr] >> example