{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Main where import Database.PostgreSQL.Consumers import Database.PostgreSQL.PQTypes import Database.PostgreSQL.PQTypes.Checks import Database.PostgreSQL.PQTypes.Model import Control.Concurrent import Control.Exception import Control.Monad import Control.Monad.IO.Class import Data.Int import Data.Monoid import Log import Log.Backend.StandardOutput import Prelude import System.Environment import TextShow import qualified Data.Text as T -- | Main application monad. See the 'log-base' and the 'hpqtypes' -- packages for documentation on 'DBT' and 'LogT'. type AppM a = DBT (LogT IO) a main :: IO () main = do args <- getArgs let connString = case args of [] -> defaultConnString (cs:_) -> cs let connSettings = def { csConnInfo = T.pack connString } ConnectionSource connSource = simpleSource connSettings -- Monad stack initialisation. withSimpleStdOutLogger $ \logger -> runLogT "consumers-example" logger $ runDBT connSource {- transactionSettings -} def $ do -- Initialise. createTables -- Create a consumer, put ten jobs into its queue, and wait -- for it to finish. 'runConsumer' returns a finaliser that is -- invoked by 'finalize' after the 'putJob' loop. finalize (localDomain "process" $ runConsumer consumerConfig connSource) $ forM_ [(0::Int)..10] $ \_ -> do putJob liftIO $ threadDelay (1 * 1000000) -- 1 sec -- Clean up. dropTables where -- How to connect to DB. defaultConnString = "postgresql://postgres@localhost/travis_ci_test" tables = [consumersTable, jobsTable] -- NB: order of migrations is important. migrations = [ createTableMigration consumersTable , createTableMigration jobsTable ] createTables :: AppM () createTables = do migrateDatabase {- options -} def {- extensions -} [] {- domains -} [] tables migrations checkDatabase {- options -} def {- domains -} [] tables dropTables :: AppM () dropTables = do migrateDatabase {- options -} def {- extensions -} [] {- domains -} [] {- tables -} [] [ dropTableMigration jobsTable , dropTableMigration consumersTable ] -- Configuration of a consumer. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. consumerConfig = ConsumerConfig { ccJobsTable = "consumers_example_jobs" , ccConsumersTable = "consumers_example_consumers" , ccJobSelectors = ["id", "message"] , ccJobFetcher = id , ccJobIndex = \(i::Int64, _msg::T.Text) -> i , ccNotificationChannel = Just "consumers_example_chan" , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob , ccOnException = handleException } -- Add a job to the consumer's queue. putJob :: AppM () putJob = localDomain "put" $ do logInfo_ "putJob" runSQL_ $ "INSERT INTO consumers_example_jobs " <> "(run_at, finished_at, reserved_by, attempts, message) " <> "VALUES (NOW(), NULL, NULL, 0, 'hello')" commit -- Invoked when a job is ready to be processed. processJob :: (Int64, T.Text) -> AppM Result processJob (_idx, msg) = do logInfo_ msg return (Ok Remove) -- Invoked when 'processJob' throws an exception. Can handle -- failure in different ways, such as: remove the job from the -- queue, mark it as processed, or schedule it for rerun. handleException :: SomeException -> (Int64, T.Text) -> AppM Action handleException exc (idx, _msg) = do logAttention_ $ "Job #" <> showt idx <> " failed with: " <> showt exc return . RerunAfter $ imicroseconds 500000 -- | Table where jobs are stored. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. jobsTable :: Table jobsTable = tblTable { tblName = "consumers_example_jobs" , tblVersion = 1 , tblColumns = [ tblColumn { colName = "id", colType = BigSerialT , colNullable = False } , tblColumn { colName = "run_at", colType = TimestampWithZoneT , colNullable = True } , tblColumn { colName = "finished_at", colType = TimestampWithZoneT , colNullable = True } , tblColumn { colName = "reserved_by", colType = BigIntT , colNullable = True } , tblColumn { colName = "attempts", colType = IntegerT , colNullable = False } -- The only non-obligatory field: , tblColumn { colName = "message", colType = TextT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" , tblForeignKeys = [ (fkOnColumn "reserved_by" "consumers_example_consumers" "id") { fkOnDelete = ForeignKeySetNull } ] } -- | Table where registered consumers are stored. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. consumersTable :: Table consumersTable = tblTable { tblName = "consumers_example_consumers" , tblVersion = 1 , tblColumns = [ tblColumn { colName = "id", colType = BigSerialT , colNullable = False } , tblColumn { colName = "name", colType = TextT , colNullable = False } , tblColumn { colName = "last_activity", colType = TimestampWithZoneT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" } createTableMigration :: (MonadDB m) => Table -> Migration m createTableMigration tbl = Migration { mgrTableName = tblName tbl , mgrFrom = 0 , mgrAction = StandardMigration $ do createTable True tbl } dropTableMigration :: Table -> Migration m dropTableMigration tbl = Migration { mgrTableName = tblName tbl , mgrFrom = 1 , mgrAction = DropTableMigration DropTableRestrict }