{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Kafka.Metadata
( KafkaMetadata(..), BrokerMetadata(..), TopicMetadata(..), PartitionMetadata(..)
, WatermarkOffsets(..)
, GroupMemberId(..), GroupMemberInfo(..)
, GroupProtocolType(..), GroupProtocol(..), GroupState(..)
, GroupInfo(..)
, allTopicsMetadata, topicMetadata
, watermarkOffsets, watermarkOffsets'
, partitionWatermarkOffsets
, offsetsForTime, offsetsForTime', topicOffsetsForTime
, allConsumerGroupsInfo, consumerGroupInfo
)
where
import Control.Arrow (left)
import Control.Exception (bracket)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor (bimap)
import Data.ByteString (ByteString, pack)
import Data.Text (Text)
import Foreign (Storable (peek), peekArray, withForeignPtr)
import GHC.Generics (Generic)
import Kafka.Consumer.Convert (fromNativeTopicPartitionList'', toNativeTopicPartitionList)
import Kafka.Consumer.Types (ConsumerGroupId (..), Offset (..), PartitionOffset (..), TopicPartition (..))
import Kafka.Internal.RdKafka (RdKafkaGroupInfoT (..), RdKafkaGroupListT (..), RdKafkaGroupListTPtr, RdKafkaGroupMemberInfoT (..), RdKafkaMetadataBrokerT (..), RdKafkaMetadataPartitionT (..), RdKafkaMetadataT (..), RdKafkaMetadataTPtr, RdKafkaMetadataTopicT (..), RdKafkaRespErrT (..), RdKafkaTPtr, destroyUnmanagedRdKafkaTopic, newUnmanagedRdKafkaTopicT, peekCAText, rdKafkaListGroups, rdKafkaMetadata, rdKafkaOffsetsForTimes, rdKafkaQueryWatermarkOffsets)
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
import Kafka.Internal.Shared (kafkaErrorToMaybe)
import Kafka.Types (BrokerId (..), ClientId (..), KafkaError (..), Millis (..), PartitionId (..), Timeout (..), TopicName (..))
import qualified Data.Set as S
import qualified Data.Text as Text
-- | Cluster metadata decoded from a native metadata response
-- (see 'allTopicsMetadata' and 'topicMetadata').
data KafkaMetadata = KafkaMetadata
  { kmBrokers :: [BrokerMetadata] -- ^ One entry per broker listed in the native response
  , kmTopics :: [TopicMetadata] -- ^ One entry per topic listed in the native response
  , kmOrigBroker :: !BrokerId -- ^ Originating broker id of the response (presumably the broker that answered; confirm against librdkafka)
  } deriving (Show, Eq, Generic)
-- | Description of a single broker as found in a metadata response.
data BrokerMetadata = BrokerMetadata
  { bmBrokerId :: !BrokerId -- ^ Broker identifier
  , bmBrokerHost :: !Text -- ^ Broker host, read from the native C string
  , bmBrokerPort :: !Int -- ^ Broker port
  } deriving (Show, Eq, Generic)
-- | Description of a single partition as found in a topic's metadata.
data PartitionMetadata = PartitionMetadata
  { pmPartitionId :: !PartitionId -- ^ Partition identifier
  , pmError :: Maybe KafkaError -- ^ Partition-level error, derived from the native error code via @kafkaErrorToMaybe@
  , pmLeader :: !BrokerId -- ^ Leader broker id as reported for this partition
  , pmReplicas :: [BrokerId] -- ^ Broker ids read from the native replicas array
  , pmInSyncReplicas :: [BrokerId] -- ^ Broker ids read from the native ISR array
  } deriving (Show, Eq, Generic)
-- | Description of a single topic as found in a metadata response.
data TopicMetadata = TopicMetadata
  { tmTopicName :: !TopicName -- ^ Topic name
  , tmPartitions :: [PartitionMetadata] -- ^ One entry per partition listed for the topic
  , tmError :: Maybe KafkaError -- ^ Topic-level error, derived from the native error code via @kafkaErrorToMaybe@
  } deriving (Show, Eq, Generic)
-- | Low and high watermark offsets of one topic partition
-- (see 'partitionWatermarkOffsets').
data WatermarkOffsets = WatermarkOffsets
  { woTopicName :: !TopicName -- ^ Topic the watermarks were queried for
  , woPartitionId :: !PartitionId -- ^ Partition the watermarks were queried for
  , woLowWatermark :: !Offset -- ^ First component of the pair returned by the native watermark query
  , woHighWatermark :: !Offset -- ^ Second component of the pair returned by the native watermark query
  } deriving (Show, Eq, Generic)
-- | Identifier of a consumer group member, as read from the native member info.
newtype GroupMemberId = GroupMemberId Text deriving (Show, Eq, Read, Ord)
-- | Description of a single consumer group member.
data GroupMemberInfo = GroupMemberInfo
  { gmiMemberId :: !GroupMemberId -- ^ Member identifier
  , gmiClientId :: !ClientId -- ^ Client identifier of the member
  , gmiClientHost :: !Text -- ^ Client host of the member
  , gmiMetadata :: !ByteString -- ^ Raw member metadata bytes copied from the native structure
  , gmiAssignment :: !ByteString -- ^ Raw member assignment bytes copied from the native structure
  } deriving (Show, Eq, Generic)
-- | Protocol type of a consumer group, as read from the native group info.
newtype GroupProtocolType = GroupProtocolType Text deriving (Show, Eq, Read, Ord, Generic)
-- | Protocol of a consumer group, as read from the native group info.
newtype GroupProtocol = GroupProtocol Text deriving (Show, Eq, Read, Ord, Generic)
-- | State of a consumer group, parsed from the state string carried by
-- the native group info.
data GroupState
  = GroupPreparingRebalance -- ^ Parsed from the state string @PreparingRebalance@
  | GroupEmpty -- ^ Parsed from the state string @Empty@
  | GroupAwaitingSync -- ^ Parsed from the state string @AwaitingSync@
  | GroupStable -- ^ Parsed from the state string @Stable@
  | GroupDead -- ^ Parsed from the state string @Dead@
  deriving (Show, Eq, Read, Ord, Generic)
-- | Description of a consumer group
-- (see 'allConsumerGroupsInfo' and 'consumerGroupInfo').
data GroupInfo = GroupInfo
  { giGroup :: !ConsumerGroupId -- ^ Group identifier
  , giError :: Maybe KafkaError -- ^ Group-level error, derived from the native error code via @kafkaErrorToMaybe@
  , giState :: !GroupState -- ^ Group state parsed from the native state string
  , giProtocolType :: !GroupProtocolType -- ^ Protocol type reported for the group
  , giProtocol :: !GroupProtocol -- ^ Protocol reported for the group
  , giMembers :: [GroupMemberInfo] -- ^ One entry per member listed for the group
  } deriving (Show, Eq, Generic)
-- | Request metadata from the cluster without restricting it to a single
-- topic (the native call is made with its all-topics flag set and no topic).
--
-- A native failure code is returned as 'KafkaResponseError'; on success the
-- native structure is decoded into 'KafkaMetadata'.
allTopicsMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError KafkaMetadata)
allTopicsMetadata k (Timeout timeout) = liftIO $
  rdKafkaMetadata (getKafkaPtr k) True Nothing timeout >>= decode
  where
    -- Wrap the native error code, or decode the metadata pointer.
    decode = either (return . Left . KafkaResponseError) (fmap Right . fromKafkaMetadataPtr)
-- | Request metadata for one specific topic.
--
-- An unmanaged native topic handle is created for the duration of the call
-- and destroyed afterwards via 'bracket', even if decoding throws.
-- If the handle cannot be created, the reported message is returned as a
-- 'KafkaError'; a native failure code from the metadata request is returned
-- as 'KafkaResponseError'.
topicMetadata :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m (Either KafkaError KafkaMetadata)
topicMetadata k (Timeout timeout) (TopicName tn) = liftIO $
  bracket acquireTopic releaseTopic (either topicFailed fetchMetadata)
  where
    kafkaPtr = getKafkaPtr k

    acquireTopic = newUnmanagedRdKafkaTopicT kafkaPtr (Text.unpack tn) Nothing

    -- Nothing to release when the handle was never created.
    releaseTopic (Left _)      = return ()
    releaseTopic (Right topic) = destroyUnmanagedRdKafkaTopic topic

    topicFailed reason = return (Left (KafkaError (Text.pack reason)))

    fetchMetadata topic = do
      result <- rdKafkaMetadata kafkaPtr False (Just topic) timeout
      traverse fromKafkaMetadataPtr (left KafkaResponseError result)
-- | Query the watermark offsets of every partition of a topic.
--
-- The topic's metadata is fetched first. If that fails, the result is a
-- singleton list holding the error. If the metadata lists no topics, the
-- result is empty. Otherwise the first listed topic is passed to
-- 'watermarkOffsets'', yielding one entry per partition.
watermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets k timeout t = do
  meta <- topicMetadata k timeout t
  case meta of
    Left err -> return [Left err]
    -- Pattern match on the topic list instead of @null@ + partial @head@.
    Right tm -> case kmTopics tm of
      []            -> return []
      (firstTopic:_) -> watermarkOffsets' k timeout firstTopic
-- | Query the watermark offsets of every partition listed in the given
-- topic metadata, one 'partitionWatermarkOffsets' call per partition, in
-- the order the partitions appear in the metadata.
watermarkOffsets' :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicMetadata -> m [Either KafkaError WatermarkOffsets]
watermarkOffsets' k timeout tm = liftIO $ mapM queryPartition (tmPartitions tm)
  where
    queryPartition pm =
      partitionWatermarkOffsets k timeout (tmTopicName tm) (pmPartitionId pm)
-- | Query the watermark offsets of a single topic partition.
--
-- A native failure code is returned as 'KafkaResponseError'; on success the
-- returned pair is stored as low and high watermark, in that order.
partitionWatermarkOffsets :: (MonadIO m, HasKafka k) => k -> Timeout -> TopicName -> PartitionId -> m (Either KafkaError WatermarkOffsets)
partitionWatermarkOffsets k (Timeout timeout) topic@(TopicName t) partition@(PartitionId p) = liftIO $
  bimap KafkaResponseError toWatermark
    <$> rdKafkaQueryWatermarkOffsets (getKafkaPtr k) (Text.unpack t) p timeout
  where
    toWatermark (lo, hi) = WatermarkOffsets topic partition (Offset lo) (Offset hi)
-- | Look up offsets by timestamp for every partition of a topic.
--
-- The topic's metadata is fetched first; a metadata failure is returned
-- unchanged. Otherwise every (topic, partition) pair found in the metadata
-- is passed to 'offsetsForTime'.
topicOffsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicName -> m (Either KafkaError [TopicPartition])
topicOffsetsForTime k timeout timestamp topic =
  topicMetadata k timeout topic >>= either (return . Left) lookupOffsets
  where
    lookupOffsets meta =
      offsetsForTime k timeout timestamp (concatMap partitionsOf (kmTopics meta))

    partitionsOf t = map (\p -> (tmTopicName t, pmPartitionId p)) (tmPartitions t)
-- | Look up offsets by timestamp for every partition listed in the given
-- topic metadata; delegates to 'offsetsForTime'.
offsetsForTime' :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> TopicMetadata -> m (Either KafkaError [TopicPartition])
offsetsForTime' k timeout timestamp t =
  offsetsForTime k timeout timestamp (map partitionKey (tmPartitions t))
  where
    partitionKey p = (tmTopicName t, pmPartitionId p)
-- | Look up offsets by timestamp for the given topic partitions.
--
-- Duplicate (topic, partition) pairs are removed (and the pairs sorted) by
-- a round trip through a 'S.Set'. Each pair is sent to the native call with
-- the timestamp in its offset slot; on success the native list is read
-- back, otherwise the failure code is returned as 'KafkaResponseError'.
offsetsForTime :: (MonadIO m, HasKafka k) => k -> Timeout -> Millis -> [(TopicName, PartitionId)] -> m (Either KafkaError [TopicPartition])
offsetsForTime k (Timeout timeout) (Millis t) tps = liftIO $ do
  nativeList <- toNativeTopicPartitionList requests
  status <- rdKafkaOffsetsForTimes (getKafkaPtr k) nativeList timeout
  case status of
    RdKafkaRespErrNoError -> fmap Right (fromNativeTopicPartitionList'' nativeList)
    failure               -> return (Left (KafkaResponseError failure))
  where
    requests =
      [ TopicPartition tn p (PartitionOffset t)
      | (tn, p) <- S.toList (S.fromList tps)
      ]
-- | List consumer groups without naming a specific group (the native call
-- is made with no group argument).
--
-- A native failure code is returned as 'KafkaResponseError'.
allConsumerGroupsInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> m (Either KafkaError [GroupInfo])
allConsumerGroupsInfo k (Timeout t) = liftIO $
  rdKafkaListGroups (getKafkaPtr k) Nothing t >>= decode
  where
    decode = either (return . Left . KafkaResponseError) (fmap Right . fromGroupInfoListPtr)
-- | List consumer group information for the named group (the native call
-- is made with the group id as its group argument).
--
-- A native failure code is returned as 'KafkaResponseError'.
consumerGroupInfo :: (MonadIO m, HasKafka k) => k -> Timeout -> ConsumerGroupId -> m (Either KafkaError [GroupInfo])
consumerGroupInfo k (Timeout timeout) (ConsumerGroupId gn) = liftIO $
  rdKafkaListGroups (getKafkaPtr k) (Just groupName) timeout >>= decode
  where
    groupName = Text.unpack gn
    decode = either (return . Left . KafkaResponseError) (fmap Right . fromGroupInfoListPtr)
-- | Extract the native client pointer from anything that carries a 'Kafka'.
getKafkaPtr :: HasKafka k => k -> RdKafkaTPtr
getKafkaPtr k = case getKafka k of
  Kafka ptr -> ptr
{-# INLINE getKafkaPtr #-}
-- | Decode a native group list: read the list header, then the array of
-- @groupCnt@ group entries, and convert each entry with 'fromGroupInfoPtr'.
fromGroupInfoListPtr :: RdKafkaGroupListTPtr -> IO [GroupInfo]
fromGroupInfoListPtr ptr =
  withForeignPtr ptr $ \rawPtr -> do
    groupList <- peek rawPtr
    let count = groupCnt'RdKafkaGroupListT groupList
    nativeGroups <- peekArray count (groups'RdKafkaGroupListT groupList)
    mapM fromGroupInfoPtr nativeGroups
-- | Decode a native group description into a 'GroupInfo'.
--
-- The group id, state, protocol type and protocol strings are read first
-- (in that order), followed by the member array, whose entries are
-- converted with 'fromGroupMemberInfoPtr'.
fromGroupInfoPtr :: RdKafkaGroupInfoT -> IO GroupInfo
fromGroupInfoPtr gi =
  build
    <$> peekCAText (group'RdKafkaGroupInfoT gi)
    <*> peekCAText (state'RdKafkaGroupInfoT gi)
    <*> peekCAText (protocolType'RdKafkaGroupInfoT gi)
    <*> peekCAText (protocol'RdKafkaGroupInfoT gi)
    <*> readMembers
  where
    readMembers =
      peekArray (memberCnt'RdKafkaGroupInfoT gi) (members'RdKafkaGroupInfoT gi)
        >>= mapM fromGroupMemberInfoPtr

    build groupId state protocolType protocol members = GroupInfo
      { giGroup        = ConsumerGroupId groupId
      , giError        = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaGroupInfoT gi)
      , giState        = groupStateFromKafkaString state
      , giProtocolType = GroupProtocolType protocolType
      , giProtocol     = GroupProtocol protocol
      , giMembers      = members
      }
-- | Decode a native group member description into a 'GroupMemberInfo'.
--
-- The metadata and assignment blobs are copied byte-for-byte, using the
-- sizes recorded alongside them in the native structure.
fromGroupMemberInfoPtr :: RdKafkaGroupMemberInfoT -> IO GroupMemberInfo
fromGroupMemberInfoPtr mi = do
  memberId   <- peekCAText (memberId'RdKafkaGroupMemberInfoT mi)
  clientId   <- peekCAText (clientId'RdKafkaGroupMemberInfoT mi)
  clientHost <- peekCAText (clientHost'RdKafkaGroupMemberInfoT mi)
  metadata   <- pack <$> peekArray (memberMetadataSize'RdKafkaGroupMemberInfoT mi)
                                   (memberMetadata'RdKafkaGroupMemberInfoT mi)
  assignment <- pack <$> peekArray (memberAssignmentSize'RdKafkaGroupMemberInfoT mi)
                                   (memberAssignment'RdKafkaGroupMemberInfoT mi)
  return $ GroupMemberInfo
    (GroupMemberId memberId)
    (ClientId clientId)
    clientHost
    metadata
    assignment
-- | Decode a native topic description into a 'TopicMetadata': the topic
-- name is read first, then the array of @partitionCnt@ partitions, each
-- converted with 'fromPartitionMetadataPtr'.
fromTopicMetadataPtr :: RdKafkaMetadataTopicT -> IO TopicMetadata
fromTopicMetadataPtr tm = do
  name <- peekCAText (topic'RdKafkaMetadataTopicT tm)
  partitions <-
    peekArray (partitionCnt'RdKafkaMetadataTopicT tm) (partitions'RdKafkaMetadataTopicT tm)
      >>= mapM fromPartitionMetadataPtr
  let topicErr = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataTopicT tm)
  return (TopicMetadata (TopicName name) partitions topicErr)
-- | Decode a native partition description into a 'PartitionMetadata'.
--
-- The replica and in-sync-replica arrays are read using the counts stored
-- in the native structure; their elements are converted to 'BrokerId' with
-- 'fromIntegral'.
fromPartitionMetadataPtr :: RdKafkaMetadataPartitionT -> IO PartitionMetadata
fromPartitionMetadataPtr pm = do
  replicas <- peekArray (replicaCnt'RdKafkaMetadataPartitionT pm) (replicas'RdKafkaMetadataPartitionT pm)
  inSync   <- peekArray (isrCnt'RdKafkaMetadataPartitionT pm) (isrs'RdKafkaMetadataPartitionT pm)
  return PartitionMetadata
    { pmPartitionId    = PartitionId (id'RdKafkaMetadataPartitionT pm)
    , pmError          = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
    , pmLeader         = BrokerId (leader'RdKafkaMetadataPartitionT pm)
    , pmReplicas       = map toBrokerId replicas
    , pmInSyncReplicas = map toBrokerId inSync
    }
  where
    toBrokerId = BrokerId . fromIntegral
-- | Decode a native broker description into a 'BrokerMetadata'; only the
-- host string needs to be read from native memory.
fromBrokerMetadataPtr :: RdKafkaMetadataBrokerT -> IO BrokerMetadata
fromBrokerMetadataPtr bm = toBroker <$> peekCAText (host'RdKafkaMetadataBrokerT bm)
  where
    toBroker brokerHost = BrokerMetadata
      { bmBrokerId   = BrokerId (id'RdKafkaMetadataBrokerT bm)
      , bmBrokerHost = brokerHost
      , bmBrokerPort = port'RdKafkaMetadataBrokerT bm
      }
-- | Decode a native metadata response into a 'KafkaMetadata'.
--
-- Brokers are read and converted first, then topics; the originating
-- broker id is converted with 'fromIntegral'.
fromKafkaMetadataPtr :: RdKafkaMetadataTPtr -> IO KafkaMetadata
fromKafkaMetadataPtr ptr =
  withForeignPtr ptr $ \rawPtr -> do
    native <- peek rawPtr
    brokers <-
      peekArray (brokerCnt'RdKafkaMetadataT native) (brokers'RdKafkaMetadataT native)
        >>= mapM fromBrokerMetadataPtr
    topics <-
      peekArray (topicCnt'RdKafkaMetadataT native) (topics'RdKafkaMetadataT native)
        >>= mapM fromTopicMetadataPtr
    let origBroker = BrokerId $ fromIntegral (origBrokerId'RdKafkaMetadataT native)
    return (KafkaMetadata brokers topics origBroker)
-- | Translate the textual group state found in a native group description
-- into a 'GroupState'.
--
-- Newer Kafka brokers report the state formerly named @AwaitingSync@ as
-- @CompletingRebalance@ (NOTE(review): confirm against the broker versions
-- in use). Both spellings map to 'GroupAwaitingSync', so describing a group
-- in that state no longer hits the 'error' fallback.
--
-- Any other, unrecognised string still raises 'error', because 'GroupState'
-- has no constructor to represent it.
groupStateFromKafkaString :: Text -> GroupState
groupStateFromKafkaString s = case s of
  "PreparingRebalance"  -> GroupPreparingRebalance
  "AwaitingSync"        -> GroupAwaitingSync
  -- Same state as "AwaitingSync" under its newer broker-side name.
  "CompletingRebalance" -> GroupAwaitingSync
  "Stable"              -> GroupStable
  "Dead"                -> GroupDead
  "Empty"               -> GroupEmpty
  _                     -> error $ "Unknown group state: " <> Text.unpack s