{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Worker.Worker where

import Control.Concurrent (threadDelay)
import Control.Exception (SomeException (..))
import Control.Monad (forever)
import Control.Monad.Catch (Exception (..), MonadCatch, catch)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON)
import Data.ByteString.Lazy (ByteString)
import Data.Default (Default (..))

import Network.AMQP.Worker.Connection (Connection)
import Network.AMQP.Worker.Message (ConsumeResult (..), Message (..), Microseconds, ParseError (..), consumeNext)
import Network.AMQP.Worker.Queue (Queue (..))

-- | Create a worker which loops forever, consuming one message per iteration
-- and reporting failures to an error handler.
--
-- Each iteration:
--
-- 1. Calls 'consumeNext' with the configured 'pollDelay' to obtain the next
--    result from the queue.
-- 2. If the message failed to parse, passes a 'MessageParseError' carrying the
--    raw body and the failure reason to the error handler.
-- 3. Otherwise runs the action on the parsed message; any exception it throws
--    is wrapped in 'OtherException' together with the raw message body and
--    passed to the error handler.
-- 4. Sleeps for 'loopDelay' before starting the next iteration.
--
-- The loop never returns normally. An exception thrown by the error handler
-- itself is not caught here and therefore ends the loop.
--
-- > startWorker conn queue = do
-- >   Worker.worker conn def queue onError onMessage
-- >
-- >   where
-- >     onMessage :: Message User -> IO ()
-- >     onMessage m = do
-- >       putStrLn "handle user message"
-- >       print (value m)
-- >
-- >     onError :: WorkerException SomeException -> IO ()
-- >     onError e = do
-- >       putStrLn "Do something with errors"
worker
    :: (FromJSON a, MonadIO m, MonadCatch m)
    => Connection
    -- ^ Connection the messages are consumed from
    -> WorkerOptions
    -- ^ Polling and loop delays; 'def' gives the defaults
    -> Queue a
    -- ^ Queue to consume
    -> (WorkerException SomeException -> m ())
    -- ^ Called for parse failures and for exceptions thrown by the action
    -> (Message a -> m ())
    -- ^ Called for every successfully parsed message
    -> m ()
worker conn opts queue onError action =
    forever $ do
        eres <- consumeNext (pollDelay opts) conn queue
        case eres of
            Error (ParseError reason bd) ->
                onError (MessageParseError bd reason)
            Parsed msg ->
                -- The handler's argument type fixes the caught exception type
                -- to 'SomeException', so everything thrown by the action is
                -- routed to 'onError' along with the offending message body.
                catch
                    (action msg)
                    (onError . OtherException (body msg))
        -- Runs after every iteration, whether it succeeded or failed.
        liftIO $ threadDelay (loopDelay opts)

-- | Options controlling the timing of a worker loop.
--
-- Both delays are expressed in 'Microseconds'.
data WorkerOptions = WorkerOptions
    { pollDelay :: Microseconds
    -- ^ Delay between checks to consume. Defaults to 10ms
    , loopDelay :: Microseconds
    -- ^ Delay between calls to job. Defaults to 0
    }
    deriving (Show, Eq)

-- | Default options: poll every 10 milliseconds, no pause between iterations.
instance Default WorkerOptions where
    def =
        WorkerOptions
            { pollDelay = 10 * 1000 -- 10ms, expressed in microseconds
            , loopDelay = 0
            }

-- | Exceptions created while processing a message.
--
-- The type parameter @e@ is the type of the underlying exception carried by
-- 'OtherException'.
data WorkerException e
    = MessageParseError ByteString String
    -- ^ The message could not be parsed: raw message body and the reason
    -- reported by the parser
    | OtherException ByteString e
    -- ^ The handler threw an exception: raw message body and the exception
    deriving (Show, Eq)

-- | Lets a 'WorkerException' be thrown and caught as an exception itself,
-- using the class's default method implementations.
instance Exception e => Exception (WorkerException e)