{-# LANGUAGE OverloadedStrings #-}
module Kafka.Producer.ProducerProperties
( ProducerProperties(..)
, brokersList
, setCallback
, logLevel
, compression
, topicCompression
, sendTimeout
, extraProps
, suppressDisconnectLogs
, extraTopicProps
, debugOptions
, module Kafka.Producer.Callbacks
)
where
import Data.Text (Text)
import qualified Data.Text as Text
import Control.Monad (MonadPlus(mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Kafka.Internal.Setup (KafkaConf(..))
import Kafka.Types (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText)
import Kafka.Producer.Callbacks
-- | Bundle of settings describing how a Kafka producer should be configured.
--
-- Values are usually assembled from the helper functions in this module
-- (for example 'brokersList' or 'compression') and merged with '<>'.
data ProducerProperties = ProducerProperties
  { ppKafkaProps :: Map Text Text
    -- ^ Producer-level properties as key/value pairs; this is the field
    -- filled in by 'extraProps'.
  , ppTopicProps :: Map Text Text
    -- ^ Topic-level properties as key/value pairs; this is the field
    -- filled in by 'extraTopicProps'.
  , ppLogLevel :: Maybe KafkaLogLevel
    -- ^ Requested log level, or 'Nothing' when none was specified.
  , ppCallbacks :: [KafkaConf -> IO ()]
    -- ^ Actions that take a 'KafkaConf'. When and how they are run is
    -- decided by whoever consumes this record, not by this module.
  }
-- | Merges two property sets, preferring the right-hand operand on conflict.
--
-- * Property maps: 'M.union' is left-biased, so the right operand's map is
--   passed first and its values win for keys present on both sides.
-- * Log level: the right operand's level is used when it is set, otherwise
--   the left operand's.
-- * Callbacks: concatenated, left operand's callbacks first.
instance Sem.Semigroup ProducerProperties where
  ProducerProperties kafkaL topicL levelL callbacksL <> ProducerProperties kafkaR topicR levelR callbacksR =
    ProducerProperties
      { ppKafkaProps = kafkaR `M.union` kafkaL
      , ppTopicProps = topicR `M.union` topicL
      , ppLogLevel   = levelR `mplus` levelL
      , ppCallbacks  = callbacksL ++ callbacksR
      }
  {-# INLINE (<>) #-}
-- | The identity element carries no settings at all: both property maps are
-- empty, no log level is requested and no callbacks are registered.
-- 'mappend' simply delegates to the 'Sem.Semigroup' instance.
instance Monoid ProducerProperties where
  -- Fields in declaration order: kafka props, topic props, log level, callbacks.
  mempty = ProducerProperties M.empty M.empty Nothing []
  {-# INLINE mempty #-}
  mappend lhs rhs = lhs Sem.<> rhs
  {-# INLINE mappend #-}
-- | Builds properties whose @bootstrap.servers@ entry is the given broker
-- addresses joined with commas.
--
-- Note that an empty list still produces the entry, with an empty string
-- as its value.
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList addrs = extraProps (M.singleton "bootstrap.servers" joined)
  where
    -- Unwrap each address to its textual form before joining.
    unwrap (BrokerAddress addr) = addr
    joined = Text.intercalate "," (map unwrap addrs)
-- | Builds properties that contain exactly one callback and nothing else.
--
-- Combine several results with '<>' to register more than one callback.
setCallback :: (KafkaConf -> IO ()) -> ProducerProperties
setCallback callback = emptyProps { ppCallbacks = [callback] }
  where
    emptyProps = mempty
-- | Builds properties that request the given log level and set nothing else.
logLevel :: KafkaLogLevel -> ProducerProperties
logLevel level = emptyProps { ppLogLevel = Just level }
  where
    emptyProps = mempty
-- | Builds properties that set the producer-level @compression.codec@
-- entry to the textual form of the given codec.
compression :: KafkaCompressionCodec -> ProducerProperties
compression =
  extraProps . M.singleton "compression.codec" . kafkaCompressionCodecToText
-- | Builds properties that set the topic-level @compression.codec@ entry
-- to the textual form of the given codec.
--
-- Unlike 'compression', the entry is stored among the topic properties.
topicCompression :: KafkaCompressionCodec -> ProducerProperties
topicCompression codec = extraTopicProps (M.singleton "compression.codec" codecName)
  where
    codecName = kafkaCompressionCodecToText codec
-- | Builds properties that set the topic-level @message.timeout.ms@ entry
-- to the value wrapped in the given 'Timeout', rendered with 'show'.
--
-- NOTE(review): the property name suggests the value is in milliseconds;
-- confirm against the definition of 'Timeout'.
sendTimeout :: Timeout -> ProducerProperties
sendTimeout (Timeout duration) =
  extraTopicProps (M.singleton "message.timeout.ms" rendered)
  where
    rendered = Text.pack (show duration)
-- | Wraps an arbitrary key/value map as producer-level properties.
--
-- Every other field of the result is left at its 'mempty' value.
extraProps :: Map Text Text -> ProducerProperties
extraProps kafkaProps = emptyProps { ppKafkaProps = kafkaProps }
  where
    emptyProps = mempty
-- | Properties that set the producer-level @log.connection.close@ entry
-- to @false@.
--
-- Judging by the name, this is meant to silence log messages about closed
-- connections; the actual effect is determined by the underlying client.
suppressDisconnectLogs :: ProducerProperties
suppressDisconnectLogs =
  extraProps (M.singleton "log.connection.close" "false")
-- | Wraps an arbitrary key/value map as topic-level properties.
--
-- Every other field of the result is left at its 'mempty' value.
extraTopicProps :: Map Text Text -> ProducerProperties
extraTopicProps topicProps = emptyProps { ppTopicProps = topicProps }
  where
    emptyProps = mempty
-- | Builds properties that set the producer-level @debug@ entry to the
-- given debug contexts, converted to text and joined with commas.
--
-- An empty list yields properties with no @debug@ entry at all.
debugOptions :: [KafkaDebug] -> ProducerProperties
debugOptions contexts
  | null contexts = extraProps M.empty
  | otherwise     = extraProps (M.singleton "debug" joined)
  where
    joined = Text.intercalate "," (map kafkaDebugToText contexts)