{-# LANGUAGE BangPatterns #-}
module Kafka.Consumer.Callbacks
( rebalanceCallback
, offsetCommitCallback
, module X
)
where
import Control.Arrow ((&&&))
import Control.Monad (forM_, void)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))
import qualified Data.Text as Text
-- | Build a 'Callback' that registers a rebalance handler on a consumer
-- configuration.
--
-- When the resulting 'Callback' is applied to a 'KafkaConf', it installs a
-- native handler through 'rdKafkaConfSetRebalanceCb'. That handler wraps the
-- raw pointers it receives and delegates to 'setRebalanceCallback', which
-- invokes the supplied @callback@ with the matching 'RebalanceEvent's.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback callback =
  Callback $ \kc@(KafkaConf con _ _) -> rdKafkaConfSetRebalanceCb con (realCb kc)
  where
    -- Handler given to the native layer: it receives the raw client pointer,
    -- the response error code and the raw topic-partition list pointer.
    realCb kc k err pl = do
      -- 'newForeignPtr_' wraps the pointers without attaching a finalizer.
      k'  <- newForeignPtr_ k
      pls <- newForeignPtr_ pl
      setRebalanceCallback
        callback
        (KafkaConsumer (Kafka k') kc)
        (KafkaResponseError err)
        pls
-- | Build a 'Callback' that registers an offset-commit handler on a consumer
-- configuration.
--
-- When the resulting 'Callback' is applied to a 'KafkaConf', it installs a
-- native handler through 'rdKafkaConfSetOffsetCommitCb'. That handler converts
-- the native topic-partition list with 'fromNativeTopicPartitionList'' and
-- passes it to @callback@, together with a 'KafkaConsumer' built from the raw
-- client pointer and the error code wrapped in 'KafkaResponseError'.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
offsetCommitCallback callback =
  Callback $ \kc@(KafkaConf conf _ _) -> rdKafkaConfSetOffsetCommitCb conf (realCb kc)
  where
    -- Handler given to the native layer: it receives the raw client pointer,
    -- the response error code and the raw topic-partition list pointer.
    realCb kc k err pl = do
      -- 'newForeignPtr_' wraps the pointer without attaching a finalizer.
      k'  <- newForeignPtr_ k
      pls <- fromNativeTopicPartitionList' pl
      callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls
-- | Forward the queue of one topic partition into the queue @q@.
--
-- The partition queue is looked up with 'rdKafkaQueueGetPartition'. If the
-- lookup yields 'Nothing' the function does nothing; otherwise the partition
-- queue is forwarded to @q@ with 'rdKafkaQueueForward'.
redirectPartitionQueue :: Kafka -> TopicName -> PartitionId -> RdKafkaQueueTPtr -> IO ()
redirectPartitionQueue (Kafka k) (TopicName t) (PartitionId p) q = do
  mpq <- rdKafkaQueueGetPartition k (Text.unpack t) p
  -- 'forM_' over 'Maybe': runs the action for 'Just', is a no-op for 'Nothing'.
  forM_ mpq $ \pq -> rdKafkaQueueForward pq q
-- | Handle one rebalance notification and report it to the user callback @f@.
--
-- The native partition list @pls@ is converted with
-- 'fromNativeTopicPartitionList''' and reduced to @(topic, partition)@ pairs.
-- The action taken depends on the error value @e@:
--
-- * 'RdKafkaRespErrAssignPartitions': emit 'RebalanceBeforeAssign', assign
--   @pls@ with 'rdKafkaAssign', optionally forward every partition queue to
--   the queue returned by 'getRdMsgQueue', then emit 'RebalanceAssign'.
--
-- * 'RdKafkaRespErrRevokePartitions': emit 'RebalanceBeforeRevoke', call
--   'rdKafkaAssign' with a null partition list, then emit 'RebalanceRevoke'.
--
-- * Any other value: fail via 'error', with a message that includes the
--   shown 'KafkaError'.
--
-- The response codes returned by 'rdKafkaAssign', 'rdKafkaPausePartitions'
-- and 'rdKafkaResumePartitions' are discarded.
setRebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ())
                     -> KafkaConsumer
                     -> KafkaError
                     -> RdKafkaTopicPartitionListTPtr -> IO ()
setRebalanceCallback f k e pls = do
  ps <- fromNativeTopicPartitionList'' pls
  let assignment         = (tpTopicName &&& tpPartition) <$> ps
      kafka@(Kafka kptr) = getKafka k

  case e of
    KafkaResponseError RdKafkaRespErrAssignPartitions -> do
      f k (RebalanceBeforeAssign assignment)
      void $ rdKafkaAssign kptr pls

      -- Redirect partition queues only when a message queue is configured.
      mbq <- getRdMsgQueue $ getKafkaConf k
      forM_ mbq $ \mq -> do
        -- NOTE(review): the partitions are paused while their queues are
        -- forwarded, presumably so nothing lands in a partition queue
        -- before the redirect is in place; confirm against librdkafka.
        void $ rdKafkaPausePartitions kptr pls
        forM_ ps $ \tp ->
          redirectPartitionQueue kafka (tpTopicName tp) (tpPartition tp) mq
        void $ rdKafkaResumePartitions kptr pls

      f k (RebalanceAssign assignment)

    KafkaResponseError RdKafkaRespErrRevokePartitions -> do
      f k (RebalanceBeforeRevoke assignment)
      -- Assign a null partition list (a finalizer-free wrapper around
      -- 'nullPtr').
      void $ newForeignPtr_ nullPtr >>= rdKafkaAssign kptr
      f k (RebalanceRevoke assignment)

    x -> error $ "Rebalance: UNKNOWN response: " <> show x