{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module Kafka.Consumer
( KafkaConsumer
, module X
, runConsumer
, newConsumer
, assign, assignment, subscription
, pausePartitions, resumePartitions
, committed, position, seek
, pollMessage, pollConsumerEvents
, pollMessageBatch
, commitOffsetMessage, commitAllOffsets, commitPartitionsOffsets
, storeOffsets, storeOffsetMessage
, closeConsumer
, RdKafkaRespErrT (..)
)
where
import Control.Arrow (left, (&&&))
import Control.Concurrent (forkIO, modifyMVar, rtsSupportsBoundThreads, withMVar)
import Control.Exception (bracket)
import Control.Monad (forM_, void, when)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Except (ExceptT (ExceptT), runExceptT)
import Data.Bifunctor (bimap, first)
import qualified Data.ByteString as BS
import Data.IORef (readIORef, writeIORef)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Foreign hiding (void)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..))
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)
import Kafka.Consumer.ConsumerProperties as X
import Kafka.Consumer.Subscription as X
import Kafka.Consumer.Types as X hiding (KafkaConsumer)
import Kafka.Types as X
{-# DEPRECATED runConsumer "Use 'newConsumer'/'closeConsumer' instead" #-}
runConsumer :: ConsumerProperties
-> Subscription
-> (KafkaConsumer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runConsumer :: ConsumerProperties
-> Subscription
-> (KafkaConsumer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
runConsumer cp :: ConsumerProperties
cp sub :: Subscription
sub f :: KafkaConsumer -> IO (Either KafkaError a)
f =
IO (Either KafkaError KafkaConsumer)
-> (Either KafkaError KafkaConsumer -> IO (Either KafkaError ()))
-> (Either KafkaError KafkaConsumer -> IO (Either KafkaError a))
-> IO (Either KafkaError a)
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Either KafkaError KafkaConsumer)
mkConsumer Either KafkaError KafkaConsumer -> IO (Either KafkaError ())
forall (m :: * -> *).
MonadIO m =>
Either KafkaError KafkaConsumer -> m (Either KafkaError ())
clConsumer Either KafkaError KafkaConsumer -> IO (Either KafkaError a)
runHandler
where
mkConsumer :: IO (Either KafkaError KafkaConsumer)
mkConsumer = ConsumerProperties
-> Subscription -> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *).
MonadIO m =>
ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
newConsumer ConsumerProperties
cp Subscription
sub
clConsumer :: Either KafkaError KafkaConsumer -> m (Either KafkaError ())
clConsumer (Left err :: KafkaError
err) = Either KafkaError () -> m (Either KafkaError ())
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left KafkaError
err)
clConsumer (Right kc :: KafkaConsumer
kc) = Maybe KafkaError -> Either KafkaError ()
forall a. Maybe a -> Either a ()
maybeToLeft (Maybe KafkaError -> Either KafkaError ())
-> m (Maybe KafkaError) -> m (Either KafkaError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> KafkaConsumer -> m (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
closeConsumer KafkaConsumer
kc
runHandler :: Either KafkaError KafkaConsumer -> IO (Either KafkaError a)
runHandler (Left err :: KafkaError
err) = Either KafkaError a -> IO (Either KafkaError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaError -> Either KafkaError a
forall a b. a -> Either a b
Left KafkaError
err)
runHandler (Right kc :: KafkaConsumer
kc) = KafkaConsumer -> IO (Either KafkaError a)
f KafkaConsumer
kc
newConsumer :: MonadIO m
=> ConsumerProperties
-> Subscription
-> m (Either KafkaError KafkaConsumer)
newConsumer :: ConsumerProperties
-> Subscription -> m (Either KafkaError KafkaConsumer)
newConsumer props :: ConsumerProperties
props (Subscription ts :: Set TopicName
ts tp :: Map Text Text
tp) = IO (Either KafkaError KafkaConsumer)
-> m (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError KafkaConsumer)
-> m (Either KafkaError KafkaConsumer))
-> IO (Either KafkaError KafkaConsumer)
-> m (Either KafkaError KafkaConsumer)
forall a b. (a -> b) -> a -> b
$ do
let cp :: ConsumerProperties
cp = case ConsumerProperties -> CallbackPollMode
cpCallbackPollMode ConsumerProperties
props of
CallbackPollModeAsync -> Callback -> ConsumerProperties
setCallback ((KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback (\_ _ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())) ConsumerProperties -> ConsumerProperties -> ConsumerProperties
forall a. Semigroup a => a -> a -> a
<> ConsumerProperties
props
CallbackPollModeSync -> ConsumerProperties
props
kc :: KafkaConf
kc@(KafkaConf kc' :: RdKafkaConfTPtr
kc' qref :: IORef (Maybe RdKafkaQueueTPtr)
qref _) <- ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties
cp
TopicConf
tp' <- TopicProps -> IO TopicConf
topicConf (Map Text Text -> TopicProps
TopicProps Map Text Text
tp)
()
_ <- KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf KafkaConf
kc TopicConf
tp'
Either Text RdKafkaTPtr
rdk <- RdKafkaTypeT -> RdKafkaConfTPtr -> IO (Either Text RdKafkaTPtr)
newRdKafkaT RdKafkaTypeT
RdKafkaConsumer RdKafkaConfTPtr
kc'
case Either Text RdKafkaTPtr
rdk of
Left err :: Text
err -> Either KafkaError KafkaConsumer
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError KafkaConsumer
-> IO (Either KafkaError KafkaConsumer))
-> (KafkaError -> Either KafkaError KafkaConsumer)
-> KafkaError
-> IO (Either KafkaError KafkaConsumer)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> Either KafkaError KafkaConsumer
forall a b. a -> Either a b
Left (KafkaError -> IO (Either KafkaError KafkaConsumer))
-> KafkaError -> IO (Either KafkaError KafkaConsumer)
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaError Text
err
Right rdk' :: RdKafkaTPtr
rdk' -> do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConsumerProperties -> CallbackPollMode
cpCallbackPollMode ConsumerProperties
props CallbackPollMode -> CallbackPollMode -> Bool
forall a. Eq a => a -> a -> Bool
== CallbackPollMode
CallbackPollModeAsync) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
RdKafkaQueueTPtr
msgq <- RdKafkaTPtr -> IO RdKafkaQueueTPtr
rdKafkaQueueNew RdKafkaTPtr
rdk'
IORef (Maybe RdKafkaQueueTPtr) -> Maybe RdKafkaQueueTPtr -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe RdKafkaQueueTPtr)
qref (RdKafkaQueueTPtr -> Maybe RdKafkaQueueTPtr
forall a. a -> Maybe a
Just RdKafkaQueueTPtr
msgq)
let kafka :: KafkaConsumer
kafka = Kafka -> KafkaConf -> KafkaConsumer
KafkaConsumer (RdKafkaTPtr -> Kafka
Kafka RdKafkaTPtr
rdk') KafkaConf
kc
Maybe KafkaError
redErr <- KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll KafkaConsumer
kafka
case Maybe KafkaError
redErr of
Just err :: KafkaError
err -> KafkaConsumer -> IO (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
closeConsumer KafkaConsumer
kafka IO (Maybe KafkaError)
-> IO (Either KafkaError KafkaConsumer)
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either KafkaError KafkaConsumer
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaError -> Either KafkaError KafkaConsumer
forall a b. a -> Either a b
Left KafkaError
err)
Nothing -> do
Maybe KafkaLogLevel -> (KafkaLogLevel -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ConsumerProperties -> Maybe KafkaLogLevel
cpLogLevel ConsumerProperties
cp) (KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel KafkaConsumer
kafka)
Maybe KafkaError
sub <- KafkaConsumer -> Set TopicName -> IO (Maybe KafkaError)
subscribe KafkaConsumer
kafka Set TopicName
ts
case Maybe KafkaError
sub of
Nothing -> (Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConsumerProperties -> CallbackPollMode
cpCallbackPollMode ConsumerProperties
props CallbackPollMode -> CallbackPollMode -> Bool
forall a. Eq a => a -> a -> Bool
== CallbackPollMode
CallbackPollModeAsync) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
KafkaConsumer -> Maybe Timeout -> IO ()
runConsumerLoop KafkaConsumer
kafka (Timeout -> Maybe Timeout
forall a. a -> Maybe a
Just (Timeout -> Maybe Timeout) -> Timeout -> Maybe Timeout
forall a b. (a -> b) -> a -> b
$ Int -> Timeout
Timeout 100)) IO ()
-> IO (Either KafkaError KafkaConsumer)
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either KafkaError KafkaConsumer
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaConsumer -> Either KafkaError KafkaConsumer
forall a b. b -> Either a b
Right KafkaConsumer
kafka)
Just err :: KafkaError
err -> KafkaConsumer -> IO (Maybe KafkaError)
forall (m :: * -> *).
MonadIO m =>
KafkaConsumer -> m (Maybe KafkaError)
closeConsumer KafkaConsumer
kafka IO (Maybe KafkaError)
-> IO (Either KafkaError KafkaConsumer)
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either KafkaError KafkaConsumer
-> IO (Either KafkaError KafkaConsumer)
forall (m :: * -> *) a. Monad m => a -> m a
return (KafkaError -> Either KafkaError KafkaConsumer
forall a b. a -> Either a b
Left KafkaError
err)
pollMessage :: MonadIO m
=> KafkaConsumer
-> Timeout
-> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
pollMessage :: KafkaConsumer
-> Timeout
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
pollMessage c :: KafkaConsumer
c@(KafkaConsumer _ (KafkaConf _ qr :: IORef (Maybe RdKafkaQueueTPtr)
qr _)) (Timeout ms :: Int
ms) = IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> m (Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall a b. (a -> b) -> a -> b
$ do
Maybe RdKafkaQueueTPtr
mbq <- IORef (Maybe RdKafkaQueueTPtr) -> IO (Maybe RdKafkaQueueTPtr)
forall a. IORef a -> IO a
readIORef IORef (Maybe RdKafkaQueueTPtr)
qr
case Maybe RdKafkaQueueTPtr
mbq of
Nothing -> RdKafkaTPtr -> Int -> IO RdKafkaMessageTPtr
rdKafkaConsumerPoll (KafkaConsumer -> RdKafkaTPtr
forall k. HasKafka k => k -> RdKafkaTPtr
getRdKafka KafkaConsumer
c) Int
ms IO RdKafkaMessageTPtr
-> (RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
fromMessagePtr
Just q :: RdKafkaQueueTPtr
q -> RdKafkaQueueTPtr -> Int -> IO RdKafkaMessageTPtr
rdKafkaConsumeQueue RdKafkaQueueTPtr
q (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
ms) IO RdKafkaMessageTPtr
-> (RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
fromMessagePtr
pollMessageBatch :: MonadIO m
=> KafkaConsumer
-> Timeout
-> BatchSize
-> m [Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))]
pollMessageBatch :: KafkaConsumer
-> Timeout
-> BatchSize
-> m [Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
pollMessageBatch c :: KafkaConsumer
c@(KafkaConsumer _ (KafkaConf _ qr :: IORef (Maybe RdKafkaQueueTPtr)
qr _)) (Timeout ms :: Int
ms) (BatchSize b :: Int
b) = IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> m [Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> m [Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))])
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> m [Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall a b. (a -> b) -> a -> b
$ do
KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents KafkaConsumer
c Maybe Timeout
forall a. Maybe a
Nothing
Maybe RdKafkaQueueTPtr
mbq <- IORef (Maybe RdKafkaQueueTPtr) -> IO (Maybe RdKafkaQueueTPtr)
forall a. IORef a -> IO a
readIORef IORef (Maybe RdKafkaQueueTPtr)
qr
case Maybe RdKafkaQueueTPtr
mbq of
Nothing -> [Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall (m :: * -> *) a. Monad m => a -> m a
return [KafkaError
-> Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall a b. a -> Either a b
Left (KafkaError
-> Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
-> KafkaError
-> Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
Just q :: RdKafkaQueueTPtr
q -> KafkaConsumer
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall a. KafkaConsumer -> IO a -> IO a
whileNoCallbackRunning KafkaConsumer
c (IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))])
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall a b. (a -> b) -> a -> b
$ RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessageTPtr]
rdKafkaConsumeBatchQueue RdKafkaQueueTPtr
q Int
ms Int
b IO [RdKafkaMessageTPtr]
-> ([RdKafkaMessageTPtr]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))])
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))))
-> [RdKafkaMessageTPtr]
-> IO
[Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString))]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse RdKafkaMessageTPtr
-> IO
(Either
KafkaError (ConsumerRecord (Maybe ByteString) (Maybe ByteString)))
fromMessagePtr
commitOffsetMessage :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> ConsumerRecord k v
-> m (Maybe KafkaError)
commitOffsetMessage :: OffsetCommit
-> KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
commitOffsetMessage o :: OffsetCommit
o k :: KafkaConsumer
k m :: ConsumerRecord k v
m =
IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList [ConsumerRecord k v -> TopicPartition
forall k v. ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit ConsumerRecord k v
m] IO RdKafkaTopicPartitionListTPtr
-> (RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError))
-> IO (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= OffsetCommit
-> KafkaConsumer
-> RdKafkaTopicPartitionListTPtr
-> IO (Maybe KafkaError)
commitOffsets OffsetCommit
o KafkaConsumer
k
storeOffsetMessage :: MonadIO m
=> KafkaConsumer
-> ConsumerRecord k v
-> m (Maybe KafkaError)
storeOffsetMessage :: KafkaConsumer -> ConsumerRecord k v -> m (Maybe KafkaError)
storeOffsetMessage k :: KafkaConsumer
k m :: ConsumerRecord k v
m =
IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose [ConsumerRecord k v -> TopicPartition
forall k v. ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit ConsumerRecord k v
m] IO RdKafkaTopicPartitionListTPtr
-> (RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError))
-> IO (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= KafkaConsumer
-> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore KafkaConsumer
k
storeOffsets :: MonadIO m
=> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
storeOffsets :: KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
storeOffsets k :: KafkaConsumer
k ps :: [TopicPartition]
ps =
IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose [TopicPartition]
ps IO RdKafkaTopicPartitionListTPtr
-> (RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError))
-> IO (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= KafkaConsumer
-> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore KafkaConsumer
k
commitAllOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> m (Maybe KafkaError)
commitAllOffsets :: OffsetCommit -> KafkaConsumer -> m (Maybe KafkaError)
commitAllOffsets o :: OffsetCommit
o k :: KafkaConsumer
k =
IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ Ptr RdKafkaTopicPartitionListT -> IO RdKafkaTopicPartitionListTPtr
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaTopicPartitionListT
forall a. Ptr a
nullPtr IO RdKafkaTopicPartitionListTPtr
-> (RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError))
-> IO (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= OffsetCommit
-> KafkaConsumer
-> RdKafkaTopicPartitionListTPtr
-> IO (Maybe KafkaError)
commitOffsets OffsetCommit
o KafkaConsumer
k
commitPartitionsOffsets :: MonadIO m
=> OffsetCommit
-> KafkaConsumer
-> [TopicPartition]
-> m (Maybe KafkaError)
commitPartitionsOffsets :: OffsetCommit
-> KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
commitPartitionsOffsets o :: OffsetCommit
o k :: KafkaConsumer
k ps :: [TopicPartition]
ps =
IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList [TopicPartition]
ps IO RdKafkaTopicPartitionListTPtr
-> (RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError))
-> IO (Maybe KafkaError)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= OffsetCommit
-> KafkaConsumer
-> RdKafkaTopicPartitionListTPtr
-> IO (Maybe KafkaError)
commitOffsets OffsetCommit
o KafkaConsumer
k
assign :: MonadIO m => KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
assign :: KafkaConsumer -> [TopicPartition] -> m (Maybe KafkaError)
assign (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) ps :: [TopicPartition]
ps = IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ do
RdKafkaTopicPartitionListTPtr
tps <- [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList [TopicPartition]
ps
KafkaError -> Maybe KafkaError
kafkaErrorToMaybe (KafkaError -> Maybe KafkaError)
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Maybe KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> Maybe KafkaError)
-> IO RdKafkaRespErrT -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaAssign RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
tps
assignment :: MonadIO m => KafkaConsumer -> m (Either KafkaError (M.Map TopicName [PartitionId]))
assignment :: KafkaConsumer
-> m (Either KafkaError (Map TopicName [PartitionId]))
assignment (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) = IO (Either KafkaError (Map TopicName [PartitionId]))
-> m (Either KafkaError (Map TopicName [PartitionId]))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError (Map TopicName [PartitionId]))
-> m (Either KafkaError (Map TopicName [PartitionId])))
-> IO (Either KafkaError (Map TopicName [PartitionId]))
-> m (Either KafkaError (Map TopicName [PartitionId]))
forall a b. (a -> b) -> a -> b
$ do
Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
tpl <- RdKafkaTPtr
-> IO (Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr)
rdKafkaAssignment RdKafkaTPtr
k
Either KafkaError [TopicPartition]
tps <- (RdKafkaTopicPartitionListTPtr -> IO [TopicPartition])
-> Either KafkaError RdKafkaTopicPartitionListTPtr
-> IO (Either KafkaError [TopicPartition])
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ((RdKafkaRespErrT -> KafkaError)
-> Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
-> Either KafkaError RdKafkaTopicPartitionListTPtr
forall (a :: * -> * -> *) b c d.
ArrowChoice a =>
a b c -> a (Either b d) (Either c d)
left RdKafkaRespErrT -> KafkaError
KafkaResponseError Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
tpl)
Either KafkaError (Map TopicName [PartitionId])
-> IO (Either KafkaError (Map TopicName [PartitionId]))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError (Map TopicName [PartitionId])
-> IO (Either KafkaError (Map TopicName [PartitionId])))
-> Either KafkaError (Map TopicName [PartitionId])
-> IO (Either KafkaError (Map TopicName [PartitionId]))
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> Map TopicName [PartitionId]
tpMap ([TopicPartition] -> Map TopicName [PartitionId])
-> Either KafkaError [TopicPartition]
-> Either KafkaError (Map TopicName [PartitionId])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either KafkaError [TopicPartition]
tps
where
tpMap :: [TopicPartition] -> Map TopicName [PartitionId]
tpMap ts :: [TopicPartition]
ts = [(TopicName, PartitionId)] -> Map TopicName [PartitionId]
forall k v. Ord k => [(k, v)] -> Map k [v]
toMap ([(TopicName, PartitionId)] -> Map TopicName [PartitionId])
-> [(TopicName, PartitionId)] -> Map TopicName [PartitionId]
forall a b. (a -> b) -> a -> b
$ (TopicPartition -> TopicName
tpTopicName (TopicPartition -> TopicName)
-> (TopicPartition -> PartitionId)
-> TopicPartition
-> (TopicName, PartitionId)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& TopicPartition -> PartitionId
tpPartition) (TopicPartition -> (TopicName, PartitionId))
-> [TopicPartition] -> [(TopicName, PartitionId)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TopicPartition]
ts
subscription :: MonadIO m => KafkaConsumer -> m (Either KafkaError [(TopicName, SubscribedPartitions)])
subscription :: KafkaConsumer
-> m (Either KafkaError [(TopicName, SubscribedPartitions)])
subscription (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) = IO (Either KafkaError [(TopicName, SubscribedPartitions)])
-> m (Either KafkaError [(TopicName, SubscribedPartitions)])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError [(TopicName, SubscribedPartitions)])
-> m (Either KafkaError [(TopicName, SubscribedPartitions)]))
-> IO (Either KafkaError [(TopicName, SubscribedPartitions)])
-> m (Either KafkaError [(TopicName, SubscribedPartitions)])
forall a b. (a -> b) -> a -> b
$ do
Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
tpl <- RdKafkaTPtr
-> IO (Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr)
rdKafkaSubscription RdKafkaTPtr
k
Either KafkaError [TopicPartition]
tps <- (RdKafkaTopicPartitionListTPtr -> IO [TopicPartition])
-> Either KafkaError RdKafkaTopicPartitionListTPtr
-> IO (Either KafkaError [TopicPartition])
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ((RdKafkaRespErrT -> KafkaError)
-> Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
-> Either KafkaError RdKafkaTopicPartitionListTPtr
forall (a :: * -> * -> *) b c d.
ArrowChoice a =>
a b c -> a (Either b d) (Either c d)
left RdKafkaRespErrT -> KafkaError
KafkaResponseError Either RdKafkaRespErrT RdKafkaTopicPartitionListTPtr
tpl)
Either KafkaError [(TopicName, SubscribedPartitions)]
-> IO (Either KafkaError [(TopicName, SubscribedPartitions)])
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError [(TopicName, SubscribedPartitions)]
-> IO (Either KafkaError [(TopicName, SubscribedPartitions)]))
-> Either KafkaError [(TopicName, SubscribedPartitions)]
-> IO (Either KafkaError [(TopicName, SubscribedPartitions)])
forall a b. (a -> b) -> a -> b
$ [TopicPartition] -> [(TopicName, SubscribedPartitions)]
toSub ([TopicPartition] -> [(TopicName, SubscribedPartitions)])
-> Either KafkaError [TopicPartition]
-> Either KafkaError [(TopicName, SubscribedPartitions)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either KafkaError [TopicPartition]
tps
where
toSub :: [TopicPartition] -> [(TopicName, SubscribedPartitions)]
toSub ts :: [TopicPartition]
ts = Map TopicName SubscribedPartitions
-> [(TopicName, SubscribedPartitions)]
forall k a. Map k a -> [(k, a)]
M.toList (Map TopicName SubscribedPartitions
-> [(TopicName, SubscribedPartitions)])
-> Map TopicName SubscribedPartitions
-> [(TopicName, SubscribedPartitions)]
forall a b. (a -> b) -> a -> b
$ [PartitionId] -> SubscribedPartitions
subParts ([PartitionId] -> SubscribedPartitions)
-> Map TopicName [PartitionId]
-> Map TopicName SubscribedPartitions
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TopicPartition] -> Map TopicName [PartitionId]
tpMap [TopicPartition]
ts
tpMap :: [TopicPartition] -> Map TopicName [PartitionId]
tpMap ts :: [TopicPartition]
ts = [(TopicName, PartitionId)] -> Map TopicName [PartitionId]
forall k v. Ord k => [(k, v)] -> Map k [v]
toMap ([(TopicName, PartitionId)] -> Map TopicName [PartitionId])
-> [(TopicName, PartitionId)] -> Map TopicName [PartitionId]
forall a b. (a -> b) -> a -> b
$ (TopicPartition -> TopicName
tpTopicName (TopicPartition -> TopicName)
-> (TopicPartition -> PartitionId)
-> TopicPartition
-> (TopicName, PartitionId)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& TopicPartition -> PartitionId
tpPartition) (TopicPartition -> (TopicName, PartitionId))
-> [TopicPartition] -> [(TopicName, PartitionId)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [TopicPartition]
ts
subParts :: [PartitionId] -> SubscribedPartitions
subParts [PartitionId (-1)] = SubscribedPartitions
SubscribedPartitionsAll
subParts ps :: [PartitionId]
ps = [PartitionId] -> SubscribedPartitions
SubscribedPartitions [PartitionId]
ps
pausePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
pausePartitions :: KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
pausePartitions (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) ps :: [(TopicName, PartitionId)]
ps = IO KafkaError -> m KafkaError
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO KafkaError -> m KafkaError) -> IO KafkaError -> m KafkaError
forall a b. (a -> b) -> a -> b
$ do
RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT ([(TopicName, PartitionId)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(TopicName, PartitionId)]
ps)
((TopicName, PartitionId) -> IO RdKafkaTopicPartitionTPtr)
-> [(TopicName, PartitionId)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(TopicName topicName :: Text
topicName, PartitionId partitionId :: Int
partitionId) -> RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl (Text -> String
Text.unpack Text
topicName) Int
partitionId) [(TopicName, PartitionId)]
ps
RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> KafkaError)
-> IO RdKafkaRespErrT -> IO KafkaError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaPausePartitions RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
pl
resumePartitions :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
resumePartitions :: KafkaConsumer -> [(TopicName, PartitionId)] -> m KafkaError
resumePartitions (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) ps :: [(TopicName, PartitionId)]
ps = IO KafkaError -> m KafkaError
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO KafkaError -> m KafkaError) -> IO KafkaError -> m KafkaError
forall a b. (a -> b) -> a -> b
$ do
RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT ([(TopicName, PartitionId)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(TopicName, PartitionId)]
ps)
((TopicName, PartitionId) -> IO RdKafkaTopicPartitionTPtr)
-> [(TopicName, PartitionId)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(TopicName topicName :: Text
topicName, PartitionId partitionId :: Int
partitionId) -> RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl (Text -> String
Text.unpack Text
topicName) Int
partitionId) [(TopicName, PartitionId)]
ps
RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> KafkaError)
-> IO RdKafkaRespErrT -> IO KafkaError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaResumePartitions RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
pl
seek :: MonadIO m => KafkaConsumer -> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
seek :: KafkaConsumer
-> Timeout -> [TopicPartition] -> m (Maybe KafkaError)
seek (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) (Timeout timeout :: Int
timeout) tps :: [TopicPartition]
tps = IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$
(KafkaError -> Maybe KafkaError)
-> (() -> Maybe KafkaError)
-> Either KafkaError ()
-> Maybe KafkaError
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either KafkaError -> Maybe KafkaError
forall a. a -> Maybe a
Just (Maybe KafkaError -> () -> Maybe KafkaError
forall a b. a -> b -> a
const Maybe KafkaError
forall a. Maybe a
Nothing) (Either KafkaError () -> Maybe KafkaError)
-> IO (Either KafkaError ()) -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (Either KafkaError ())
seekAll
where
seekAll :: IO (Either KafkaError ())
seekAll = ExceptT KafkaError IO () -> IO (Either KafkaError ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT KafkaError IO () -> IO (Either KafkaError ()))
-> ExceptT KafkaError IO () -> IO (Either KafkaError ())
forall a b. (a -> b) -> a -> b
$ do
[(RdKafkaTopicTPtr, PartitionId, PartitionOffset)]
tr <- (TopicPartition
-> ExceptT
KafkaError IO (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
-> [TopicPartition]
-> ExceptT
KafkaError IO [(RdKafkaTopicTPtr, PartitionId, PartitionOffset)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
-> ExceptT
KafkaError IO (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
-> ExceptT
KafkaError IO (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
-> (TopicPartition
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)))
-> TopicPartition
-> ExceptT
KafkaError IO (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TopicPartition
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
topicPair) [TopicPartition]
tps
((RdKafkaTopicTPtr, PartitionId, PartitionOffset)
-> ExceptT KafkaError IO ())
-> [(RdKafkaTopicTPtr, PartitionId, PartitionOffset)]
-> ExceptT KafkaError IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(kt :: RdKafkaTopicTPtr
kt, p :: PartitionId
p, o :: PartitionOffset
o) -> IO (Either KafkaError ()) -> ExceptT KafkaError IO ()
forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT (RdKafkaTopicTPtr
-> PartitionId -> PartitionOffset -> IO (Either KafkaError ())
rdSeek RdKafkaTopicTPtr
kt PartitionId
p PartitionOffset
o)) [(RdKafkaTopicTPtr, PartitionId, PartitionOffset)]
tr
rdSeek :: RdKafkaTopicTPtr
-> PartitionId -> PartitionOffset -> IO (Either KafkaError ())
rdSeek kt :: RdKafkaTopicTPtr
kt (PartitionId p :: Int
p) o :: PartitionOffset
o =
RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither (RdKafkaRespErrT -> Either KafkaError ())
-> IO RdKafkaRespErrT -> IO (Either KafkaError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTopicTPtr -> Int32 -> Int64 -> Int -> IO RdKafkaRespErrT
rdKafkaSeek RdKafkaTopicTPtr
kt (Int -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p) (PartitionOffset -> Int64
offsetToInt64 PartitionOffset
o) Int
timeout
topicPair :: TopicPartition
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
topicPair tp :: TopicPartition
tp = do
let (TopicName tn :: Text
tn) = TopicPartition -> TopicName
tpTopicName TopicPartition
tp
Either String RdKafkaTopicTPtr
nt <- RdKafkaTPtr
-> String
-> Maybe RdKafkaTopicConfTPtr
-> IO (Either String RdKafkaTopicTPtr)
newRdKafkaTopicT RdKafkaTPtr
k (Text -> String
Text.unpack Text
tn) Maybe RdKafkaTopicConfTPtr
forall a. Maybe a
Nothing
Either KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)))
-> Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
-> IO
(Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
forall a b. (a -> b) -> a -> b
$ (Text -> KafkaError)
-> (RdKafkaTopicTPtr
-> (RdKafkaTopicTPtr, PartitionId, PartitionOffset))
-> Either Text RdKafkaTopicTPtr
-> Either
KafkaError (RdKafkaTopicTPtr, PartitionId, PartitionOffset)
forall (p :: * -> * -> *) a b c d.
Bifunctor p =>
(a -> b) -> (c -> d) -> p a c -> p b d
bimap Text -> KafkaError
KafkaError (,TopicPartition -> PartitionId
tpPartition TopicPartition
tp, TopicPartition -> PartitionOffset
tpOffset TopicPartition
tp) ((String -> Text)
-> Either String RdKafkaTopicTPtr -> Either Text RdKafkaTopicTPtr
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first String -> Text
Text.pack Either String RdKafkaTopicTPtr
nt)
committed :: MonadIO m => KafkaConsumer -> Timeout -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
committed :: KafkaConsumer
-> Timeout
-> [(TopicName, PartitionId)]
-> m (Either KafkaError [TopicPartition])
committed (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) (Timeout timeout :: Int
timeout) tps :: [(TopicName, PartitionId)]
tps = IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition]))
-> IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition])
forall a b. (a -> b) -> a -> b
$ do
RdKafkaTopicPartitionListTPtr
ntps <- [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' [(TopicName, PartitionId)]
tps
RdKafkaRespErrT
res <- RdKafkaTPtr
-> RdKafkaTopicPartitionListTPtr -> Int -> IO RdKafkaRespErrT
rdKafkaCommitted RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
ntps Int
timeout
case RdKafkaRespErrT
res of
RdKafkaRespErrNoError -> [TopicPartition] -> Either KafkaError [TopicPartition]
forall a b. b -> Either a b
Right ([TopicPartition] -> Either KafkaError [TopicPartition])
-> IO [TopicPartition] -> IO (Either KafkaError [TopicPartition])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' RdKafkaTopicPartitionListTPtr
ntps
err :: RdKafkaRespErrT
err -> Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition])
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition]))
-> Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition])
forall a b. (a -> b) -> a -> b
$ KafkaError -> Either KafkaError [TopicPartition]
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err)
position :: MonadIO m => KafkaConsumer -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
position :: KafkaConsumer
-> [(TopicName, PartitionId)]
-> m (Either KafkaError [TopicPartition])
position (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) tps :: [(TopicName, PartitionId)]
tps = IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition]))
-> IO (Either KafkaError [TopicPartition])
-> m (Either KafkaError [TopicPartition])
forall a b. (a -> b) -> a -> b
$ do
RdKafkaTopicPartitionListTPtr
ntps <- [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' [(TopicName, PartitionId)]
tps
RdKafkaRespErrT
res <- RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaPosition RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
ntps
case RdKafkaRespErrT
res of
RdKafkaRespErrNoError -> [TopicPartition] -> Either KafkaError [TopicPartition]
forall a b. b -> Either a b
Right ([TopicPartition] -> Either KafkaError [TopicPartition])
-> IO [TopicPartition] -> IO (Either KafkaError [TopicPartition])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' RdKafkaTopicPartitionListTPtr
ntps
err :: RdKafkaRespErrT
err -> Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition])
forall (m :: * -> *) a. Monad m => a -> m a
return (Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition]))
-> Either KafkaError [TopicPartition]
-> IO (Either KafkaError [TopicPartition])
forall a b. (a -> b) -> a -> b
$ KafkaError -> Either KafkaError [TopicPartition]
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err)
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents k :: KafkaConsumer
k timeout :: Maybe Timeout
timeout =
IO CallbackPollStatus -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO CallbackPollStatus -> IO ())
-> (IO () -> IO CallbackPollStatus) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled KafkaConsumer
k (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' KafkaConsumer
k Maybe Timeout
timeout
closeConsumer :: MonadIO m => KafkaConsumer -> m (Maybe KafkaError)
closeConsumer :: KafkaConsumer -> m (Maybe KafkaError)
closeConsumer (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) (KafkaConf _ qr :: IORef (Maybe RdKafkaQueueTPtr)
qr statusVar :: MVar CallbackPollStatus
statusVar)) = IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe KafkaError) -> m (Maybe KafkaError))
-> IO (Maybe KafkaError) -> m (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$
MVar CallbackPollStatus
-> (CallbackPollStatus
-> IO (CallbackPollStatus, Maybe KafkaError))
-> IO (Maybe KafkaError)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar CallbackPollStatus
statusVar ((CallbackPollStatus -> IO (CallbackPollStatus, Maybe KafkaError))
-> IO (Maybe KafkaError))
-> (CallbackPollStatus
-> IO (CallbackPollStatus, Maybe KafkaError))
-> IO (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ \_ -> do
IORef (Maybe RdKafkaQueueTPtr) -> IO (Maybe RdKafkaQueueTPtr)
forall a. IORef a -> IO a
readIORef IORef (Maybe RdKafkaQueueTPtr)
qr IO (Maybe RdKafkaQueueTPtr)
-> (Maybe RdKafkaQueueTPtr -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (RdKafkaQueueTPtr -> IO ()) -> Maybe RdKafkaQueueTPtr -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ RdKafkaQueueTPtr -> IO ()
rdKafkaQueueDestroy
Maybe KafkaError
res <- KafkaError -> Maybe KafkaError
kafkaErrorToMaybe (KafkaError -> Maybe KafkaError)
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Maybe KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> Maybe KafkaError)
-> IO RdKafkaRespErrT -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> IO RdKafkaRespErrT
rdKafkaConsumerClose RdKafkaTPtr
k
(CallbackPollStatus, Maybe KafkaError)
-> IO (CallbackPollStatus, Maybe KafkaError)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CallbackPollStatus
CallbackPollDisabled, Maybe KafkaError
res)
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties {cpProps :: ConsumerProperties -> Map Text Text
cpProps = Map Text Text
m, cpCallbacks :: ConsumerProperties -> [Callback]
cpCallbacks = [Callback]
cbs} = do
KafkaConf
conf <- KafkaProps -> IO KafkaConf
kafkaConf (Map Text Text -> KafkaProps
KafkaProps Map Text Text
m)
[Callback] -> (Callback -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Callback]
cbs (\(Callback setCb :: KafkaConf -> IO ()
setCb) -> KafkaConf -> IO ()
setCb KafkaConf
conf)
KafkaConf -> IO KafkaConf
forall (m :: * -> *) a. Monad m => a -> m a
return KafkaConf
conf
subscribe :: KafkaConsumer -> Set TopicName -> IO (Maybe KafkaError)
subscribe :: KafkaConsumer -> Set TopicName -> IO (Maybe KafkaError)
subscribe (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) ts :: Set TopicName
ts = do
RdKafkaTopicPartitionListTPtr
pl <- Int -> IO RdKafkaTopicPartitionListTPtr
newRdKafkaTopicPartitionListT (Set TopicName -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Set TopicName
ts)
(TopicName -> IO RdKafkaTopicPartitionTPtr) -> [TopicName] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(TopicName t :: Text
t) -> RdKafkaTopicPartitionListTPtr
-> String -> Int -> IO RdKafkaTopicPartitionTPtr
rdKafkaTopicPartitionListAdd RdKafkaTopicPartitionListTPtr
pl (Text -> String
Text.unpack Text
t) (-1)) (Set TopicName -> [TopicName]
forall a. Set a -> [a]
Set.toList Set TopicName
ts)
KafkaError
res <- RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> KafkaError)
-> IO RdKafkaRespErrT -> IO KafkaError
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaSubscribe RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
pl
Maybe KafkaError -> IO (Maybe KafkaError)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe KafkaError -> IO (Maybe KafkaError))
-> Maybe KafkaError -> IO (Maybe KafkaError)
forall a b. (a -> b) -> a -> b
$ KafkaError -> Maybe KafkaError
kafkaErrorToMaybe KafkaError
res
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf :: KafkaConf -> TopicConf -> IO ()
setDefaultTopicConf (KafkaConf kc :: RdKafkaConfTPtr
kc _ _) (TopicConf tc :: RdKafkaTopicConfTPtr
tc) =
RdKafkaTopicConfTPtr -> IO RdKafkaTopicConfTPtr
rdKafkaTopicConfDup RdKafkaTopicConfTPtr
tc IO RdKafkaTopicConfTPtr -> (RdKafkaTopicConfTPtr -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= RdKafkaConfTPtr -> RdKafkaTopicConfTPtr -> IO ()
rdKafkaConfSetDefaultTopicConf RdKafkaConfTPtr
kc
commitOffsets :: OffsetCommit -> KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsets :: OffsetCommit
-> KafkaConsumer
-> RdKafkaTopicPartitionListTPtr
-> IO (Maybe KafkaError)
commitOffsets o :: OffsetCommit
o (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) pl :: RdKafkaTopicPartitionListTPtr
pl =
KafkaError -> Maybe KafkaError
kafkaErrorToMaybe (KafkaError -> Maybe KafkaError)
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Maybe KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> Maybe KafkaError)
-> IO RdKafkaRespErrT -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr
-> RdKafkaTopicPartitionListTPtr -> Bool -> IO RdKafkaRespErrT
rdKafkaCommit RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
pl (OffsetCommit -> Bool
offsetCommitToBool OffsetCommit
o)
commitOffsetsStore :: KafkaConsumer -> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore :: KafkaConsumer
-> RdKafkaTopicPartitionListTPtr -> IO (Maybe KafkaError)
commitOffsetsStore (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) pl :: RdKafkaTopicPartitionListTPtr
pl =
KafkaError -> Maybe KafkaError
kafkaErrorToMaybe (KafkaError -> Maybe KafkaError)
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Maybe KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> Maybe KafkaError)
-> IO RdKafkaRespErrT -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> RdKafkaTopicPartitionListTPtr -> IO RdKafkaRespErrT
rdKafkaOffsetsStore RdKafkaTPtr
k RdKafkaTopicPartitionListTPtr
pl
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel :: KafkaConsumer -> KafkaLogLevel -> IO ()
setConsumerLogLevel (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) level :: KafkaLogLevel
level =
IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ RdKafkaTPtr -> Int -> IO ()
rdKafkaSetLogLevel RdKafkaTPtr
k (KafkaLogLevel -> Int
forall a. Enum a => a -> Int
fromEnum KafkaLogLevel
level)
redirectCallbacksPoll :: KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll :: KafkaConsumer -> IO (Maybe KafkaError)
redirectCallbacksPoll (KafkaConsumer (Kafka k :: RdKafkaTPtr
k) _) =
KafkaError -> Maybe KafkaError
kafkaErrorToMaybe (KafkaError -> Maybe KafkaError)
-> (RdKafkaRespErrT -> KafkaError)
-> RdKafkaRespErrT
-> Maybe KafkaError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> Maybe KafkaError)
-> IO RdKafkaRespErrT -> IO (Maybe KafkaError)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaTPtr -> IO RdKafkaRespErrT
rdKafkaPollSetConsumer RdKafkaTPtr
k
runConsumerLoop :: KafkaConsumer -> Maybe Timeout -> IO ()
runConsumerLoop :: KafkaConsumer -> Maybe Timeout -> IO ()
runConsumerLoop k :: KafkaConsumer
k timeout :: Maybe Timeout
timeout =
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
rtsSupportsBoundThreads (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO IO ()
go
where
go :: IO ()
go = do
CallbackPollStatus
st <- KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled KafkaConsumer
k (KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' KafkaConsumer
k Maybe Timeout
timeout)
case CallbackPollStatus
st of
CallbackPollEnabled -> IO ()
go
CallbackPollDisabled -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a
whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a
whileNoCallbackRunning k :: KafkaConsumer
k f :: IO a
f = do
let statusVar :: MVar CallbackPollStatus
statusVar = KafkaConf -> MVar CallbackPollStatus
kcfgCallbackPollStatus (KafkaConsumer -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf KafkaConsumer
k)
MVar CallbackPollStatus -> (CallbackPollStatus -> IO a) -> IO a
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar CallbackPollStatus
statusVar ((CallbackPollStatus -> IO a) -> IO a)
-> (CallbackPollStatus -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \_ -> IO a
f
withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled k :: KafkaConsumer
k f :: IO ()
f = do
let statusVar :: MVar CallbackPollStatus
statusVar = KafkaConf -> MVar CallbackPollStatus
kcfgCallbackPollStatus (KafkaConsumer -> KafkaConf
forall a. HasKafkaConf a => a -> KafkaConf
getKafkaConf KafkaConsumer
k)
MVar CallbackPollStatus
-> (CallbackPollStatus -> IO CallbackPollStatus)
-> IO CallbackPollStatus
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar CallbackPollStatus
statusVar ((CallbackPollStatus -> IO CallbackPollStatus)
-> IO CallbackPollStatus)
-> (CallbackPollStatus -> IO CallbackPollStatus)
-> IO CallbackPollStatus
forall a b. (a -> b) -> a -> b
$ \case
CallbackPollEnabled -> IO ()
f IO () -> IO CallbackPollStatus -> IO CallbackPollStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> CallbackPollStatus -> IO CallbackPollStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure CallbackPollStatus
CallbackPollEnabled
CallbackPollDisabled -> CallbackPollStatus -> IO CallbackPollStatus
forall (f :: * -> *) a. Applicative f => a -> f a
pure CallbackPollStatus
CallbackPollDisabled
pollConsumerEvents' :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' :: KafkaConsumer -> Maybe Timeout -> IO ()
pollConsumerEvents' k :: KafkaConsumer
k timeout :: Maybe Timeout
timeout =
let (Timeout tm :: Int
tm) = Timeout -> Maybe Timeout -> Timeout
forall a. a -> Maybe a -> a
fromMaybe (Int -> Timeout
Timeout 0) Maybe Timeout
timeout
in IO RdKafkaMessageTPtr -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO RdKafkaMessageTPtr -> IO ()) -> IO RdKafkaMessageTPtr -> IO ()
forall a b. (a -> b) -> a -> b
$ RdKafkaTPtr -> Int -> IO RdKafkaMessageTPtr
rdKafkaConsumerPoll (KafkaConsumer -> RdKafkaTPtr
forall k. HasKafka k => k -> RdKafkaTPtr
getRdKafka KafkaConsumer
k) Int
tm