module Kafka.Internal.Setup
( KafkaProps(..)
, TopicProps(..)
, Kafka(..)
, KafkaConf(..)
, TopicConf(..)
, HasKafka(..)
, HasKafkaConf(..)
, HasTopicConf(..)
, Callback(..)
, CallbackPollStatus(..)
, getRdKafka
, getRdKafkaConf
, getRdMsgQueue
, getRdTopicConf
, newTopicConf
, newKafkaConf
, kafkaConf
, topicConf
, checkConfSetValue
, setKafkaConfValue
, setAllKafkaConfValues
, setTopicConfValue
, setAllTopicConfValues
)
where
import Kafka.Internal.RdKafka (CCharBufPointer, RdKafkaConfResT (..), RdKafkaConfTPtr, RdKafkaQueueTPtr, RdKafkaTPtr, RdKafkaTopicConfTPtr, nErrorBytes, newRdKafkaConfT, newRdKafkaTopicConfT, rdKafkaConfSet, rdKafkaTopicConfSet)
import Kafka.Types (KafkaError (..))
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Exception (throw)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Map (Map)
import Data.Text (Text)
import Foreign.C.String (peekCString)
import Foreign.Marshal.Alloc (allocaBytes)
import qualified Data.Map as Map
import qualified Data.Text as Text
newtype KafkaProps = KafkaProps (Map Text Text) deriving (Int -> KafkaProps -> ShowS
[KafkaProps] -> ShowS
KafkaProps -> String
(Int -> KafkaProps -> ShowS)
-> (KafkaProps -> String)
-> ([KafkaProps] -> ShowS)
-> Show KafkaProps
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KafkaProps] -> ShowS
$cshowList :: [KafkaProps] -> ShowS
show :: KafkaProps -> String
$cshow :: KafkaProps -> String
showsPrec :: Int -> KafkaProps -> ShowS
$cshowsPrec :: Int -> KafkaProps -> ShowS
Show, KafkaProps -> KafkaProps -> Bool
(KafkaProps -> KafkaProps -> Bool)
-> (KafkaProps -> KafkaProps -> Bool) -> Eq KafkaProps
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: KafkaProps -> KafkaProps -> Bool
$c/= :: KafkaProps -> KafkaProps -> Bool
== :: KafkaProps -> KafkaProps -> Bool
$c== :: KafkaProps -> KafkaProps -> Bool
Eq)
newtype TopicProps = TopicProps (Map Text Text) deriving (Int -> TopicProps -> ShowS
[TopicProps] -> ShowS
TopicProps -> String
(Int -> TopicProps -> ShowS)
-> (TopicProps -> String)
-> ([TopicProps] -> ShowS)
-> Show TopicProps
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TopicProps] -> ShowS
$cshowList :: [TopicProps] -> ShowS
show :: TopicProps -> String
$cshow :: TopicProps -> String
showsPrec :: Int -> TopicProps -> ShowS
$cshowsPrec :: Int -> TopicProps -> ShowS
Show, TopicProps -> TopicProps -> Bool
(TopicProps -> TopicProps -> Bool)
-> (TopicProps -> TopicProps -> Bool) -> Eq TopicProps
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: TopicProps -> TopicProps -> Bool
$c/= :: TopicProps -> TopicProps -> Bool
== :: TopicProps -> TopicProps -> Bool
$c== :: TopicProps -> TopicProps -> Bool
Eq)
newtype Kafka = Kafka RdKafkaTPtr deriving Int -> Kafka -> ShowS
[Kafka] -> ShowS
Kafka -> String
(Int -> Kafka -> ShowS)
-> (Kafka -> String) -> ([Kafka] -> ShowS) -> Show Kafka
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Kafka] -> ShowS
$cshowList :: [Kafka] -> ShowS
show :: Kafka -> String
$cshow :: Kafka -> String
showsPrec :: Int -> Kafka -> ShowS
$cshowsPrec :: Int -> Kafka -> ShowS
Show
newtype TopicConf = TopicConf RdKafkaTopicConfTPtr deriving Int -> TopicConf -> ShowS
[TopicConf] -> ShowS
TopicConf -> String
(Int -> TopicConf -> ShowS)
-> (TopicConf -> String)
-> ([TopicConf] -> ShowS)
-> Show TopicConf
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TopicConf] -> ShowS
$cshowList :: [TopicConf] -> ShowS
show :: TopicConf -> String
$cshow :: TopicConf -> String
showsPrec :: Int -> TopicConf -> ShowS
$cshowsPrec :: Int -> TopicConf -> ShowS
Show
newtype Callback = Callback (KafkaConf -> IO ())
data CallbackPollStatus = CallbackPollEnabled | CallbackPollDisabled deriving (Int -> CallbackPollStatus -> ShowS
[CallbackPollStatus] -> ShowS
CallbackPollStatus -> String
(Int -> CallbackPollStatus -> ShowS)
-> (CallbackPollStatus -> String)
-> ([CallbackPollStatus] -> ShowS)
-> Show CallbackPollStatus
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CallbackPollStatus] -> ShowS
$cshowList :: [CallbackPollStatus] -> ShowS
show :: CallbackPollStatus -> String
$cshow :: CallbackPollStatus -> String
showsPrec :: Int -> CallbackPollStatus -> ShowS
$cshowsPrec :: Int -> CallbackPollStatus -> ShowS
Show, CallbackPollStatus -> CallbackPollStatus -> Bool
(CallbackPollStatus -> CallbackPollStatus -> Bool)
-> (CallbackPollStatus -> CallbackPollStatus -> Bool)
-> Eq CallbackPollStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CallbackPollStatus -> CallbackPollStatus -> Bool
$c/= :: CallbackPollStatus -> CallbackPollStatus -> Bool
== :: CallbackPollStatus -> CallbackPollStatus -> Bool
$c== :: CallbackPollStatus -> CallbackPollStatus -> Bool
Eq)
data KafkaConf = KafkaConf
{ KafkaConf -> RdKafkaConfTPtr
kcfgKafkaConfPtr :: RdKafkaConfTPtr
, KafkaConf -> IORef (Maybe RdKafkaQueueTPtr)
kcfgMessagesQueue :: IORef (Maybe RdKafkaQueueTPtr)
, KafkaConf -> MVar CallbackPollStatus
kcfgCallbackPollStatus :: MVar CallbackPollStatus
}
class HasKafka a where
getKafka :: a -> Kafka
class HasKafkaConf a where
getKafkaConf :: a -> KafkaConf
class HasTopicConf a where
getTopicConf :: a -> TopicConf
instance HasKafkaConf KafkaConf where
getKafkaConf :: KafkaConf -> KafkaConf
getKafkaConf = KafkaConf -> KafkaConf
forall a. a -> a
id
{-# INLINE getKafkaConf #-}
instance HasKafka Kafka where
getKafka :: Kafka -> Kafka
getKafka = Kafka -> Kafka
forall a. a -> a
id
{-# INLINE getKafka #-}
instance HasTopicConf TopicConf where
getTopicConf :: TopicConf -> TopicConf
getTopicConf = TopicConf -> TopicConf
forall a. a -> a
id
{-# INLINE getTopicConf #-}
getRdKafka :: HasKafka k => k -> RdKafkaTPtr
getRdKafka :: k -> RdKafkaTPtr
getRdKafka k :: k
k = let (Kafka k' :: RdKafkaTPtr
k') = k -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka k
k in RdKafkaTPtr
k'
{-# INLINE getRdKafka #-}
getRdKafkaConf :: HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf :: k -> RdKafkaConfTPtr
getRdKafkaConf k :: k
k = let (KafkaConf k' :: RdKafkaConfTPtr
k' _ _) = k -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf k
k in RdKafkaConfTPtr
k'
{-# INLINE getRdKafkaConf #-}
getRdMsgQueue :: HasKafkaConf k => k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue :: k -> IO (Maybe RdKafkaQueueTPtr)
getRdMsgQueue k :: k
k =
let (KafkaConf _ rq :: IORef (Maybe RdKafkaQueueTPtr)
rq _) = k -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf k
k
in IORef (Maybe RdKafkaQueueTPtr) -> IO (Maybe RdKafkaQueueTPtr)
forall a. IORef a -> IO a
readIORef IORef (Maybe RdKafkaQueueTPtr)
rq
getRdTopicConf :: HasTopicConf t => t -> RdKafkaTopicConfTPtr
getRdTopicConf :: t -> RdKafkaTopicConfTPtr
getRdTopicConf t :: t
t = let (TopicConf t' :: RdKafkaTopicConfTPtr
t') = t -> TopicConf
forall a. HasTopicConf a => a -> TopicConf
getTopicConf t
t in RdKafkaTopicConfTPtr
t'
{-# INLINE getRdTopicConf #-}
newTopicConf :: IO TopicConf
newTopicConf :: IO TopicConf
newTopicConf = RdKafkaTopicConfTPtr -> TopicConf
TopicConf (RdKafkaTopicConfTPtr -> TopicConf)
-> IO RdKafkaTopicConfTPtr -> IO TopicConf
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO RdKafkaTopicConfTPtr
newRdKafkaTopicConfT
newKafkaConf :: IO KafkaConf
newKafkaConf :: IO KafkaConf
newKafkaConf = RdKafkaConfTPtr
-> IORef (Maybe RdKafkaQueueTPtr)
-> MVar CallbackPollStatus
-> KafkaConf
KafkaConf (RdKafkaConfTPtr
-> IORef (Maybe RdKafkaQueueTPtr)
-> MVar CallbackPollStatus
-> KafkaConf)
-> IO RdKafkaConfTPtr
-> IO
(IORef (Maybe RdKafkaQueueTPtr)
-> MVar CallbackPollStatus -> KafkaConf)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO RdKafkaConfTPtr
newRdKafkaConfT IO
(IORef (Maybe RdKafkaQueueTPtr)
-> MVar CallbackPollStatus -> KafkaConf)
-> IO (IORef (Maybe RdKafkaQueueTPtr))
-> IO (MVar CallbackPollStatus -> KafkaConf)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RdKafkaQueueTPtr -> IO (IORef (Maybe RdKafkaQueueTPtr))
forall a. a -> IO (IORef a)
newIORef Maybe RdKafkaQueueTPtr
forall a. Maybe a
Nothing IO (MVar CallbackPollStatus -> KafkaConf)
-> IO (MVar CallbackPollStatus) -> IO KafkaConf
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> CallbackPollStatus -> IO (MVar CallbackPollStatus)
forall a. a -> IO (MVar a)
newMVar CallbackPollStatus
CallbackPollEnabled
kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf :: KafkaProps -> IO KafkaConf
kafkaConf overrides :: KafkaProps
overrides = do
KafkaConf
conf <- IO KafkaConf
newKafkaConf
KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues KafkaConf
conf KafkaProps
overrides
KafkaConf -> IO KafkaConf
forall (m :: * -> *) a. Monad m => a -> m a
return KafkaConf
conf
topicConf :: TopicProps -> IO TopicConf
topicConf :: TopicProps -> IO TopicConf
topicConf overrides :: TopicProps
overrides = do
TopicConf
conf <- IO TopicConf
newTopicConf
TopicConf -> TopicProps -> IO ()
setAllTopicConfValues TopicConf
conf TopicProps
overrides
TopicConf -> IO TopicConf
forall (m :: * -> *) a. Monad m => a -> m a
return TopicConf
conf
checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue :: RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue err :: RdKafkaConfResT
err charPtr :: CCharBufPointer
charPtr = case RdKafkaConfResT
err of
RdKafkaConfOk -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
RdKafkaConfInvalid -> do
String
str <- CCharBufPointer -> IO String
peekCString CCharBufPointer
charPtr
KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaInvalidConfigurationValue (String -> Text
Text.pack String
str)
RdKafkaConfUnknown -> do
String
str <- CCharBufPointer -> IO String
peekCString CCharBufPointer
charPtr
KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaUnknownConfigurationKey (String -> Text
Text.pack String
str)
setKafkaConfValue :: KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue :: KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue (KafkaConf confPtr :: RdKafkaConfTPtr
confPtr _ _) key :: Text
key value :: Text
value =
Int -> (CCharBufPointer -> IO ()) -> IO ()
forall a b. Int -> (Ptr a -> IO b) -> IO b
allocaBytes Int
nErrorBytes ((CCharBufPointer -> IO ()) -> IO ())
-> (CCharBufPointer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \charPtr :: CCharBufPointer
charPtr -> do
RdKafkaConfResT
err <- RdKafkaConfTPtr
-> String
-> String
-> CCharBufPointer
-> CSize
-> IO RdKafkaConfResT
rdKafkaConfSet RdKafkaConfTPtr
confPtr (Text -> String
Text.unpack Text
key) (Text -> String
Text.unpack Text
value) CCharBufPointer
charPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nErrorBytes)
RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue RdKafkaConfResT
err CCharBufPointer
charPtr
setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues :: KafkaConf -> KafkaProps -> IO ()
setAllKafkaConfValues conf :: KafkaConf
conf (KafkaProps props :: Map Text Text
props) = (Text -> Text -> IO ()) -> Map Text Text -> IO ()
forall m k a. Monoid m => (k -> a -> m) -> Map k a -> m
Map.foldMapWithKey (KafkaConf -> Text -> Text -> IO ()
setKafkaConfValue KafkaConf
conf) Map Text Text
props
setTopicConfValue :: TopicConf -> Text -> Text -> IO ()
setTopicConfValue :: TopicConf -> Text -> Text -> IO ()
setTopicConfValue (TopicConf confPtr :: RdKafkaTopicConfTPtr
confPtr) key :: Text
key value :: Text
value =
Int -> (CCharBufPointer -> IO ()) -> IO ()
forall a b. Int -> (Ptr a -> IO b) -> IO b
allocaBytes Int
nErrorBytes ((CCharBufPointer -> IO ()) -> IO ())
-> (CCharBufPointer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \charPtr :: CCharBufPointer
charPtr -> do
RdKafkaConfResT
err <- RdKafkaTopicConfTPtr
-> String
-> String
-> CCharBufPointer
-> CSize
-> IO RdKafkaConfResT
rdKafkaTopicConfSet RdKafkaTopicConfTPtr
confPtr (Text -> String
Text.unpack Text
key) (Text -> String
Text.unpack Text
value) CCharBufPointer
charPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
nErrorBytes)
RdKafkaConfResT -> CCharBufPointer -> IO ()
checkConfSetValue RdKafkaConfResT
err CCharBufPointer
charPtr
setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues :: TopicConf -> TopicProps -> IO ()
setAllTopicConfValues conf :: TopicConf
conf (TopicProps props :: Map Text Text
props) = (Text -> Text -> IO ()) -> Map Text Text -> IO ()
forall m k a. Monoid m => (k -> a -> m) -> Map k a -> m
Map.foldMapWithKey (TopicConf -> Text -> Text -> IO ()
setTopicConfValue TopicConf
conf) Map Text Text
props