-- | Subscribe to a category and react to the messages.
module MessageDb.Subscription
  ( Subscription (..),
    subscribe,
    start,
  )
where

import Control.Concurrent (threadDelay)
import Control.Exception.Safe (handleAny)
import Control.Monad (when)
import Data.Foldable (traverse_)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Maybe (fromMaybe)
import Data.Void (Void)
import MessageDb.Functions ()
import qualified MessageDb.Functions as Functions
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (CategoryName)
import MessageDb.Subscription.FailureStrategy (FailureStrategy)
import qualified MessageDb.Subscription.FailureStrategy as FailureStrategy
import MessageDb.Subscription.PositionStrategy (PositionStrategy)
import qualified MessageDb.Subscription.PositionStrategy as PositionStrategy
import MessageDb.Units (Microseconds (..), NumberOfMessages (..))


-- | Defines how to subscribe to a category.
--
-- Build a value with 'subscribe' and hand it to 'start'.
data Subscription = Subscription
  { categoryName :: CategoryName
  -- ^ Category whose messages are queried by 'start'.
  , messagesPerTick :: NumberOfMessages
  -- ^ Batch size requested on every query. 'start' sleeps for
  -- 'tickInterval' whenever a query yields fewer messages than this.
  , tickInterval :: Microseconds
  -- ^ How long 'start' sleeps after a batch that was not full.
  , logMessages :: NonEmpty Message -> IO ()
  -- ^ Invoked with every non-empty batch before its messages are handled.
  , failureStrategy :: FailureStrategy
  -- ^ Receives handler failures and exceptions caught while handling a
  -- message.
  , positionStrategy :: PositionStrategy
  -- ^ Supplies the starting position and is offered the current position
  -- after every poll.
  , handlers :: Handlers.SubscriptionHandlers
  -- ^ Handlers every fetched message is passed to.
  , consumerGroup :: Maybe Functions.ConsumerGroup
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  , condition :: Maybe Functions.Condition
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  , correlation :: Maybe Functions.Correlation
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  }


-- | Construct a new subscription for the given category.
--
-- Defaults:
--
-- * 'messagesPerTick' is @100@ and 'tickInterval' is @100_000@ microseconds.
-- * 'logMessages' does nothing.
-- * 'failureStrategy' is 'FailureStrategy.ignoreFailures'.
-- * 'positionStrategy' is 'PositionStrategy.dontSave'.
-- * 'handlers' is 'Handlers.emptyHandlers'.
-- * 'consumerGroup', 'condition' and 'correlation' are 'Nothing'.
subscribe :: CategoryName -> Subscription
subscribe categoryName =
  Subscription
    { categoryName = categoryName
    , messagesPerTick = 100
    , tickInterval = 100_000
    , logMessages = \_ -> pure ()
    , failureStrategy = FailureStrategy.ignoreFailures
    , positionStrategy = PositionStrategy.dontSave
    , handlers = Handlers.emptyHandlers
    , consumerGroup = Nothing
    , condition = Nothing
    , correlation = Nothing
    }


-- | Start the subscription. Notice this will never return.
--
-- The last saved position is restored from the 'positionStrategy' and
-- polling begins at the position right after it. Every iteration queries
-- one batch of the category, logs and handles it, offers the current
-- position to the 'positionStrategy', and sleeps for 'tickInterval' when the
-- batch held fewer than 'messagesPerTick' messages.
--
-- Only the handler invocation is wrapped in 'handleAny'; the query, the
-- 'logMessages' callback and the position strategy calls are not guarded
-- here.
start :: Functions.WithConnection -> Subscription -> IO Void
start withConnection Subscription{..} = do
  let sleep :: IO ()
      sleep =
        threadDelay (fromIntegral (microsecondsToNatural tickInterval))

      -- Fetch at most 'messagesPerTick' messages starting at the position.
      queryCategory :: Message.GlobalPosition -> IO [Message]
      queryCategory position =
        withConnection $ \connection ->
          Functions.getCategoryMessages
            connection
            categoryName
            (Just position)
            (Just $ Functions.FixedSize messagesPerTick)
            correlation
            consumerGroup
            condition

      -- Run the handlers for one message. Both a 'Left' result and any
      -- exception caught by 'handleAny' go to the failure strategy, so a
      -- failing message does not stop the rest of the batch.
      handle :: Message -> IO ()
      handle message =
        let logHandleFailure =
              FailureStrategy.logFailure failureStrategy message . FailureStrategy.HandleFailure

            logUnknownFailure =
              FailureStrategy.logFailure failureStrategy message . FailureStrategy.UnknownFailure
         in handleAny logUnknownFailure $ do
              result <- Handlers.subscriptionHandle handlers message
              either logHandleFailure pure result

      -- Log and handle a batch, returning its size and the global position
      -- of its last message (or 'Nothing' for an empty batch).
      processMessages :: [Message] -> IO (NumberOfMessages, Maybe Message.GlobalPosition)
      processMessages messages =
        case NonEmpty.nonEmpty messages of
          Nothing -> pure (0, Nothing)
          Just nonEmptyMessages -> do
            logMessages nonEmptyMessages
            traverse_ handle nonEmptyMessages
            pure
              ( NumberOfMessages . fromIntegral $ NonEmpty.length nonEmptyMessages
              , Just . Message.messageGlobalPosition $ NonEmpty.last nonEmptyMessages
              )

      poll :: Message.GlobalPosition -> Message.GlobalPosition -> IO Void
      poll initialPosition lastPositionSaved = do
        messages <- queryCategory initialPosition

        (numberOfMessages, lastMessagePosition) <- processMessages messages

        -- NOTE(review): for an empty batch 'currentPosition' falls back to
        -- 'initialPosition', i.e. the next position to query rather than a
        -- position that has been handled. Confirm this is what
        -- 'PositionStrategy.save' expects.
        let currentPosition = fromMaybe initialPosition lastMessagePosition
            -- Resume right after the last message seen, or retry the same
            -- position when nothing came back.
            nextPosition = maybe initialPosition (+ 1) lastMessagePosition

        positionSaved <-
          PositionStrategy.save
            positionStrategy
            lastPositionSaved
            currentPosition

        -- A short batch means the category is drained for now; wait a tick.
        when (numberOfMessages < messagesPerTick) sleep

        poll nextPosition (fromMaybe lastPositionSaved positionSaved)

  lastPositionSaved <- PositionStrategy.restore positionStrategy

  poll (lastPositionSaved + 1) lastPositionSaved