{-# LANGUAGE ApplicativeDo #-}
module Freckle.App.Kafka.Consumer
( HasKafkaConsumer (..)
, withKafkaConsumer
, KafkaConsumerConfig (..)
, envKafkaConsumerConfig
, runConsumer
) where
import Freckle.App.Prelude
import Blammo.Logging
import Control.Lens (Lens', view)
import Control.Monad (forever)
import Data.Aeson
import Data.ByteString (ByteString)
import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Env
import Freckle.App.Async
import Freckle.App.Env
import Freckle.App.Exception (annotatedExceptionMessageFrom)
import Freckle.App.Kafka.Producer (envKafkaBrokerAddresses)
import Freckle.App.OpenTelemetry
import Kafka.Consumer hiding
( Timeout
, closeConsumer
, newConsumer
, runConsumer
, subscription
)
import qualified Kafka.Consumer as Kafka
import UnliftIO.Exception (bracket)
data KafkaConsumerConfig = KafkaConsumerConfig
{ KafkaConsumerConfig -> NonEmpty BrokerAddress
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
, KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigGroupId :: ConsumerGroupId
, KafkaConsumerConfig -> TopicName
kafkaConsumerConfigTopic :: TopicName
, KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigOffsetReset :: OffsetReset
, KafkaConsumerConfig -> Millis
kafkaConsumerConfigAutoCommitInterval :: Millis
, :: Map Text Text
}
deriving stock (Int -> KafkaConsumerConfig -> ShowS
[KafkaConsumerConfig] -> ShowS
KafkaConsumerConfig -> String
(Int -> KafkaConsumerConfig -> ShowS)
-> (KafkaConsumerConfig -> String)
-> ([KafkaConsumerConfig] -> ShowS)
-> Show KafkaConsumerConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> KafkaConsumerConfig -> ShowS
showsPrec :: Int -> KafkaConsumerConfig -> ShowS
$cshow :: KafkaConsumerConfig -> String
show :: KafkaConsumerConfig -> String
$cshowList :: [KafkaConsumerConfig] -> ShowS
showList :: [KafkaConsumerConfig] -> ShowS
Show)
envKafkaTopic
:: Env.Parser Env.Error TopicName
envKafkaTopic :: Parser Error TopicName
envKafkaTopic =
Reader Error TopicName
-> String -> Mod Var TopicName -> Parser Error TopicName
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
((String -> Either String TopicName) -> Reader Error TopicName
forall a. (String -> Either String a) -> Reader Error a
eitherReader String -> Either String TopicName
readKafkaTopic)
String
"KAFKA_TOPIC"
Mod Var TopicName
forall a. Monoid a => a
mempty
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic :: String -> Either String TopicName
readKafkaTopic String
t = case String -> Text
T.pack String
t of
Text
"" -> String -> Either String TopicName
forall a b. a -> Either a b
Left String
"Kafka topics cannot be empty"
Text
x -> TopicName -> Either String TopicName
forall a b. b -> Either a b
Right (TopicName -> Either String TopicName)
-> TopicName -> Either String TopicName
forall a b. (a -> b) -> a -> b
$ Text -> TopicName
TopicName Text
x
envKafkaOffsetReset
:: Env.Parser Env.Error OffsetReset
envKafkaOffsetReset :: Parser Error OffsetReset
envKafkaOffsetReset =
Reader Error OffsetReset
-> String -> Mod Var OffsetReset -> Parser Error OffsetReset
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
((String -> Either String OffsetReset) -> Reader Error OffsetReset
forall a. (String -> Either String a) -> Reader Error a
eitherReader String -> Either String OffsetReset
readKafkaOffsetReset)
String
"KAFKA_OFFSET_RESET"
(Mod Var OffsetReset -> Parser Error OffsetReset)
-> Mod Var OffsetReset -> Parser Error OffsetReset
forall a b. (a -> b) -> a -> b
$ OffsetReset -> Mod Var OffsetReset
forall a. a -> Mod Var a
Env.def OffsetReset
Earliest
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset :: String -> Either String OffsetReset
readKafkaOffsetReset String
t = case String -> Text
T.pack String
t of
Text
"earliest" -> OffsetReset -> Either String OffsetReset
forall a b. b -> Either a b
Right OffsetReset
Earliest
Text
"latest" -> OffsetReset -> Either String OffsetReset
forall a b. b -> Either a b
Right OffsetReset
Latest
Text
_ -> String -> Either String OffsetReset
forall a b. a -> Either a b
Left String
"Kafka offset reset must be one of earliest or latest"
envKafkaConsumerConfig
:: Env.Parser Env.Error KafkaConsumerConfig
envKafkaConsumerConfig :: Parser Error KafkaConsumerConfig
envKafkaConsumerConfig = do
NonEmpty BrokerAddress
brokerAddresses <- Parser Error (NonEmpty BrokerAddress)
envKafkaBrokerAddresses
ConsumerGroupId
consumerGroupId <- Reader Error ConsumerGroupId
-> String
-> Mod Var ConsumerGroupId
-> Parser Error ConsumerGroupId
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var Reader Error ConsumerGroupId
forall e s. (AsEmpty e, IsString s) => Reader e s
Env.nonempty String
"KAFKA_CONSUMER_GROUP_ID" Mod Var ConsumerGroupId
forall a. Monoid a => a
mempty
TopicName
kafkaTopic <- Parser Error TopicName
envKafkaTopic
OffsetReset
kafkaOffsetReset <- Parser Error OffsetReset
envKafkaOffsetReset
Millis
kafkaAutoOffsetInterval <-
Int -> Millis
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Millis) -> (Timeout -> Int) -> Timeout -> Millis
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Timeout -> Int
timeoutMs (Timeout -> Millis)
-> (Mod Var Timeout -> Parser Error Timeout)
-> Mod Var Timeout
-> Parser Error Millis
forall (c :: * -> *) (d :: * -> *) a b.
(Functor c, Functor d) =>
(a -> b) -> c (d a) -> c (d b)
<$$> Reader Error Timeout
-> String -> Mod Var Timeout -> Parser Error Timeout
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var Reader Error Timeout
timeout String
"KAFKA_AUTO_COMMIT_INTERVAL" (Mod Var Timeout -> Parser Error Millis)
-> Mod Var Timeout -> Parser Error Millis
forall a b. (a -> b) -> a -> b
$
Timeout -> Mod Var Timeout
forall a. a -> Mod Var a
Env.def (Timeout -> Mod Var Timeout) -> Timeout -> Mod Var Timeout
forall a b. (a -> b) -> a -> b
$
Int -> Timeout
TimeoutMilliseconds Int
5000
Map Text Text
kafkaExtraProps <-
Reader Error (Map Text Text)
-> String
-> Mod Var (Map Text Text)
-> Parser Error (Map Text Text)
forall e a.
AsUnset e =>
Reader e a -> String -> Mod Var a -> Parser e a
Env.var
(([(Text, Text)] -> Map Text Text)
-> Either Error [(Text, Text)] -> Either Error (Map Text Text)
forall a b. (a -> b) -> Either Error a -> Either Error b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList (Either Error [(Text, Text)] -> Either Error (Map Text Text))
-> (String -> Either Error [(Text, Text)])
-> Reader Error (Map Text Text)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Either Error [(Text, Text)]
keyValues)
String
"KAFKA_EXTRA_SUBSCRIPTION_PROPS"
(Map Text Text -> Mod Var (Map Text Text)
forall a. a -> Mod Var a
Env.def Map Text Text
forall a. Monoid a => a
mempty)
pure $
NonEmpty BrokerAddress
-> ConsumerGroupId
-> TopicName
-> OffsetReset
-> Millis
-> Map Text Text
-> KafkaConsumerConfig
KafkaConsumerConfig
NonEmpty BrokerAddress
brokerAddresses
ConsumerGroupId
consumerGroupId
TopicName
kafkaTopic
OffsetReset
kafkaOffsetReset
Millis
kafkaAutoOffsetInterval
Map Text Text
kafkaExtraProps
class HasKafkaConsumer env where
kafkaConsumerL :: Lens' env KafkaConsumer
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps :: KafkaConsumerConfig -> ConsumerProperties
consumerProps KafkaConsumerConfig {NonEmpty BrokerAddress
Map Text Text
TopicName
Millis
OffsetReset
ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: KafkaConsumerConfig -> NonEmpty BrokerAddress
kafkaConsumerConfigGroupId :: KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigTopic :: KafkaConsumerConfig -> TopicName
kafkaConsumerConfigOffsetReset :: KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigAutoCommitInterval :: KafkaConsumerConfig -> Millis
kafkaConsumerConfigExtraSubscriptionProps :: KafkaConsumerConfig -> Map Text Text
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
kafkaConsumerConfigGroupId :: ConsumerGroupId
kafkaConsumerConfigTopic :: TopicName
kafkaConsumerConfigOffsetReset :: OffsetReset
kafkaConsumerConfigAutoCommitInterval :: Millis
kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
..} =
[BrokerAddress] -> ConsumerProperties
brokersList [BrokerAddress]
brokers
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> ConsumerGroupId -> ConsumerProperties
groupId ConsumerGroupId
kafkaConsumerConfigGroupId
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> Millis -> ConsumerProperties
autoCommit Millis
kafkaConsumerConfigAutoCommitInterval
ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> KafkaLogLevel -> ConsumerProperties
logLevel KafkaLogLevel
KafkaLogInfo
where
brokers :: [BrokerAddress]
brokers = NonEmpty BrokerAddress -> [BrokerAddress]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty BrokerAddress
kafkaConsumerConfigBrokerAddresses
subscription :: KafkaConsumerConfig -> Subscription
subscription :: KafkaConsumerConfig -> Subscription
subscription KafkaConsumerConfig {NonEmpty BrokerAddress
Map Text Text
TopicName
Millis
OffsetReset
ConsumerGroupId
kafkaConsumerConfigBrokerAddresses :: KafkaConsumerConfig -> NonEmpty BrokerAddress
kafkaConsumerConfigGroupId :: KafkaConsumerConfig -> ConsumerGroupId
kafkaConsumerConfigTopic :: KafkaConsumerConfig -> TopicName
kafkaConsumerConfigOffsetReset :: KafkaConsumerConfig -> OffsetReset
kafkaConsumerConfigAutoCommitInterval :: KafkaConsumerConfig -> Millis
kafkaConsumerConfigExtraSubscriptionProps :: KafkaConsumerConfig -> Map Text Text
kafkaConsumerConfigBrokerAddresses :: NonEmpty BrokerAddress
kafkaConsumerConfigGroupId :: ConsumerGroupId
kafkaConsumerConfigTopic :: TopicName
kafkaConsumerConfigOffsetReset :: OffsetReset
kafkaConsumerConfigAutoCommitInterval :: Millis
kafkaConsumerConfigExtraSubscriptionProps :: Map Text Text
..} =
[TopicName] -> Subscription
topics [TopicName
kafkaConsumerConfigTopic]
Subscription -> Subscription -> Subscription
forall a. Semigroup a => a -> a -> a
<> OffsetReset -> Subscription
offsetReset OffsetReset
kafkaConsumerConfigOffsetReset
Subscription -> Subscription -> Subscription
forall a. Semigroup a => a -> a -> a
<> Map Text Text -> Subscription
extraSubscriptionProps Map Text Text
kafkaConsumerConfigExtraSubscriptionProps
withKafkaConsumer
:: (MonadUnliftIO m, HasCallStack)
=> KafkaConsumerConfig
-> (KafkaConsumer -> m a)
-> m a
withKafkaConsumer :: forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
KafkaConsumerConfig -> (KafkaConsumer -> m a) -> m a
withKafkaConsumer KafkaConsumerConfig
config = m KafkaConsumer
-> (KafkaConsumer -> m ()) -> (KafkaConsumer -> m a) -> m a
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket m KafkaConsumer
newConsumer KafkaConsumer -> m ()
forall {m :: * -> *}. MonadIO m => KafkaConsumer -> m ()
closeConsumer
where
(ConsumerProperties
props, Subscription
sub) = (KafkaConsumerConfig -> ConsumerProperties
consumerProps (KafkaConsumerConfig -> ConsumerProperties)
-> (KafkaConsumerConfig -> Subscription)
-> KafkaConsumerConfig
-> (ConsumerProperties, Subscription)
forall b c c'. (b -> c) -> (b -> c') -> b -> (c, c')
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& KafkaConsumerConfig -> Subscription
subscription) KafkaConsumerConfig
config
newConsumer :: m KafkaConsumer
newConsumer = (KafkaError -> m KafkaConsumer)
-> (KafkaConsumer -> m KafkaConsumer)
-> Either KafkaError KafkaConsumer
-> m KafkaConsumer
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either KafkaError -> m KafkaConsumer
forall e (m :: * -> *) a.
(Exception e, MonadIO m, HasCallStack) =>
e -> m a
throwM KafkaConsumer -> m KafkaConsumer
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either KafkaError KafkaConsumer -> m KafkaConsumer)
-> m (Either KafkaError KafkaConsumer) -> m KafkaConsumer
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
forall (m :: * -> *).
MonadIO m =>
ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
Kafka.newConsumer ConsumerProperties
props Subscription
sub
closeConsumer :: KafkaConsumer -> m ()
closeConsumer = m () -> (KafkaError -> m ()) -> Maybe KafkaError -> m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) KafkaError -> m ()
forall e (m :: * -> *) a.
(Exception e, MonadIO m, HasCallStack) =>
e -> m a
throwM (Maybe KafkaError -> m ())
-> (KafkaConsumer -> m (Maybe KafkaError)) -> KafkaConsumer -> m ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< KafkaConsumer -> m (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
Kafka.closeConsumer
timeoutMs :: Timeout -> Int
timeoutMs :: Timeout -> Int
timeoutMs = \case
TimeoutSeconds Int
s -> Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
TimeoutMilliseconds Int
ms -> Int
ms
data KafkaMessageDecodeError = KafkaMessageDecodeError
{ KafkaMessageDecodeError -> ByteString
input :: ByteString
, KafkaMessageDecodeError -> String
errors :: String
}
deriving stock (Int -> KafkaMessageDecodeError -> ShowS
[KafkaMessageDecodeError] -> ShowS
KafkaMessageDecodeError -> String
(Int -> KafkaMessageDecodeError -> ShowS)
-> (KafkaMessageDecodeError -> String)
-> ([KafkaMessageDecodeError] -> ShowS)
-> Show KafkaMessageDecodeError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> KafkaMessageDecodeError -> ShowS
showsPrec :: Int -> KafkaMessageDecodeError -> ShowS
$cshow :: KafkaMessageDecodeError -> String
show :: KafkaMessageDecodeError -> String
$cshowList :: [KafkaMessageDecodeError] -> ShowS
showList :: [KafkaMessageDecodeError] -> ShowS
Show)
instance Exception KafkaMessageDecodeError where
displayException :: KafkaMessageDecodeError -> String
displayException KafkaMessageDecodeError {String
ByteString
input :: KafkaMessageDecodeError -> ByteString
errors :: KafkaMessageDecodeError -> String
input :: ByteString
errors :: String
..} =
[String] -> String
forall a. Monoid a => [a] -> a
mconcat
[ String
"Unable to decode JSON"
, String
"\n input: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
unpack (ByteString -> Text
decodeUtf8 ByteString
input)
, String
"\n errors: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
errors
]
runConsumer
:: ( MonadMask m
, MonadUnliftIO m
, MonadReader env m
, MonadLogger m
, MonadTracer m
, HasKafkaConsumer env
, FromJSON a
, HasCallStack
)
=> Timeout
-> (a -> m ())
-> m ()
runConsumer :: forall (m :: * -> *) env a.
(MonadMask m, MonadUnliftIO m, MonadReader env m, MonadLogger m,
MonadTracer m, HasKafkaConsumer env, FromJSON a, HasCallStack) =>
Timeout -> (a -> m ()) -> m ()
runConsumer Timeout
pollTimeout a -> m ()
onMessage =
m () -> m ()
forall (m :: * -> *) a. (MonadIO m, MonadMask m) => m a -> m a
withTraceIdContext (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> m ()
forall (m :: * -> *) a.
(MonadMask m, MonadUnliftIO m, MonadLogger m) =>
m () -> m a
immortalCreateLogged (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
KafkaConsumer
consumer <- Getting KafkaConsumer env KafkaConsumer -> m KafkaConsumer
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting KafkaConsumer env KafkaConsumer
forall env. HasKafkaConsumer env => Lens' env KafkaConsumer
Lens' env KafkaConsumer
kafkaConsumerL
(m () -> [ExceptionHandler m ()] -> m ())
-> [ExceptionHandler m ()] -> m () -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip m () -> [ExceptionHandler m ()] -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
m a -> [ExceptionHandler m a] -> m a
catches [ExceptionHandler m ()]
forall {m :: * -> *}. MonadLogger m => [ExceptionHandler m ()]
handlers (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> SpanArguments -> m () -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan Text
"kafka.consumer" SpanArguments
consumerSpanArguments (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
mRecord <- Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> m (Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a.
(MonadIO m, MonadLogger m, HasCallStack) =>
Either KafkaError a -> m (Maybe a)
fromKafkaError (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> m (Maybe
(ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> m (Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< KafkaConsumer
-> Timeout
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer
-> Timeout
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
pollMessage KafkaConsumer
consumer Timeout
kTimeout
Maybe ByteString -> (ByteString -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> Maybe ByteString
forall k v. ConsumerRecord k v -> v
crValue (ConsumerRecord (Maybe ByteString) (Maybe ByteString)
-> Maybe ByteString)
-> Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
-> Maybe ByteString
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
mRecord) ((ByteString -> m ()) -> m ()) -> (ByteString -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \ByteString
bs -> do
a
a <-
Text -> SpanArguments -> m a -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan Text
"kafka.consumer.message.decode" SpanArguments
defaultSpanArguments (m a -> m a) -> m a -> m a
forall a b. (a -> b) -> a -> b
$
(String -> m a) -> (a -> m a) -> Either String a -> m a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (KafkaMessageDecodeError -> m a
forall e (m :: * -> *) a.
(Exception e, MonadIO m, HasCallStack) =>
e -> m a
throwM (KafkaMessageDecodeError -> m a)
-> (String -> KafkaMessageDecodeError) -> String -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> String -> KafkaMessageDecodeError
KafkaMessageDecodeError ByteString
bs) a -> m a
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String a -> m a) -> Either String a -> m a
forall a b. (a -> b) -> a -> b
$
ByteString -> Either String a
forall a. FromJSON a => ByteString -> Either String a
eitherDecodeStrict ByteString
bs
Text -> SpanArguments -> m () -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan Text
"kafka.consumer.message.handle" SpanArguments
defaultSpanArguments (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ a -> m ()
onMessage a
a
where
kTimeout :: Timeout
kTimeout = Int -> Timeout
Kafka.Timeout (Int -> Timeout) -> Int -> Timeout
forall a b. (a -> b) -> a -> b
$ Timeout -> Int
timeoutMs Timeout
pollTimeout
handlers :: [ExceptionHandler m ()]
handlers =
[ (AnnotatedException KafkaError -> m ()) -> ExceptionHandler m ()
forall (m :: * -> *) a e.
Exception e =>
(e -> m a) -> ExceptionHandler m a
ExceptionHandler ((AnnotatedException KafkaError -> m ()) -> ExceptionHandler m ())
-> (AnnotatedException KafkaError -> m ()) -> ExceptionHandler m ()
forall a b. (a -> b) -> a -> b
$
Text -> Message -> m ()
forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Text -> Message -> m ()
logErrorNS Text
"kafka"
(Message -> m ())
-> (AnnotatedException KafkaError -> Message)
-> AnnotatedException KafkaError
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall ex.
Exception ex =>
(ex -> Message) -> AnnotatedException ex -> Message
annotatedExceptionMessageFrom @KafkaError
(Message -> KafkaError -> Message
forall a b. a -> b -> a
const Message
"Error polling for message from Kafka")
, (AnnotatedException KafkaMessageDecodeError -> m ())
-> ExceptionHandler m ()
forall (m :: * -> *) a e.
Exception e =>
(e -> m a) -> ExceptionHandler m a
ExceptionHandler ((AnnotatedException KafkaMessageDecodeError -> m ())
-> ExceptionHandler m ())
-> (AnnotatedException KafkaMessageDecodeError -> m ())
-> ExceptionHandler m ()
forall a b. (a -> b) -> a -> b
$
Text -> Message -> m ()
forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Text -> Message -> m ()
logErrorNS Text
"kafka"
(Message -> m ())
-> (AnnotatedException KafkaMessageDecodeError -> Message)
-> AnnotatedException KafkaMessageDecodeError
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall ex.
Exception ex =>
(ex -> Message) -> AnnotatedException ex -> Message
annotatedExceptionMessageFrom @KafkaMessageDecodeError
(Message -> KafkaMessageDecodeError -> Message
forall a b. a -> b -> a
const Message
"Could not decode message value")
]
fromKafkaError
:: (MonadIO m, MonadLogger m, HasCallStack)
=> Either KafkaError a
-> m (Maybe a)
fromKafkaError :: forall (m :: * -> *) a.
(MonadIO m, MonadLogger m, HasCallStack) =>
Either KafkaError a -> m (Maybe a)
fromKafkaError =
(KafkaError -> m (Maybe a))
-> (a -> m (Maybe a)) -> Either KafkaError a -> m (Maybe a)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
( \case
KafkaResponseError RdKafkaRespErrT
RdKafkaRespErrTimedOut ->
Maybe a
forall a. Maybe a
Nothing Maybe a -> m () -> m (Maybe a)
forall a b. a -> m b -> m a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Message -> m ()
forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
Message -> m ()
logDebug Message
"Polling timeout"
KafkaError
err -> KafkaError -> m (Maybe a)
forall e (m :: * -> *) a.
(Exception e, MonadIO m, HasCallStack) =>
e -> m a
throwM KafkaError
err
)
((a -> m (Maybe a)) -> Either KafkaError a -> m (Maybe a))
-> (a -> m (Maybe a)) -> Either KafkaError a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Maybe a -> m (Maybe a)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe a -> m (Maybe a)) -> (a -> Maybe a) -> a -> m (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just