module Kafka.Consumer.Convert
where
import Control.Monad
import qualified Data.ByteString as BS
import Data.Map.Strict (Map, fromListWith)
import qualified Data.Set as S
import Foreign
import Foreign.C.Error
import Foreign.C.String
import Kafka.Consumer.Types
import Kafka.Internal.RdKafka
import Kafka.Internal.Shared
import Kafka.Types
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt sync =
case sync of
OffsetSyncDisable -> -1
OffsetSyncImmediate -> 0
OffsetSyncInterval ms -> ms
{-# INLINE offsetSyncToInt #-}
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 o = case o of
PartitionOffsetBeginning -> -2
PartitionOffsetEnd -> -1
PartitionOffset off -> off
PartitionOffsetStored -> -1000
PartitionOffsetInvalid -> -1001
{-# INLINE offsetToInt64 #-}
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset o
| o == -2 = PartitionOffsetBeginning
| o == -1 = PartitionOffsetEnd
| o == -1000 = PartitionOffsetStored
| o >= 0 = PartitionOffset o
| otherwise = PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}
fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ptr =
withForeignPtr ptr $ \fptr -> fromNativeTopicPartitionList' fptr
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl =
if ppl == nullPtr
then return []
else peek ppl >>= fromNativeTopicPartitionList
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl =
let count = cnt'RdKafkaTopicPartitionListT pl
elems = elems'RdKafkaTopicPartitionListT pl
in mapM (peekElemOff elems >=> toPart) [0..(fromIntegral count - 1)]
where
toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
toPart p = do
topic <- peekCString $ topic'RdKafkaTopicPartitionT p
return TopicPartition {
tpTopicName = TopicName topic,
tpPartition = PartitionId $ partition'RdKafkaTopicPartitionT p,
tpOffset = int64ToOffset $ offset'RdKafkaTopicPartitionT p
}
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList ps = do
pl <- newRdKafkaTopicPartitionListT (length ps)
mapM_ (\p -> do
let TopicName tn = tpTopicName p
(PartitionId tp) = tpPartition p
to = offsetToInt64 $ tpOffset p
_ <- rdKafkaTopicPartitionListAdd pl tn tp
rdKafkaTopicPartitionListSetOffset pl tn tp to) ps
return pl
toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' tps = do
let utps = S.toList . S.fromList $ tps
pl <- newRdKafkaTopicPartitionListT (length utps)
mapM_ (\(TopicName t, PartitionId p) -> rdKafkaTopicPartitionListAdd pl t p) utps
return pl
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m =
let (Offset moff) = crOffset m
in TopicPartition (crTopic m) (crPartition m) (PartitionOffset moff)
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit m =
let (TopicPartition t p (PartitionOffset moff)) = topicPartitionFromMessage m
in TopicPartition t p (PartitionOffset $ moff + 1)
toMap :: Ord k => [(k, v)] -> Map k [v]
toMap kvs = fromListWith (++) [(k, [v]) | (k, v) <- kvs]
fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr ptr =
withForeignPtr ptr $ \realPtr ->
if realPtr == nullPtr then (Left . kafkaRespErr) <$> getErrno
else do
s <- peek realPtr
msg <- if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
else Right <$> mkRecord s
rdKafkaMessageDestroy realPtr
return msg
where
mkRecord msg = do
topic <- readTopic msg
key <- readKey msg
payload <- readPayload msg
timestamp <- readTimestamp ptr
return ConsumerRecord
{ crTopic = TopicName topic
, crPartition = PartitionId $ partition'RdKafkaMessageT msg
, crOffset = Offset $ offset'RdKafkaMessageT msg
, crTimestamp = timestamp
, crKey = key
, crValue = payload
}
readTopic msg = newForeignPtr_ (topic'RdKafkaMessageT msg) >>= rdKafkaTopicName
readPayload = readBS len'RdKafkaMessageT payload'RdKafkaMessageT
readKey = readBS keyLen'RdKafkaMessageT key'RdKafkaMessageT
readTimestamp msg =
alloca $ \p -> do
typeP <- newForeignPtr_ p
ts <- fromIntegral <$> rdKafkaMessageTimestamp msg typeP
tsType <- peek p
return $ case tsType of
RdKafkaTimestampCreateTime -> CreateTime (Millis ts)
RdKafkaTimestampLogAppendTime -> LogAppendTime (Millis ts)
RdKafkaTimestampNotAvailable -> NoTimestamp
readBS flen fdata s = if fdata s == nullPtr
then return Nothing
else Just <$> word8PtrToBS (flen s) (fdata s)
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool OffsetCommit = False
offsetCommitToBool OffsetCommitAsync = True