{-# LANGUAGE OverloadedStrings #-}
module Kafka.Consumer.ConsumerProperties
( ConsumerProperties(..)
, CallbackPollMode(..)
, brokersList
, autoCommit
, noAutoCommit
, noAutoOffsetStore
, groupId
, clientId
, setCallback
, logLevel
, compression
, suppressDisconnectLogs
, statisticsInterval
, extraProps
, extraProp
, debugOptions
, queuedMaxMessagesKBytes
, callbackPollMode
, module X
)
where
import Control.Monad (MonadPlus (mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId (..))
import Kafka.Internal.Setup (KafkaConf (..), Callback(..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)
import Kafka.Consumer.Callbacks as X
data CallbackPollMode =
CallbackPollModeSync
| CallbackPollModeAsync deriving (Int -> CallbackPollMode -> ShowS
[CallbackPollMode] -> ShowS
CallbackPollMode -> String
(Int -> CallbackPollMode -> ShowS)
-> (CallbackPollMode -> String)
-> ([CallbackPollMode] -> ShowS)
-> Show CallbackPollMode
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CallbackPollMode] -> ShowS
$cshowList :: [CallbackPollMode] -> ShowS
show :: CallbackPollMode -> String
$cshow :: CallbackPollMode -> String
showsPrec :: Int -> CallbackPollMode -> ShowS
$cshowsPrec :: Int -> CallbackPollMode -> ShowS
Show, CallbackPollMode -> CallbackPollMode -> Bool
(CallbackPollMode -> CallbackPollMode -> Bool)
-> (CallbackPollMode -> CallbackPollMode -> Bool)
-> Eq CallbackPollMode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CallbackPollMode -> CallbackPollMode -> Bool
$c/= :: CallbackPollMode -> CallbackPollMode -> Bool
== :: CallbackPollMode -> CallbackPollMode -> Bool
$c== :: CallbackPollMode -> CallbackPollMode -> Bool
Eq)
data ConsumerProperties = ConsumerProperties
{ ConsumerProperties -> Map Text Text
cpProps :: Map Text Text
, ConsumerProperties -> Maybe KafkaLogLevel
cpLogLevel :: Maybe KafkaLogLevel
, ConsumerProperties -> [Callback]
cpCallbacks :: [Callback]
, ConsumerProperties -> CallbackPollMode
cpCallbackPollMode :: CallbackPollMode
}
instance Sem.Semigroup ConsumerProperties where
(ConsumerProperties m1 :: Map Text Text
m1 ll1 :: Maybe KafkaLogLevel
ll1 cb1 :: [Callback]
cb1 _) <> :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties
<> (ConsumerProperties m2 :: Map Text Text
m2 ll2 :: Maybe KafkaLogLevel
ll2 cb2 :: [Callback]
cb2 cup2 :: CallbackPollMode
cup2) =
Map Text Text
-> Maybe KafkaLogLevel
-> [Callback]
-> CallbackPollMode
-> ConsumerProperties
ConsumerProperties (Map Text Text -> Map Text Text -> Map Text Text
forall k a. Ord k => Map k a -> Map k a -> Map k a
M.union Map Text Text
m2 Map Text Text
m1) (Maybe KafkaLogLevel
ll2 Maybe KafkaLogLevel -> Maybe KafkaLogLevel -> Maybe KafkaLogLevel
forall (m :: * -> *) a. MonadPlus m => m a -> m a -> m a
`mplus` Maybe KafkaLogLevel
ll1) ([Callback]
cb1 [Callback] -> [Callback] -> [Callback]
forall (m :: * -> *) a. MonadPlus m => m a -> m a -> m a
`mplus` [Callback]
cb2) CallbackPollMode
cup2
{-# INLINE (<>) #-}
instance Monoid ConsumerProperties where
mempty :: ConsumerProperties
mempty = ConsumerProperties :: Map Text Text
-> Maybe KafkaLogLevel
-> [Callback]
-> CallbackPollMode
-> ConsumerProperties
ConsumerProperties
{ cpProps :: Map Text Text
cpProps = Map Text Text
forall k a. Map k a
M.empty
, cpLogLevel :: Maybe KafkaLogLevel
cpLogLevel = Maybe KafkaLogLevel
forall a. Maybe a
Nothing
, cpCallbacks :: [Callback]
cpCallbacks = []
, cpCallbackPollMode :: CallbackPollMode
cpCallbackPollMode = CallbackPollMode
CallbackPollModeAsync
}
{-# INLINE mempty #-}
mappend :: ConsumerProperties -> ConsumerProperties -> ConsumerProperties
mappend = ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
(Sem.<>)
{-# INLINE mappend #-}
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs :: [BrokerAddress]
bs =
let bs' :: Text
bs' = Text -> [Text] -> Text
Text.intercalate "," (BrokerAddress -> Text
unBrokerAddress (BrokerAddress -> Text) -> [BrokerAddress] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BrokerAddress]
bs)
in Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("bootstrap.servers", Text
bs')]
autoCommit :: Millis -> ConsumerProperties
autoCommit :: Millis -> ConsumerProperties
autoCommit (Millis ms :: Int64
ms) = Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$
[(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList
[ ("enable.auto.commit", "true")
, ("auto.commit.interval.ms", String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int64 -> String
forall a. Show a => a -> String
show Int64
ms)
]
noAutoCommit :: ConsumerProperties
noAutoCommit :: ConsumerProperties
noAutoCommit =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("enable.auto.commit", "false")]
noAutoOffsetStore :: ConsumerProperties
noAutoOffsetStore :: ConsumerProperties
noAutoOffsetStore =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("enable.auto.offset.store", "false")]
groupId :: ConsumerGroupId -> ConsumerProperties
groupId :: ConsumerGroupId -> ConsumerProperties
groupId (ConsumerGroupId cid :: Text
cid) =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("group.id", Text
cid)]
clientId :: ClientId -> ConsumerProperties
clientId :: ClientId -> ConsumerProperties
clientId (ClientId cid :: Text
cid) =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("client.id", Text
cid)]
setCallback :: Callback -> ConsumerProperties
setCallback :: Callback -> ConsumerProperties
setCallback cb :: Callback
cb = ConsumerProperties
forall a. Monoid a => a
mempty { cpCallbacks :: [Callback]
cpCallbacks = [Callback
cb] }
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel :: KafkaLogLevel -> ConsumerProperties
logLevel ll :: KafkaLogLevel
ll = ConsumerProperties
forall a. Monoid a => a
mempty { cpLogLevel :: Maybe KafkaLogLevel
cpLogLevel = KafkaLogLevel -> Maybe KafkaLogLevel
forall a. a -> Maybe a
Just KafkaLogLevel
ll }
compression :: KafkaCompressionCodec -> ConsumerProperties
compression :: KafkaCompressionCodec -> ConsumerProperties
compression c :: KafkaCompressionCodec
c =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "compression.codec" (KafkaCompressionCodec -> Text
kafkaCompressionCodecToText KafkaCompressionCodec
c)
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs :: ConsumerProperties
suppressDisconnectLogs =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("log.connection.close", "false")]
statisticsInterval :: Millis -> ConsumerProperties
statisticsInterval :: Millis -> ConsumerProperties
statisticsInterval (Millis t :: Int64
t) =
Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "statistics.interval.ms" (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int64 -> String
forall a. Show a => a -> String
show Int64
t)
extraProps :: Map Text Text -> ConsumerProperties
m :: Map Text Text
m = ConsumerProperties
forall a. Monoid a => a
mempty { cpProps :: Map Text Text
cpProps = Map Text Text
m }
{-# INLINE extraProps #-}
extraProp :: Text -> Text -> ConsumerProperties
k :: Text
k v :: Text
v = ConsumerProperties
forall a. Monoid a => a
mempty { cpProps :: Map Text Text
cpProps = Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton Text
k Text
v }
{-# INLINE extraProp #-}
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions :: [KafkaDebug] -> ConsumerProperties
debugOptions [] = Map Text Text -> ConsumerProperties
extraProps Map Text Text
forall k a. Map k a
M.empty
debugOptions d :: [KafkaDebug]
d =
let points :: Text
points = Text -> [Text] -> Text
Text.intercalate "," (KafkaDebug -> Text
kafkaDebugToText (KafkaDebug -> Text) -> [KafkaDebug] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [KafkaDebug]
d)
in Map Text Text -> ConsumerProperties
extraProps (Map Text Text -> ConsumerProperties)
-> Map Text Text -> ConsumerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("debug", Text
points)]
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes :: Int -> ConsumerProperties
queuedMaxMessagesKBytes kBytes :: Int
kBytes =
Text -> Text -> ConsumerProperties
extraProp "queued.max.messages.kbytes" (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int -> String
forall a. Show a => a -> String
show Int
kBytes)
{-# INLINE queuedMaxMessagesKBytes #-}
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode :: CallbackPollMode -> ConsumerProperties
callbackPollMode mode :: CallbackPollMode
mode = ConsumerProperties
forall a. Monoid a => a
mempty { cpCallbackPollMode :: CallbackPollMode
cpCallbackPollMode = CallbackPollMode
mode }