{-# LANGUAGE ApplicativeDo #-}
module Freckle.App.Kafka.Consumer
( HasKafkaConsumer (..)
, withKafkaConsumer
, KafkaConsumerConfig (..)
, envKafkaConsumerConfig
, runConsumer
) where
import Freckle.App.Prelude
import Blammo.Logging
import Control.Lens (Lens', view)
import Control.Monad (forever)
import Data.Aeson
import Data.ByteString (ByteString)
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Exception
( AnnotatedException (..)
, annotatedExceptionMessageFrom
)
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry
import Kafka.Consumer hiding
( Timeout
, closeConsumer
, newConsumer
, runConsumer
, subscription
)
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket)
-- | Everything needed to create a Kafka consumer and subscribe it to a topic
--
-- Values of this type are normally produced by 'envKafkaConsumerConfig' and
-- consumed by 'withKafkaConsumer'.
data KafkaConsumerConfig = KafkaConsumerConfig
  { kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
  -- ^ Brokers handed to the client as its broker list (at least one)
  , kafkaConsumerConfigGroupId :: ConsumerGroupId
  -- ^ Consumer group the consumer is created with
  , kafkaConsumerConfigTopic :: TopicName
  -- ^ The single topic the consumer subscribes to
  , kafkaConsumerConfigOffsetReset :: OffsetReset
  -- ^ Offset reset policy applied to the subscription
  , kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
  -- ^ Additional raw key/value properties merged into the subscription
  }
  deriving stock (Show)
-- | Parse the topic to consume from the @KAFKA_TOPIC@ environment variable
--
-- The variable has no default; its value is validated by 'readKafkaTopic'.
envKafkaTopic
  :: Env.Parser Env.Error TopicName
envKafkaTopic = Env.var topicReader "KAFKA_TOPIC" mempty
 where
  topicReader = eitherReader readKafkaTopic
-- | Turn a raw string into a 'TopicName', rejecting the empty string
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic raw
  | T.null topic = Left "Kafka topics cannot be empty"
  | otherwise = Right (TopicName topic)
 where
  topic = T.pack raw
-- | Parse the offset reset policy from @KAFKA_OFFSET_RESET@
--
-- Falls back to 'Earliest' when the variable is not set; a set value is
-- validated by 'readKafkaOffsetReset'.
envKafkaOffsetReset
  :: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset =
  Env.var offsetResetReader "KAFKA_OFFSET_RESET" (Env.def Earliest)
 where
  offsetResetReader = eitherReader readKafkaOffsetReset
-- | Recognise the two supported offset reset policies
--
-- Matching is exact: only the lowercase spellings @earliest@ and @latest@
-- are accepted.
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset = \case
  "earliest" -> Right Earliest
  "latest" -> Right Latest
  _ -> Left "Kafka offset reset must be one of earliest or latest"
-- | Assemble a 'KafkaConsumerConfig' from the environment
--
-- Sources, in order:
--
-- * broker addresses: 'envKafkaBrokerAddresses'
-- * group id: @KAFKA_CONSUMER_GROUP_ID@ (required, must be non-empty)
-- * topic: 'envKafkaTopic'
-- * offset reset: 'envKafkaOffsetReset'
-- * extra subscription properties: @KAFKA_EXTRA_SUBSCRIPTION_PROPS@, parsed
--   with 'keyValues' and defaulting to an empty map
envKafkaConsumerConfig
  :: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig =
  KafkaConsumerConfig
    <$> envKafkaBrokerAddresses
    <*> Env.var Env.nonempty "KAFKA_CONSUMER_GROUP_ID" mempty
    <*> envKafkaTopic
    <*> envKafkaOffsetReset
    <*> Env.var
      extraPropsReader
      "KAFKA_EXTRA_SUBSCRIPTION_PROPS"
      (Env.def mempty)
 where
  -- Later duplicates of a key win, as with any 'Map.fromList'.
  extraPropsReader raw = Map.fromList <$> keyValues raw
-- | Environments that carry a 'KafkaConsumer'
--
-- 'runConsumer' reads the consumer out of the reader environment through
-- this lens.
class HasKafkaConsumer env where
  -- | Lens onto the consumer stored in @env@
  kafkaConsumerL :: Lens' env KafkaConsumer
-- | Client properties derived from the configuration
--
-- Combines the broker list and group id with 'noAutoCommit' and
-- 'noAutoOffsetStore' ('runConsumer' stores and commits offsets itself) and
-- sets the client log level to 'KafkaLogInfo'.
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps config =
  brokersList (NE.toList $ kafkaConsumerConfigBrokerAddresses config)
    <> groupId (kafkaConsumerConfigGroupId config)
    <> noAutoCommit
    <> noAutoOffsetStore
    <> logLevel KafkaLogInfo
-- | Subscription derived from the configuration
--
-- Subscribes to the single configured topic with the configured offset reset
-- policy, plus any extra subscription properties.
subscription :: KafkaConsumerConfig -> Subscription
subscription config =
  topics [kafkaConsumerConfigTopic config]
    <> offsetReset (kafkaConsumerConfigOffsetReset config)
    <> extraSubscriptionProps (kafkaConsumerConfigExtraSubscriptionProps config)
-- | Run an action with a freshly created 'KafkaConsumer'
--
-- The consumer is created from 'consumerProps' and 'subscription' and is
-- closed via 'bracket' once the action finishes. A 'KafkaError' reported
-- while creating or closing the consumer is thrown.
withKafkaConsumer
  :: (MonadUnliftIO m, HasCallStack)
  => KafkaConsumerConfig
  -> (KafkaConsumer -> m a)
  -> m a
withKafkaConsumer config = bracket acquire release
 where
  acquire =
    Kafka.newConsumer (consumerProps config) (subscription config) >>= \case
      Left err -> throwM err
      Right consumer -> pure consumer

  release consumer =
    Kafka.closeConsumer consumer >>= \case
      Nothing -> pure ()
      Just err -> throwM err
-- | Express a 'Timeout' as a number of milliseconds
timeoutMs :: Timeout -> Int
timeoutMs (TimeoutSeconds seconds) = seconds * 1000
timeoutMs (TimeoutMilliseconds millis) = millis
-- | Raised by 'runConsumer' when a record's value is not valid JSON for the
-- expected type
data KafkaMessageDecodeError = KafkaMessageDecodeError
  { input :: ByteString
  -- ^ The raw record value that failed to decode
  , errors :: String
  -- ^ The message reported by the JSON decoder
  }
  deriving stock (Show)
-- | Renders the offending input (decoded as UTF-8 text) followed by the
-- decoder's error message.
instance Exception KafkaMessageDecodeError where
  displayException err =
    "Unable to decode JSON"
      <> "\n input: "
      <> unpack (decodeUtf8 (input err))
      <> "\n errors: "
      <> errors err
-- | Poll Kafka forever, decoding each record's value as JSON and handing it
-- to the given callback
--
-- Each iteration:
--
-- 1. polls for one record, waiting up to the given timeout (a poll timeout
--    is not an error; see 'fromKafkaError');
-- 2. if the record carries a value, decodes it with 'eitherDecodeStrict' and
--    runs the callback on the result;
-- 3. stores the record's offset and requests an asynchronous commit.
--
-- A 'KafkaError' or 'KafkaMessageDecodeError' raised during an iteration is
-- logged under the @kafka@ namespace and the loop moves on to the next
-- iteration; in that case the offset for that record is not stored.
--
-- Failures to store or to commit offsets are logged rather than thrown.
--
-- The loop runs under 'immortalCreate'; its finish handler commits offsets
-- (non-async) and logs any exception the loop finished with.
runConsumer
  :: ( MonadMask m
     , MonadUnliftIO m
     , MonadReader env m
     , MonadLogger m
     , MonadTracer m
     , HasKafkaConsumer env
     , FromJSON a
     , HasCallStack
     )
  => Timeout
  -- ^ How long a single poll may wait for a record
  -> (a -> m ())
  -- ^ Callback invoked with each successfully decoded message
  -> m ()
runConsumer pollTimeout onMessage =
  withTraceIdContext . immortalCreate onFinish . forever $ do
    consumer <- view kafkaConsumerL

    flip catches handlers . inSpan "kafka.consumer" consumerSpanArguments $ do
      mRecord <- pollMessage consumer kTimeout >>= fromKafkaError

      for_ mRecord $ \record -> do
        -- Records without a value skip decoding and handling, but their
        -- offset is still stored and committed below.
        for_ (crValue record) $ \bytes -> do
          message <-
            inSpan "kafka.consumer.message.decode" defaultSpanArguments $
              case eitherDecodeStrict bytes of
                Left err -> throwM $ KafkaMessageDecodeError bytes err
                Right decoded -> pure decoded
          inSpan "kafka.consumer.message.handle" defaultSpanArguments $
            onMessage message

        inSpan "kafka.consumer.message.commit" defaultSpanArguments $ do
          logExMay "Unable to store offset" $
            storeOffsetMessage consumer record
          -- Report a failed commit request instead of discarding it. A
          -- "no offset" response just means there was nothing to commit, so
          -- it is silenced the same way the finish handler does.
          logExMay "Unable to commit offsets" $
            silenceNoOffsetError <$> commitAllOffsets OffsetCommitAsync consumer
 where
  -- The Kafka client takes its poll timeout in milliseconds.
  kTimeout = Kafka.Timeout (timeoutMs pollTimeout)

  -- Exceptions that are logged and then skipped, keeping the loop alive.
  handlers =
    [ ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaError
            (const "Error polling for message from Kafka")
    , ExceptionHandler $
        logErrorNS "kafka"
          . annotatedExceptionMessageFrom @KafkaMessageDecodeError
            (const "Could not decode message value")
    ]

  -- Final commit plus a log entry if the loop ended with an exception.
  onFinish finishResult = do
    consumer <- view kafkaConsumerL
    logExMay "Unable to commit offsets" $
      silenceNoOffsetError <$> commitAllOffsets OffsetCommit consumer
    case finishResult of
      Left ex -> logEx "Unexpected finish" ex
      Right () -> pure ()

  -- Run an action that may report an error, logging the error if present.
  logExMay msg action =
    action >>= \case
      Nothing -> pure ()
      Just ex -> logEx msg ex

  -- Log an exception (with no annotations) under the "kafka" namespace.
  logEx msg ex =
    logErrorNS "kafka" $
      annotatedExceptionMessageFrom (const msg) (AnnotatedException [] ex)

  -- Having no offset to commit is expected and not worth reporting.
  silenceNoOffsetError :: Maybe KafkaError -> Maybe KafkaError
  silenceNoOffsetError = \case
    Just (KafkaResponseError RdKafkaRespErrNoOffset) -> Nothing
    other -> other
-- | Interpret the result of a poll
--
-- * A timed-out poll is logged at debug level and yields 'Nothing'.
-- * Any other 'KafkaError' is thrown.
-- * A successful result is returned in 'Just'.
fromKafkaError
  :: (MonadIO m, MonadLogger m, HasCallStack)
  => Either KafkaError a
  -> m (Maybe a)
fromKafkaError = \case
  Left (KafkaResponseError RdKafkaRespErrTimedOut) -> do
    logDebug "Polling timeout"
    pure Nothing
  Left err -> throwM err
  Right value -> pure (Just value)