{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
module Kafka.Producer
( KafkaProducer
, module X
, runProducer
, newProducer
, produceMessage
, produceMessage'
, flushProducer
, closeProducer
, RdKafkaRespErrT (..)
)
where
import Control.Exception (bracket)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BSI
import qualified Data.Text as Text
import Foreign.C.String (withCString)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Marshal.Utils (withMany)
import Foreign.Ptr (Ptr, nullPtr, plusPtr)
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTypeT (..), RdKafkaVuT(..), newRdKafkaT, rdKafkaErrorCode, rdKafkaErrorDestroy, rdKafkaOutqLen, rdKafkaMessageProduceVa, rdKafkaSetLogLevel)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
import Kafka.Internal.Shared (pollEvents)
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErrT, producePartitionCInt)
import Kafka.Producer.Types (KafkaProducer (..))
import Kafka.Producer.ProducerProperties as X
import Kafka.Producer.Types as X hiding (KafkaProducer)
import Kafka.Types as X
{-# DEPRECATED runProducer "Use 'newProducer'/'closeProducer' instead" #-}
runProducer :: ProducerProperties
-> (KafkaProducer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runProducer :: ProducerProperties
-> (KafkaProducer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runProducer props :: ProducerProperties
props f :: KafkaProducer -> IO (Either KafkaError a)
f =
IO (Either KafkaError KafkaProducer)
-> (Either KafkaError KafkaProducer -> IO ())
-> (Either KafkaError KafkaProducer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Either KafkaError KafkaProducer)
mkProducer Either KafkaError KafkaProducer -> IO ()
forall (m :: * -> *) a. MonadIO m => Either a KafkaProducer -> m ()
clProducer Either KafkaError KafkaProducer -> IO (Either KafkaError a)
runHandler
where
mkProducer :: IO (Either KafkaError KafkaProducer)
mkProducer = ProducerProperties -> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *).
MonadIO m =>
ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer ProducerProperties
props
clProducer :: Either a KafkaProducer -> m ()
clProducer (Left _) = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
clProducer (Right prod :: KafkaProducer
prod) = KafkaProducer -> m ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
closeProducer KafkaProducer
prod
runHandler :: Either KafkaError KafkaProducer -> IO (Either KafkaError a)
runHandler (Left err :: KafkaError
err) = Either KafkaError a -> IO (Either KafkaError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError a -> IO (Either KafkaError a))
-> Either KafkaError a -> IO (Either KafkaError a)
forall a b. (a -> b) -> a -> b
$ KafkaError -> Either KafkaError a
forall a b. a -> Either a b
Left KafkaError
err
runHandler (Right prod :: KafkaProducer
prod) = KafkaProducer -> IO (Either KafkaError a)
f KafkaProducer
prod
newProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer :: ProducerProperties -> m (Either KafkaError KafkaProducer)
newProducer pps :: ProducerProperties
pps = IO (Either KafkaError KafkaProducer)
-> m (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError KafkaProducer)
-> m (Either KafkaError KafkaProducer))
-> IO (Either KafkaError KafkaProducer)
-> m (Either KafkaError KafkaProducer)
forall a b. (a -> b) -> a -> b
$ do
kc :: KafkaConf
kc@(KafkaConf kc' :: RdKafkaConfTPtr
kc' _ _) <- KafkaProps -> IO KafkaConf
kafkaConf (Map Text Text -> KafkaProps
KafkaProps (Map Text Text -> KafkaProps) -> Map Text Text -> KafkaProps
forall a b. (a -> b) -> a -> b
$ (ProducerProperties -> Map Text Text
ppKafkaProps ProducerProperties
pps))
TopicConf
tc <- TopicProps -> IO TopicConf
topicConf (Map Text Text -> TopicProps
TopicProps (Map Text Text -> TopicProps) -> Map Text Text -> TopicProps
forall a b. (a -> b) -> a -> b
$ (ProducerProperties -> Map Text Text
ppTopicProps ProducerProperties
pps))
let Callback setDeliveryCallback :: KafkaConf -> IO ()
setDeliveryCallback = (DeliveryReport -> IO ()) -> Callback
deliveryCallback (IO () -> DeliveryReport -> IO ()
forall a b. a -> b -> a
const IO ()
forall a. Monoid a => a
mempty)
KafkaConf -> IO ()
setDeliveryCallback KafkaConf
kc
[Callback] -> (Callback -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ProducerProperties -> [Callback]
ppCallbacks ProducerProperties
pps) (\(Callback setCb :: KafkaConf -> IO ()
setCb) -> KafkaConf -> IO ()
setCb KafkaConf
kc)
Either Text RdKafkaTPtr
mbKafka <- RdKafkaTypeT -> RdKafkaConfTPtr -> IO (Either Text RdKafkaTPtr)
newRdKafkaT RdKafkaTypeT
RdKafkaProducer RdKafkaConfTPtr
kc'
case Either Text RdKafkaTPtr
mbKafka of
Left err :: Text
err -> Either KafkaError KafkaProducer
-> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError KafkaProducer
-> IO (Either KafkaError KafkaProducer))
-> (KafkaError -> Either KafkaError KafkaProducer)
-> KafkaError
-> IO (Either KafkaError KafkaProducer)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> Either KafkaError KafkaProducer
forall a b. a -> Either a b
Left (KafkaError -> IO (Either KafkaError KafkaProducer))
-> KafkaError -> IO (Either KafkaError KafkaProducer)
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaError Text
err
Right kafka :: RdKafkaTPtr
kafka -> do
Maybe KafkaLogLevel -> (KafkaLogLevel -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ProducerProperties -> Maybe KafkaLogLevel
ppLogLevel ProducerProperties
pps) (RdKafkaTPtr -> Int -> IO ()
rdKafkaSetLogLevel RdKafkaTPtr
kafka (Int -> IO ()) -> (KafkaLogLevel -> Int) -> KafkaLogLevel -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaLogLevel -> Int
forall a. Enum a => a -> Int
fromEnum)
let prod :: KafkaProducer
prod = Kafka -> KafkaConf -> TopicConf -> KafkaProducer
KafkaProducer (RdKafkaTPtr -> Kafka
Kafka RdKafkaTPtr
kafka) KafkaConf
kc TopicConf
tc
Either KafkaError KafkaProducer
-> IO (Either KafkaError KafkaProducer)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaProducer -> Either KafkaError KafkaProducer
forall a b. b -> Either a b
Right KafkaProducer
prod)
produceMessage :: MonadIO m
=> KafkaProducer
-> ProducerRecord
-> m (Maybe KafkaError)
produceMessage :: KafkaProducer -> ProducerRecord -> m (Maybe KafkaError)
produceMessage kp :: KafkaProducer
kp m :: ProducerRecord
m = KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
forall (m :: * -> *).
MonadIO m =>
KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
produceMessage' KafkaProducer
kp ProducerRecord
m (() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> IO ()) -> (DeliveryReport -> ()) -> DeliveryReport -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DeliveryReport -> ()
forall a. Monoid a => a
mempty) m (Either ImmediateError ())
-> (Either ImmediateError () -> m (Maybe KafkaError))
-> m (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either ImmediateError () -> m (Maybe KafkaError)
adjustRes
where
adjustRes :: Either ImmediateError () -> m (Maybe KafkaError)
adjustRes = \case
Right () -> Maybe KafkaError -> m (Maybe KafkaError)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe KafkaError
forall a. Maybe a
Nothing
Left (ImmediateError err :: KafkaError
err) -> Maybe KafkaError -> m (Maybe KafkaError)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (KafkaError -> Maybe KafkaError
forall a. a -> Maybe a
Just KafkaError
err)
produceMessage' :: MonadIO m
=> KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
produceMessage' :: KafkaProducer
-> ProducerRecord
-> (DeliveryReport -> IO ())
-> m (Either ImmediateError ())
produceMessage' kp :: KafkaProducer
kp@(KafkaProducer (Kafka k :: RdKafkaTPtr
k) _ _) msg :: ProducerRecord
msg cb :: DeliveryReport -> IO ()
cb = IO (Either ImmediateError ()) -> m (Either ImmediateError ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ImmediateError ()) -> m (Either ImmediateError ()))
-> IO (Either ImmediateError ()) -> m (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$
IO ()
fireCallbacks IO ()
-> IO (Either ImmediateError ()) -> IO (Either ImmediateError ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO (Either ImmediateError ())
produceIt
where
fireCallbacks :: IO ()
fireCallbacks =
KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Maybe Timeout -> IO ()) -> (Int -> Maybe Timeout) -> Int -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout)
-> (Int -> Timeout) -> Int -> Maybe Timeout
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Timeout
Timeout (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ 0
produceIt :: IO (Either ImmediateError ())
produceIt =
Maybe ByteString
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prValue ProducerRecord
msg) ((Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ()))
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \payloadPtr :: Ptr Word8
payloadPtr payloadLength :: Int
payloadLength ->
Maybe ByteString
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ProducerRecord -> Maybe ByteString
prKey ProducerRecord
msg) ((Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ()))
-> (Ptr Word8 -> Int -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \keyPtr :: Ptr Word8
keyPtr keyLength :: Int
keyLength ->
Headers
-> ([RdKafkaVuT] -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a. Headers -> ([RdKafkaVuT] -> IO a) -> IO a
withHeaders (ProducerRecord -> Headers
prHeaders ProducerRecord
msg) (([RdKafkaVuT] -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ()))
-> ([RdKafkaVuT] -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \hdrs :: [RdKafkaVuT]
hdrs ->
String
-> (CString -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a. String -> (CString -> IO a) -> IO a
withCString (Text -> String
Text.unpack (Text -> String)
-> (ProducerRecord -> Text) -> ProducerRecord -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TopicName -> Text
unTopicName (TopicName -> Text)
-> (ProducerRecord -> TopicName) -> ProducerRecord -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProducerRecord -> TopicName
prTopic (ProducerRecord -> String) -> ProducerRecord -> String
forall a b. (a -> b) -> a -> b
$ ProducerRecord
msg) ((CString -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ()))
-> (CString -> IO (Either ImmediateError ()))
-> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ \topicName :: CString
topicName -> do
StablePtr (DeliveryReport -> IO ())
callbackPtr <- (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall a. a -> IO (StablePtr a)
newStablePtr DeliveryReport -> IO ()
cb
let opts :: [RdKafkaVuT]
opts = [
CString -> RdKafkaVuT
Topic'RdKafkaVu CString
topicName
, CInt32T -> RdKafkaVuT
Partition'RdKafkaVu (CInt32T -> RdKafkaVuT)
-> (ProducerRecord -> CInt32T) -> ProducerRecord -> RdKafkaVuT
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProducePartition -> CInt32T
producePartitionCInt (ProducePartition -> CInt32T)
-> (ProducerRecord -> ProducePartition)
-> ProducerRecord
-> CInt32T
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProducerRecord -> ProducePartition
prPartition (ProducerRecord -> RdKafkaVuT) -> ProducerRecord -> RdKafkaVuT
forall a b. (a -> b) -> a -> b
$ ProducerRecord
msg
, CInt32T -> RdKafkaVuT
MsgFlags'RdKafkaVu (Int -> CInt32T
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
copyMsgFlags)
, Ptr Word8 -> CSize -> RdKafkaVuT
Value'RdKafkaVu Ptr Word8
payloadPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
payloadLength)
, Ptr Word8 -> CSize -> RdKafkaVuT
Key'RdKafkaVu Ptr Word8
keyPtr (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
keyLength)
, Ptr () -> RdKafkaVuT
Opaque'RdKafkaVu (StablePtr (DeliveryReport -> IO ()) -> Ptr ()
forall a. StablePtr a -> Ptr ()
castStablePtrToPtr StablePtr (DeliveryReport -> IO ())
callbackPtr)
]
RdKafkaRespErrT
code <- IO RdKafkaErrorTPtr
-> (RdKafkaErrorTPtr -> IO ())
-> (RdKafkaErrorTPtr -> IO RdKafkaRespErrT)
-> IO RdKafkaRespErrT
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (RdKafkaTPtr -> [RdKafkaVuT] -> IO RdKafkaErrorTPtr
rdKafkaMessageProduceVa RdKafkaTPtr
k ([RdKafkaVuT]
hdrs [RdKafkaVuT] -> [RdKafkaVuT] -> [RdKafkaVuT]
forall a. [a] -> [a] -> [a]
++ [RdKafkaVuT]
opts)) RdKafkaErrorTPtr -> IO ()
rdKafkaErrorDestroy RdKafkaErrorTPtr -> IO RdKafkaRespErrT
rdKafkaErrorCode
Maybe KafkaError
res <- RdKafkaRespErrT -> IO (Maybe KafkaError)
handleProduceErrT RdKafkaRespErrT
code
Either ImmediateError () -> IO (Either ImmediateError ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ImmediateError () -> IO (Either ImmediateError ()))
-> Either ImmediateError () -> IO (Either ImmediateError ())
forall a b. (a -> b) -> a -> b
$ case Maybe KafkaError
res of
Just err :: KafkaError
err -> ImmediateError -> Either ImmediateError ()
forall a b. a -> Either a b
Left (ImmediateError -> Either ImmediateError ())
-> (KafkaError -> ImmediateError)
-> KafkaError
-> Either ImmediateError ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> ImmediateError
ImmediateError (KafkaError -> Either ImmediateError ())
-> KafkaError -> Either ImmediateError ()
forall a b. (a -> b) -> a -> b
$ KafkaError
err
Nothing -> () -> Either ImmediateError ()
forall a b. b -> Either a b
Right ()
closeProducer :: MonadIO m => KafkaProducer -> m ()
closeProducer :: KafkaProducer -> m ()
closeProducer = KafkaProducer -> m ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
flushProducer
flushProducer :: MonadIO m => KafkaProducer -> m ()
flushProducer :: KafkaProducer -> m ()
flushProducer kp :: KafkaProducer
kp = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 100)
Int
l <- Kafka -> IO Int
outboundQueueLength (KafkaProducer -> Kafka
kpKafkaPtr KafkaProducer
kp)
if (Int
l Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== 0)
then KafkaProducer -> Maybe Timeout -> IO ()
forall a. HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents KafkaProducer
kp (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 0)
else KafkaProducer -> IO ()
forall (m :: * -> *). MonadIO m => KafkaProducer -> m ()
flushProducer KafkaProducer
kp
withHeaders :: Headers -> ([RdKafkaVuT] -> IO a) -> IO a
hds :: Headers
hds = ((ByteString, ByteString) -> (RdKafkaVuT -> IO a) -> IO a)
-> [(ByteString, ByteString)] -> ([RdKafkaVuT] -> IO a) -> IO a
forall a b res.
(a -> (b -> res) -> res) -> [a] -> ([b] -> res) -> res
withMany (ByteString, ByteString) -> (RdKafkaVuT -> IO a) -> IO a
forall a. (ByteString, ByteString) -> (RdKafkaVuT -> IO a) -> IO a
allocHeader (Headers -> [(ByteString, ByteString)]
headersToList Headers
hds)
where
allocHeader :: (ByteString, ByteString) -> (RdKafkaVuT -> IO a) -> IO a
allocHeader (nm :: ByteString
nm, val :: ByteString
val) f :: RdKafkaVuT -> IO a
f =
ByteString -> (CString -> IO a) -> IO a
forall a. ByteString -> (CString -> IO a) -> IO a
BS.useAsCString ByteString
nm ((CString -> IO a) -> IO a) -> (CString -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \cnm :: CString
cnm ->
Maybe ByteString -> (Ptr Word8 -> Int -> IO a) -> IO a
forall a b. Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
val) ((Ptr Word8 -> Int -> IO a) -> IO a)
-> (Ptr Word8 -> Int -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \vp :: Ptr Word8
vp vl :: Int
vl ->
RdKafkaVuT -> IO a
f (RdKafkaVuT -> IO a) -> RdKafkaVuT -> IO a
forall a b. (a -> b) -> a -> b
$ CString -> Ptr Word8 -> CSize -> RdKafkaVuT
Header'RdKafkaVu CString
cnm Ptr Word8
vp (Int -> CSize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
vl)
withBS :: Maybe BS.ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS :: Maybe ByteString -> (Ptr a -> Int -> IO b) -> IO b
withBS Nothing f :: Ptr a -> Int -> IO b
f = Ptr a -> Int -> IO b
f Ptr a
forall a. Ptr a
nullPtr 0
withBS (Just bs :: ByteString
bs) f :: Ptr a -> Int -> IO b
f =
let (d :: ForeignPtr Word8
d, o :: Int
o, l :: Int
l) = ByteString -> (ForeignPtr Word8, Int, Int)
BSI.toForeignPtr ByteString
bs
in ForeignPtr Word8 -> (Ptr Word8 -> IO b) -> IO b
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr Word8
d ((Ptr Word8 -> IO b) -> IO b) -> (Ptr Word8 -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \p :: Ptr Word8
p -> Ptr a -> Int -> IO b
f (Ptr Word8
p Ptr Word8 -> Int -> Ptr a
forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
o) Int
l
outboundQueueLength :: Kafka -> IO Int
outboundQueueLength :: Kafka -> IO Int
outboundQueueLength (Kafka k :: RdKafkaTPtr
k) = RdKafkaTPtr -> IO Int
rdKafkaOutqLen RdKafkaTPtr
k