{-# LANGUAGE OverloadedStrings #-}
module Kafka.Producer.ProducerProperties
( ProducerProperties(..)
, brokersList
, setCallback
, logLevel
, compression
, topicCompression
, sendTimeout
, statisticsInterval
, extraProps
, suppressDisconnectLogs
, extraTopicProps
, debugOptions
, module Kafka.Producer.Callbacks
)
where
import Data.Text (Text)
import qualified Data.Text as Text
import Control.Monad (MonadPlus(mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Kafka.Internal.Setup (KafkaConf(..), Callback(..))
import Kafka.Types (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText, Millis(..))
import Kafka.Producer.Callbacks
data ProducerProperties = ProducerProperties
{ ProducerProperties -> Map Text Text
ppKafkaProps :: Map Text Text
, ProducerProperties -> Map Text Text
ppTopicProps :: Map Text Text
, ProducerProperties -> Maybe KafkaLogLevel
ppLogLevel :: Maybe KafkaLogLevel
, ProducerProperties -> [Callback]
ppCallbacks :: [Callback]
}
instance Sem.Semigroup ProducerProperties where
(ProducerProperties k1 :: Map Text Text
k1 t1 :: Map Text Text
t1 ll1 :: Maybe KafkaLogLevel
ll1 cb1 :: [Callback]
cb1) <> :: ProducerProperties -> ProducerProperties -> ProducerProperties
<> (ProducerProperties k2 :: Map Text Text
k2 t2 :: Map Text Text
t2 ll2 :: Maybe KafkaLogLevel
ll2 cb2 :: [Callback]
cb2) =
Map Text Text
-> Map Text Text
-> Maybe KafkaLogLevel
-> [Callback]
-> ProducerProperties
ProducerProperties (Map Text Text -> Map Text Text -> Map Text Text
forall k a. Ord k => Map k a -> Map k a -> Map k a
M.union Map Text Text
k2 Map Text Text
k1) (Map Text Text -> Map Text Text -> Map Text Text
forall k a. Ord k => Map k a -> Map k a -> Map k a
M.union Map Text Text
t2 Map Text Text
t1) (Maybe KafkaLogLevel
ll2 Maybe KafkaLogLevel -> Maybe KafkaLogLevel -> Maybe KafkaLogLevel
forall (m :: * -> *) a. MonadPlus m => m a -> m a -> m a
`mplus` Maybe KafkaLogLevel
ll1) ([Callback]
cb1 [Callback] -> [Callback] -> [Callback]
forall (m :: * -> *) a. MonadPlus m => m a -> m a -> m a
`mplus` [Callback]
cb2)
{-# INLINE (<>) #-}
instance Monoid ProducerProperties where
mempty :: ProducerProperties
mempty = ProducerProperties :: Map Text Text
-> Map Text Text
-> Maybe KafkaLogLevel
-> [Callback]
-> ProducerProperties
ProducerProperties
{ ppKafkaProps :: Map Text Text
ppKafkaProps = Map Text Text
forall k a. Map k a
M.empty
, ppTopicProps :: Map Text Text
ppTopicProps = Map Text Text
forall k a. Map k a
M.empty
, ppLogLevel :: Maybe KafkaLogLevel
ppLogLevel = Maybe KafkaLogLevel
forall a. Maybe a
Nothing
, ppCallbacks :: [Callback]
ppCallbacks = []
}
{-# INLINE mempty #-}
mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties
mappend = ProducerProperties -> ProducerProperties -> ProducerProperties
forall a. Semigroup a => a -> a -> a
(Sem.<>)
{-# INLINE mappend #-}
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList bs :: [BrokerAddress]
bs =
let bs' :: Text
bs' = Text -> [Text] -> Text
Text.intercalate "," (BrokerAddress -> Text
unBrokerAddress (BrokerAddress -> Text) -> [BrokerAddress] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [BrokerAddress]
bs)
in Map Text Text -> ProducerProperties
extraProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("bootstrap.servers", Text
bs')]
setCallback :: Callback -> ProducerProperties
setCallback :: Callback -> ProducerProperties
setCallback cb :: Callback
cb = ProducerProperties
forall a. Monoid a => a
mempty { ppCallbacks :: [Callback]
ppCallbacks = [Callback
cb] }
logLevel :: KafkaLogLevel -> ProducerProperties
logLevel :: KafkaLogLevel -> ProducerProperties
logLevel ll :: KafkaLogLevel
ll = ProducerProperties
forall a. Monoid a => a
mempty { ppLogLevel :: Maybe KafkaLogLevel
ppLogLevel = KafkaLogLevel -> Maybe KafkaLogLevel
forall a. a -> Maybe a
Just KafkaLogLevel
ll }
compression :: KafkaCompressionCodec -> ProducerProperties
compression :: KafkaCompressionCodec -> ProducerProperties
compression c :: KafkaCompressionCodec
c =
Map Text Text -> ProducerProperties
extraProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "compression.codec" (KafkaCompressionCodec -> Text
kafkaCompressionCodecToText KafkaCompressionCodec
c)
topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression c :: KafkaCompressionCodec
c =
Map Text Text -> ProducerProperties
extraTopicProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "compression.codec" (KafkaCompressionCodec -> Text
kafkaCompressionCodecToText KafkaCompressionCodec
c)
sendTimeout :: Timeout -> ProducerProperties
sendTimeout :: Timeout -> ProducerProperties
sendTimeout (Timeout t :: Int
t) =
Map Text Text -> ProducerProperties
extraTopicProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "message.timeout.ms" (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int -> String
forall a. Show a => a -> String
show Int
t)
statisticsInterval :: Millis -> ProducerProperties
statisticsInterval :: Millis -> ProducerProperties
statisticsInterval (Millis t :: Int64
t) =
Map Text Text -> ProducerProperties
extraProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Map Text Text
forall k a. k -> a -> Map k a
M.singleton "statistics.interval.ms" (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Int64 -> String
forall a. Show a => a -> String
show Int64
t)
extraProps :: Map Text Text -> ProducerProperties
m :: Map Text Text
m = ProducerProperties
forall a. Monoid a => a
mempty { ppKafkaProps :: Map Text Text
ppKafkaProps = Map Text Text
m }
suppressDisconnectLogs :: ProducerProperties
suppressDisconnectLogs :: ProducerProperties
suppressDisconnectLogs =
Map Text Text -> ProducerProperties
extraProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("log.connection.close", "false")]
extraTopicProps :: Map Text Text -> ProducerProperties
m :: Map Text Text
m = ProducerProperties
forall a. Monoid a => a
mempty { ppTopicProps :: Map Text Text
ppTopicProps = Map Text Text
m }
debugOptions :: [KafkaDebug] -> ProducerProperties
debugOptions :: [KafkaDebug] -> ProducerProperties
debugOptions [] = Map Text Text -> ProducerProperties
extraProps Map Text Text
forall k a. Map k a
M.empty
debugOptions d :: [KafkaDebug]
d =
let points :: Text
points = Text -> [Text] -> Text
Text.intercalate "," (KafkaDebug -> Text
kafkaDebugToText (KafkaDebug -> Text) -> [KafkaDebug] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [KafkaDebug]
d)
in Map Text Text -> ProducerProperties
extraProps (Map Text Text -> ProducerProperties)
-> Map Text Text -> ProducerProperties
forall a b. (a -> b) -> a -> b
$ [(Text, Text)] -> Map Text Text
forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [("debug", Text
points)]