{-# LANGUAGE BangPatterns #-}
module Kafka.Consumer.Callbacks
( rebalanceCallback
, offsetCommitCallback
, module X
)
where

import Control.Arrow ((&&&))
import Control.Monad (forM_, void)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))

import qualified Data.Text as Text

-- | Sets a callback that is called when rebalance is needed.
--
-- The resulting 'Callback' registers a handler on the configuration handle
-- carried by 'KafkaConf'. Each time that handler fires, the user-supplied
-- function is invoked with the 'KafkaConsumer' and the 'RebalanceEvent'
-- describing the current stage of the rebalance.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback callback = Callback install
  where
    -- Register the handler on the configuration handle held by 'KafkaConf'.
    install conf@(KafkaConf confHandle _ _) =
      rdKafkaConfSetRebalanceCb confHandle (onRebalance conf)

    -- The handler receives raw pointers; they are wrapped with
    -- 'newForeignPtr_' (no finalizer attached) before being handed on.
    onRebalance conf rawKafka respErr rawPartitions = do
      kafkaPtr      <- newForeignPtr_ rawKafka
      partitionsPtr <- newForeignPtr_ rawPartitions
      let consumer = KafkaConsumer (Kafka kafkaPtr) conf
      setRebalanceCallback callback consumer (KafkaResponseError respErr) partitionsPtr

-- | Sets a callback that is called when offsets are committed.
--
-- The results of automatic or manual offset commits will be scheduled
-- for this callback and are served by 'Kafka.Consumer.pollMessage'.
--
-- If no partitions had valid offsets to commit this callback will be called
-- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered
-- an error.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
offsetCommitCallback callback = Callback install
  where
    -- Register the handler on the configuration handle held by 'KafkaConf'.
    install conf@(KafkaConf confHandle _ _) =
      rdKafkaConfSetOffsetCommitCb confHandle (onCommit conf)

    -- Wrap the raw client pointer, convert the native partition list and
    -- pass everything to the user-supplied function.
    onCommit conf rawKafka respErr rawPartitions = do
      kafkaPtr   <- newForeignPtr_ rawKafka
      partitions <- fromNativeTopicPartitionList' rawPartitions
      let consumer = KafkaConsumer (Kafka kafkaPtr) conf
      callback consumer (KafkaResponseError respErr) partitions

-------------------------------------------------------------------------------

-- | Forwards the queue of a single topic partition to the given queue.
--
-- Does nothing when no queue can be obtained for that partition.
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (Kafka kafkaPtr) (TopicName topic) (PartitionId partition) target =
  rdKafkaQueueGetPartition kafkaPtr (Text.unpack topic) partition
    >>= maybe (pure ()) (`rdKafkaQueueForward` target)

-- | Handles one rebalance notification for a consumer.
--
-- * On 'RdKafkaRespErrAssignPartitions': reports 'RebalanceBeforeAssign',
--   assigns the received partition list, redirects the partition queues when
--   'getRdMsgQueue' yields a queue, then reports 'RebalanceAssign'.
--
-- * On 'RdKafkaRespErrRevokePartitions': reports 'RebalanceBeforeRevoke',
--   assigns a null partition list, then reports 'RebalanceRevoke'.
--
-- * Any other value is rejected by calling 'error'.
setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
                     -> KafkaConsumer
                     -> KafkaError
                     -> RdKafkaTopicPartitionListTPtr
                     -> IO ()
setRebalanceCallback notify consumer rebalanceErr nativePartitions = do
  partitions <- fromNativeTopicPartitionList'' nativePartitions
  let assignment     = fmap (tpTopicName &&& tpPartition) partitions
      kafka          = getKafka consumer
      Kafka kafkaPtr = kafka

      -- Magnus Edenhill:
      --   If you redirect after assign() it means some messages may be
      --   forwarded to the single consumer queue, so either do it before
      --   assign() or do: assign(); pause(); redirect; resume()
      redirectAllTo queue = do
        void $ rdKafkaPausePartitions kafkaPtr nativePartitions
        forM_ partitions $ \tp ->
          redirectPartitionQueue kafka (tpTopicName tp) (tpPartition tp) queue
        void $ rdKafkaResumePartitions kafkaPtr nativePartitions

  case rebalanceErr of
    KafkaResponseError RdKafkaRespErrAssignPartitions -> do
      notify consumer (RebalanceBeforeAssign assignment)
      void $ rdKafkaAssign kafkaPtr nativePartitions
      -- Queues are redirected only when a message queue is available.
      getRdMsgQueue (getKafkaConf consumer) >>= maybe (pure ()) redirectAllTo
      notify consumer (RebalanceAssign assignment)

    KafkaResponseError RdKafkaRespErrRevokePartitions -> do
      notify consumer (RebalanceBeforeRevoke assignment)
      -- Revocation is performed by assigning a null partition list.
      noPartitions <- newForeignPtr_ nullPtr
      void $ rdKafkaAssign kafkaPtr noPartitions
      notify consumer (RebalanceRevoke assignment)

    unexpected ->
      error $ "Rebalance: UNKNOWN response: " <> show unexpected