module MessageDb.Subscription
( Subscription (..),
subscribe,
start,
)
where
import Control.Concurrent (threadDelay)
import Control.Exception.Safe (handleAny)
import Control.Monad (when)
import Data.Foldable (traverse_)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Maybe (fromMaybe)
import Data.Void (Void)
import MessageDb.Functions ()
import qualified MessageDb.Functions as Functions
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (CategoryName)
import MessageDb.Subscription.FailureStrategy (FailureStrategy)
import qualified MessageDb.Subscription.FailureStrategy as FailureStrategy
import MessageDb.Subscription.PositionStrategy (PositionStrategy)
import qualified MessageDb.Subscription.PositionStrategy as PositionStrategy
import MessageDb.Units (Microseconds (..), NumberOfMessages (..))
-- | Configuration for a polling subscription over a single category.
--
-- Build one with 'subscribe' and override fields with record-update syntax,
-- then run it with 'start'.
data Subscription = Subscription
  { categoryName :: CategoryName
  -- ^ Category handed to 'Functions.getCategoryMessages' on every poll.
  , messagesPerTick :: NumberOfMessages
  -- ^ Batch size requested per poll. 'start' also compares it against the
  -- number of messages processed to decide whether to sleep.
  , tickInterval :: Microseconds
  -- ^ How long 'start' sleeps after a poll that processed fewer than
  -- 'messagesPerTick' messages.
  , logMessages :: NonEmpty Message -> IO ()
  -- ^ Called with each non-empty batch before its messages are handled.
  , failureStrategy :: FailureStrategy
  -- ^ Passed to 'FailureStrategy.logFailure' when handling a message fails.
  , positionStrategy :: PositionStrategy
  -- ^ Used to restore the starting position and to save progress after
  -- every poll.
  , handlers :: Handlers.SubscriptionHandlers
  -- ^ Handlers applied to each message via 'Handlers.subscriptionHandle'.
  , consumerGroup :: Maybe Functions.ConsumerGroup
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  , condition :: Maybe Functions.Condition
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  , correlation :: Maybe Functions.Correlation
  -- ^ Forwarded unchanged to 'Functions.getCategoryMessages'.
  }
-- | Build a 'Subscription' for the given category with default settings.
--
-- Defaults: 100 messages per tick, a tick interval of 100_000 microseconds,
-- a 'logMessages' callback that does nothing,
-- 'FailureStrategy.ignoreFailures', 'PositionStrategy.dontSave',
-- 'Handlers.emptyHandlers', and no consumer group, condition or correlation.
-- Override any of them with record-update syntax on the result.
subscribe :: CategoryName -> Subscription
subscribe categoryName =
  Subscription
    { categoryName = categoryName
    , messagesPerTick = 100
    , tickInterval = 100_000
    , logMessages = \_ -> pure ()
    , failureStrategy = FailureStrategy.ignoreFailures
    , positionStrategy = PositionStrategy.dontSave
    , handlers = Handlers.emptyHandlers
    , consumerGroup = Nothing
    , condition = Nothing
    , correlation = Nothing
    }
-- | Run a subscription: poll the category in a loop, handle each message,
-- and save progress through the subscription's 'PositionStrategy'.
--
-- The starting point is the position returned by 'PositionStrategy.restore'
-- plus one. The loop recurses unconditionally, so this action does not
-- return normally (hence 'Void').
--
-- Failures while handling a single message (a 'Left' result from the
-- handlers, or an exception caught by 'handleAny') are reported through
-- 'FailureStrategy.logFailure' and do not stop the loop. Exceptions raised
-- while querying, by the 'logMessages' callback, or while saving the
-- position are not caught here.
start :: Functions.WithConnection -> Subscription -> IO Void
start withConnection Subscription{..} = do
  let -- Pause for one tick interval.
      sleep :: IO ()
      sleep =
        threadDelay (fromIntegral (microsecondsToNatural tickInterval))

      -- Fetch at most 'messagesPerTick' messages starting at the given
      -- global position, applying the subscription's optional filters.
      queryCategory :: Message.GlobalPosition -> IO [Message]
      queryCategory position =
        withConnection $ \connection ->
          Functions.getCategoryMessages
            connection
            categoryName
            (Just position)
            (Just $ Functions.FixedSize messagesPerTick)
            correlation
            consumerGroup
            condition

      -- Run the handlers for one message, routing both handler errors and
      -- thrown exceptions to the failure strategy.
      handle :: Message -> IO ()
      handle message =
        let logHandleFailure =
              FailureStrategy.logFailure failureStrategy message
                . FailureStrategy.HandleFailure
            logUnknownFailure =
              FailureStrategy.logFailure failureStrategy message
                . FailureStrategy.UnknownFailure
         in handleAny logUnknownFailure $ do
              result <- Handlers.subscriptionHandle handlers message
              either logHandleFailure pure result

      -- Log and handle a batch. Returns how many messages were processed
      -- and the global position of the last one, if any.
      processMessages :: [Message] -> IO (NumberOfMessages, Maybe Message.GlobalPosition)
      processMessages messages =
        case NonEmpty.nonEmpty messages of
          Nothing -> pure (0, Nothing)
          Just nonEmptyMessages -> do
            logMessages nonEmptyMessages
            traverse_ handle nonEmptyMessages
            pure
              ( NumberOfMessages . fromIntegral $ NonEmpty.length nonEmptyMessages
              , Just . Message.messageGlobalPosition $ NonEmpty.last nonEmptyMessages
              )

      -- One iteration of the loop: query from @initialPosition@, process
      -- the batch, offer the new position to the position strategy, then
      -- recurse from just past the last processed message.
      poll :: Message.GlobalPosition -> Message.GlobalPosition -> IO Void
      poll initialPosition lastPositionSaved = do
        messages <- queryCategory initialPosition
        (numberOfMessages, lastMessagePosition) <- processMessages messages

        -- NOTE(review): on an empty batch @currentPosition@ is the position
        -- that was queried, not one that was processed. Confirm that the
        -- position strategies treat it accordingly.
        let currentPosition = fromMaybe initialPosition lastMessagePosition
            nextPosition = maybe initialPosition (+ 1) lastMessagePosition

        positionSaved <-
          PositionStrategy.save
            positionStrategy
            lastPositionSaved
            currentPosition

        -- A short batch means we have caught up; wait before polling again.
        when (numberOfMessages < messagesPerTick) sleep

        poll nextPosition (fromMaybe lastPositionSaved positionSaved)

  lastPositionSaved <- PositionStrategy.restore positionStrategy
  poll (lastPositionSaved + 1) lastPositionSaved