module MessageDb.Subscription
( Subscription (..)
, subscribe
, start
)
where
import Control.Concurrent (threadDelay)
import Control.Exception.Safe (handleAny)
import Control.Monad (when)
import Data.Foldable (traverse_)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Maybe (fromMaybe)
import qualified Data.Text as Text
import Data.Void (Void)
import MessageDb.Functions ()
import qualified MessageDb.Functions as Functions
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (Category)
import qualified MessageDb.Subscription.FailedMessage as FailedMessage
import MessageDb.Subscription.FailureStrategy (FailureStrategy)
import qualified MessageDb.Subscription.FailureStrategy as FailureStrategy
import MessageDb.Subscription.PositionStrategy (PositionStrategy)
import qualified MessageDb.Subscription.PositionStrategy as PositionStrategy
import MessageDb.Units (Microseconds (..), NumberOfMessages (..))
data Subscription = Subscription
{ Subscription -> Category
categoryName :: Category
, Subscription -> NumberOfMessages
batchSize :: NumberOfMessages
, Subscription -> Microseconds
tickInterval :: Microseconds
, Subscription -> NonEmpty Message -> IO ()
logMessages :: NonEmpty Message -> IO ()
, Subscription -> FailureStrategy
failureStrategy :: FailureStrategy
, Subscription -> PositionStrategy
positionStrategy :: PositionStrategy
, Subscription -> SubscriptionHandlers
handlers :: Handlers.SubscriptionHandlers
, Subscription -> Maybe ConsumerGroup
consumerGroup :: Maybe Functions.ConsumerGroup
, Subscription -> Maybe Condition
condition :: Maybe Functions.Condition
, Subscription -> Maybe Correlation
correlation :: Maybe Functions.Correlation
}
subscribe :: Category -> Subscription
subscribe :: Category -> Subscription
subscribe Category
categoryName =
Subscription :: Category
-> NumberOfMessages
-> Microseconds
-> (NonEmpty Message -> IO ())
-> FailureStrategy
-> PositionStrategy
-> SubscriptionHandlers
-> Maybe ConsumerGroup
-> Maybe Condition
-> Maybe Correlation
-> Subscription
Subscription
{ categoryName :: Category
categoryName = Category
categoryName
, batchSize :: NumberOfMessages
batchSize = NumberOfMessages
100
, tickInterval :: Microseconds
tickInterval = Microseconds
100_000
, logMessages :: NonEmpty Message -> IO ()
logMessages = \NonEmpty Message
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, failureStrategy :: FailureStrategy
failureStrategy = FailureStrategy
FailureStrategy.ignoreFailures
, positionStrategy :: PositionStrategy
positionStrategy = PositionStrategy
PositionStrategy.dontSave
, handlers :: SubscriptionHandlers
handlers = SubscriptionHandlers
forall output. Handlers output
Handlers.emptyHandlers
, consumerGroup :: Maybe ConsumerGroup
consumerGroup = Maybe ConsumerGroup
forall a. Maybe a
Nothing
, condition :: Maybe Condition
condition = Maybe Condition
forall a. Maybe a
Nothing
, correlation :: Maybe Correlation
correlation = Maybe Correlation
forall a. Maybe a
Nothing
}
start :: Functions.WithConnection -> Subscription -> IO Void
start :: WithConnection -> Subscription -> IO Void
start WithConnection
withConnection Subscription{Maybe Correlation
Maybe Condition
Maybe ConsumerGroup
SubscriptionHandlers
Category
Microseconds
NumberOfMessages
PositionStrategy
FailureStrategy
NonEmpty Message -> IO ()
correlation :: Maybe Correlation
condition :: Maybe Condition
consumerGroup :: Maybe ConsumerGroup
handlers :: SubscriptionHandlers
positionStrategy :: PositionStrategy
failureStrategy :: FailureStrategy
logMessages :: NonEmpty Message -> IO ()
tickInterval :: Microseconds
batchSize :: NumberOfMessages
categoryName :: Category
correlation :: Subscription -> Maybe Correlation
condition :: Subscription -> Maybe Condition
consumerGroup :: Subscription -> Maybe ConsumerGroup
handlers :: Subscription -> SubscriptionHandlers
positionStrategy :: Subscription -> PositionStrategy
failureStrategy :: Subscription -> FailureStrategy
logMessages :: Subscription -> NonEmpty Message -> IO ()
tickInterval :: Subscription -> Microseconds
batchSize :: Subscription -> NumberOfMessages
categoryName :: Subscription -> Category
..} = do
let sleep :: IO ()
sleep :: IO ()
sleep =
Int -> IO ()
threadDelay (Natural -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Microseconds -> Natural
microsecondsToNatural Microseconds
tickInterval))
queryCategory :: Message.GlobalPosition -> IO [Message]
queryCategory :: GlobalPosition -> IO [Message]
queryCategory GlobalPosition
position =
(Connection -> IO [Message]) -> IO [Message]
WithConnection
withConnection ((Connection -> IO [Message]) -> IO [Message])
-> (Connection -> IO [Message]) -> IO [Message]
forall a b. (a -> b) -> a -> b
$ \Connection
connection ->
Connection
-> Category
-> Maybe GlobalPosition
-> Maybe BatchSize
-> Maybe Correlation
-> Maybe ConsumerGroup
-> Maybe Condition
-> IO [Message]
Functions.getCategoryMessages
Connection
connection
Category
categoryName
(GlobalPosition -> Maybe GlobalPosition
forall a. a -> Maybe a
Just GlobalPosition
position)
(BatchSize -> Maybe BatchSize
forall a. a -> Maybe a
Just (BatchSize -> Maybe BatchSize) -> BatchSize -> Maybe BatchSize
forall a b. (a -> b) -> a -> b
$ NumberOfMessages -> BatchSize
Functions.FixedSize NumberOfMessages
batchSize)
Maybe Correlation
correlation
Maybe ConsumerGroup
consumerGroup
Maybe Condition
condition
handle :: Message -> IO ()
handle :: Message -> IO ()
handle Message
message =
let logHandleFailure :: HandleError -> IO ()
logHandleFailure HandleError
reason =
FailureStrategy -> FailedMessage -> IO ()
FailureStrategy.logFailure FailureStrategy
failureStrategy (FailedMessage -> IO ()) -> FailedMessage -> IO ()
forall a b. (a -> b) -> a -> b
$
Message -> FailureReason -> FailedMessage
FailedMessage.FailedMessage Message
message (HandleError -> FailureReason
FailedMessage.HandleFailure HandleError
reason)
logUnknownFailure :: a -> IO ()
logUnknownFailure a
exception =
FailureStrategy -> FailedMessage -> IO ()
FailureStrategy.logFailure FailureStrategy
failureStrategy (FailedMessage -> IO ()) -> FailedMessage -> IO ()
forall a b. (a -> b) -> a -> b
$
Message -> FailureReason -> FailedMessage
FailedMessage.FailedMessage Message
message (Text -> FailureReason
FailedMessage.UnknownFailure (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ a -> String
forall a. Show a => a -> String
show a
exception))
in (SomeException -> IO ()) -> IO () -> IO ()
forall (m :: * -> *) a.
MonadCatch m =>
(SomeException -> m a) -> m a -> m a
handleAny SomeException -> IO ()
forall a. Show a => a -> IO ()
logUnknownFailure (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Either HandleError ()
result <- SubscriptionHandlers -> Message -> IO (Either HandleError ())
Handlers.subscriptionHandle SubscriptionHandlers
handlers Message
message
(HandleError -> IO ())
-> (() -> IO ()) -> Either HandleError () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either HandleError -> IO ()
logHandleFailure () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either HandleError ()
result
processMessages :: [Message] -> IO (NumberOfMessages, Maybe Message.GlobalPosition)
processMessages :: [Message] -> IO (NumberOfMessages, Maybe GlobalPosition)
processMessages [Message]
messages =
case [Message] -> Maybe (NonEmpty Message)
forall a. [a] -> Maybe (NonEmpty a)
NonEmpty.nonEmpty [Message]
messages of
Maybe (NonEmpty Message)
Nothing -> (NumberOfMessages, Maybe GlobalPosition)
-> IO (NumberOfMessages, Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (NumberOfMessages
0, Maybe GlobalPosition
forall a. Maybe a
Nothing)
Just NonEmpty Message
nonEmptyMessages -> do
NonEmpty Message -> IO ()
logMessages NonEmpty Message
nonEmptyMessages
(Message -> IO ()) -> NonEmpty Message -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Message -> IO ()
handle NonEmpty Message
nonEmptyMessages
(NumberOfMessages, Maybe GlobalPosition)
-> IO (NumberOfMessages, Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure
( Natural -> NumberOfMessages
NumberOfMessages (Natural -> NumberOfMessages)
-> (Int -> Natural) -> Int -> NumberOfMessages
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> NumberOfMessages) -> Int -> NumberOfMessages
forall a b. (a -> b) -> a -> b
$ NonEmpty Message -> Int
forall a. NonEmpty a -> Int
NonEmpty.length NonEmpty Message
nonEmptyMessages
, GlobalPosition -> Maybe GlobalPosition
forall a. a -> Maybe a
Just (GlobalPosition -> Maybe GlobalPosition)
-> (Message -> GlobalPosition) -> Message -> Maybe GlobalPosition
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message -> GlobalPosition
Message.messageGlobalPosition (Message -> Maybe GlobalPosition)
-> Message -> Maybe GlobalPosition
forall a b. (a -> b) -> a -> b
$ NonEmpty Message -> Message
forall a. NonEmpty a -> a
NonEmpty.last NonEmpty Message
nonEmptyMessages
)
poll :: Message.GlobalPosition -> Message.GlobalPosition -> IO Void
poll :: GlobalPosition -> GlobalPosition -> IO Void
poll GlobalPosition
initialPosition GlobalPosition
lastPositionSaved = do
[Message]
messages <- GlobalPosition -> IO [Message]
queryCategory GlobalPosition
initialPosition
(NumberOfMessages
numberOfMessages, Maybe GlobalPosition
lastMessagePosition) <- [Message] -> IO (NumberOfMessages, Maybe GlobalPosition)
processMessages [Message]
messages
let currentPosition :: GlobalPosition
currentPosition = GlobalPosition -> Maybe GlobalPosition -> GlobalPosition
forall a. a -> Maybe a -> a
fromMaybe GlobalPosition
initialPosition Maybe GlobalPosition
lastMessagePosition
nextPosition :: GlobalPosition
nextPosition = GlobalPosition
-> (GlobalPosition -> GlobalPosition)
-> Maybe GlobalPosition
-> GlobalPosition
forall b a. b -> (a -> b) -> Maybe a -> b
maybe GlobalPosition
initialPosition (GlobalPosition -> GlobalPosition -> GlobalPosition
forall a. Num a => a -> a -> a
+ GlobalPosition
1) Maybe GlobalPosition
lastMessagePosition
Maybe GlobalPosition
positionSaved <-
PositionStrategy
-> GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
PositionStrategy.save
PositionStrategy
positionStrategy
GlobalPosition
lastPositionSaved
GlobalPosition
currentPosition
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumberOfMessages
numberOfMessages NumberOfMessages -> NumberOfMessages -> Bool
forall a. Ord a => a -> a -> Bool
< NumberOfMessages
batchSize) IO ()
sleep
GlobalPosition -> GlobalPosition -> IO Void
poll GlobalPosition
nextPosition (GlobalPosition -> Maybe GlobalPosition -> GlobalPosition
forall a. a -> Maybe a -> a
fromMaybe GlobalPosition
lastPositionSaved Maybe GlobalPosition
positionSaved)
GlobalPosition
lastPositionSaved <- PositionStrategy -> IO GlobalPosition
PositionStrategy.restore PositionStrategy
positionStrategy
GlobalPosition -> GlobalPosition -> IO Void
poll (GlobalPosition
lastPositionSaved GlobalPosition -> GlobalPosition -> GlobalPosition
forall a. Num a => a -> a -> a
+ GlobalPosition
1) GlobalPosition
lastPositionSaved