module Kafka.Consumer.Convert
( offsetSyncToInt
, offsetToInt64
, int64ToOffset
, fromNativeTopicPartitionList''
, fromNativeTopicPartitionList'
, fromNativeTopicPartitionList
, toNativeTopicPartitionList
, toNativeTopicPartitionListNoDispose
, toNativeTopicPartitionList'
, topicPartitionFromMessage
, topicPartitionFromMessageForCommit
, toMap
, fromMessagePtr
, offsetCommitToBool
)
where
import Control.Exception (finally)
import Control.Monad ((>=>))
import qualified Data.ByteString as BS
import Data.Either (fromRight)
import Data.Int (Int64)
import Data.Map.Strict (Map, fromListWith)
import qualified Data.Set as S
import qualified Data.Text as Text
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Storable (Storable(..))
import Foreign.C.Error (getErrno)
import Kafka.Consumer.Types (ConsumerRecord(..), TopicPartition(..), Offset(..), OffsetCommit(..), PartitionOffset(..), OffsetStoreSync(..))
import Kafka.Internal.RdKafka
  ( RdKafkaRespErrT(..)
  , RdKafkaMessageT(..)
  , RdKafkaTopicPartitionListTPtr
  , RdKafkaTopicPartitionListT(..)
  , RdKafkaMessageTPtr
  , RdKafkaTopicPartitionT(..)
  , rdKafkaTopicPartitionListAdd
  , newRdKafkaTopicPartitionListT
  , rdKafkaMessageDestroy
  , rdKafkaTopicPartitionListSetOffset
  , rdKafkaTopicPartitionListNew
  , peekCText
  )
import Kafka.Internal.Shared (kafkaRespErr, readHeaders, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Types (KafkaError(..), PartitionId(..), TopicName(..))

-- | Convert an 'OffsetStoreSync' setting to its integer form.
--
-- 'OffsetSyncDisable' maps to @-1@, 'OffsetSyncImmediate' maps to @0@ and
-- 'OffsetSyncInterval' yields the wrapped number unchanged.
--
-- NOTE(review): these look like the values librdkafka expects for
-- @offset.store.sync.interval.ms@ -- confirm against the call site.
offsetSyncToInt :: OffsetStoreSync -> Int
offsetSyncToInt OffsetSyncDisable       = -1
offsetSyncToInt OffsetSyncImmediate     = 0
offsetSyncToInt (OffsetSyncInterval ms) = ms
{-# INLINE offsetSyncToInt #-}

-- | Encode a 'PartitionOffset' as a raw 64-bit offset.
--
-- A concrete 'PartitionOffset' yields the wrapped number; the remaining
-- constructors map to fixed negative sentinels (@-2@, @-1@, @-1000@,
-- @-1001@).
--
-- NOTE(review): the sentinels presumably mirror librdkafka's
-- @RD_KAFKA_OFFSET_*@ constants -- confirm against @rdkafka.h@.
offsetToInt64 :: PartitionOffset -> Int64
offsetToInt64 PartitionOffsetBeginning = -2
offsetToInt64 PartitionOffsetEnd       = -1
offsetToInt64 (PartitionOffset off)    = off
offsetToInt64 PartitionOffsetStored    = -1000
offsetToInt64 PartitionOffsetInvalid   = -1001
{-# INLINE offsetToInt64 #-}

-- | Decode a raw 64-bit offset into a 'PartitionOffset'.
--
-- The sentinels @-2@, @-1@ and @-1000@ map to 'PartitionOffsetBeginning',
-- 'PartitionOffsetEnd' and 'PartitionOffsetStored' respectively. Any
-- non-negative value becomes a concrete 'PartitionOffset'. Every other
-- negative value is reported as 'PartitionOffsetInvalid'.
int64ToOffset :: Int64 -> PartitionOffset
int64ToOffset o =
  case o of
    (-2)    -> PartitionOffsetBeginning
    (-1)    -> PartitionOffsetEnd
    (-1000) -> PartitionOffsetStored
    _ | o >= 0    -> PartitionOffset o
      | otherwise -> PartitionOffsetInvalid
{-# INLINE int64ToOffset #-}

-- | Read the topic partitions out of a native list held behind a foreign
-- pointer.
--
-- The foreign pointer is kept alive with 'withForeignPtr' while the raw
-- pointer is handed to 'fromNativeTopicPartitionList''.
fromNativeTopicPartitionList'' :: RdKafkaTopicPartitionListTPtr -> IO [TopicPartition]
fromNativeTopicPartitionList'' ptr =
  withForeignPtr ptr fromNativeTopicPartitionList'

-- | Read the topic partitions out of a native list given by raw pointer.
--
-- A null pointer yields an empty list; otherwise the structure is peeked
-- and converted with 'fromNativeTopicPartitionList'.
fromNativeTopicPartitionList' :: Ptr RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList' ppl
  | ppl == nullPtr = return []
  | otherwise      = peek ppl >>= fromNativeTopicPartitionList

-- | Convert an already-peeked native partition list into 'TopicPartition's.
--
-- Reads @cnt@ elements from the @elems@ array, in index order, and converts
-- each one. A count of zero (or less) produces an empty list without
-- touching the element pointer.
fromNativeTopicPartitionList :: RdKafkaTopicPartitionListT -> IO [TopicPartition]
fromNativeTopicPartitionList pl =
  mapM (peekElemOff elems >=> toPart) [0 .. count - 1]
  where
    count = cnt'RdKafkaTopicPartitionListT pl
    elems = elems'RdKafkaTopicPartitionListT pl

    -- Convert one native element; the topic name is read from its C string.
    toPart :: RdKafkaTopicPartitionT -> IO TopicPartition
    toPart p = do
      topic <- peekCText (topic'RdKafkaTopicPartitionT p)
      return TopicPartition
        { tpTopicName = TopicName topic
        , tpPartition = PartitionId (partition'RdKafkaTopicPartitionT p)
        , tpOffset    = int64ToOffset (offset'RdKafkaTopicPartitionT p)
        }

-- | Build a native topic partition list from the given partitions.
--
-- The list is allocated with 'newRdKafkaTopicPartitionListT', sized to the
-- number of input partitions. Each partition is added and then has its
-- offset set, in input order.
toNativeTopicPartitionList :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList =
  toNativeTopicPartitionListWith newRdKafkaTopicPartitionListT

-- | Same as 'toNativeTopicPartitionList', but the list is allocated with
-- 'rdKafkaTopicPartitionListNew'.
--
-- NOTE(review): judging by the name, this allocator presumably does not
-- attach a finalizer, leaving disposal to the caller -- confirm in
-- "Kafka.Internal.RdKafka".
toNativeTopicPartitionListNoDispose :: [TopicPartition] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListNoDispose =
  toNativeTopicPartitionListWith rdKafkaTopicPartitionListNew

-- | Allocate a native list with the supplied allocator and fill it with the
-- given partitions and their offsets.
toNativeTopicPartitionListWith
  :: (Int -> IO RdKafkaTopicPartitionListTPtr)
  -> [TopicPartition]
  -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionListWith allocate ps = do
  pl <- allocate (length ps)
  mapM_ (addPartition pl) ps
  return pl
  where
    -- Add one partition, then set its offset. The response code of the
    -- offset call is discarded by 'mapM_', as in the original code.
    addPartition pl p = do
      let TopicName tn   = tpTopicName p
          PartitionId tp = tpPartition p
          off            = offsetToInt64 (tpOffset p)
          tnS            = Text.unpack tn
      _ <- rdKafkaTopicPartitionListAdd pl tnS tp
      rdKafkaTopicPartitionListSetOffset pl tnS tp off

-- | Build a native topic partition list from topic/partition pairs, without
-- setting offsets.
--
-- Duplicate pairs are removed by going through a 'S.Set'. The native list
-- is sized to the number of unique pairs, which are added in ascending
-- order.
toNativeTopicPartitionList' :: [(TopicName, PartitionId)] -> IO RdKafkaTopicPartitionListTPtr
toNativeTopicPartitionList' tps = do
  let unique = S.fromList tps
  pl <- newRdKafkaTopicPartitionListT (S.size unique)
  mapM_ (addPair pl) unique
  return pl
  where
    addPair pl (TopicName t, PartitionId p) =
      rdKafkaTopicPartitionListAdd pl (Text.unpack t) p

-- | Build the 'TopicPartition' that identifies a consumed record.
--
-- Topic and partition are copied from the record; the record's offset is
-- wrapped as a concrete 'PartitionOffset'.
topicPartitionFromMessage :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessage m =
  TopicPartition (crTopic m) (crPartition m) (PartitionOffset moff)
  where
    Offset moff = crOffset m

-- | Build the 'TopicPartition' to commit for a consumed record.
--
-- Same as 'topicPartitionFromMessage', except that a concrete offset is
-- advanced by one. Any other offset constructor is passed through
-- unchanged.
topicPartitionFromMessageForCommit :: ConsumerRecord k v -> TopicPartition
topicPartitionFromMessageForCommit m =
  case topicPartitionFromMessage m of
    tp@TopicPartition { tpOffset = PartitionOffset moff } ->
      tp { tpOffset = PartitionOffset (moff + 1) }
    other -> other

-- | Group key/value pairs into a map from each key to all of its values.
--
-- Values are combined with 'fromListWith' and @(++)@, which prepends each
-- newly seen value. The values of a key therefore appear in the reverse of
-- their input order.
toMap :: Ord k => [(k, v)] -> Map k [v]
toMap kvs = fromListWith (++) (map singleton kvs)
  where
    singleton (k, v) = (k, [v])

-- | Convert a native message into a 'ConsumerRecord', or into the error it
-- carries.
--
-- * A null message pointer yields 'Left' with an error built from the
--   current @errno@.
-- * A message whose error field is not 'RdKafkaRespErrNoError' yields
--   'Left' with a 'KafkaResponseError'.
-- * Otherwise topic, key, payload, timestamp and headers are read into a
--   'ConsumerRecord'. If the headers cannot be read, empty headers are used.
--
-- A non-null message is always destroyed with 'rdKafkaMessageDestroy'
-- before returning, including when reading it throws, so the native
-- message is not leaked on that path.
fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
fromMessagePtr ptr =
  withForeignPtr ptr $ \realPtr ->
    if realPtr == nullPtr
      then Left . kafkaRespErr <$> getErrno
      else decode realPtr `finally` rdKafkaMessageDestroy realPtr
  where
    -- Inspect the message's error field and build the record when it is
    -- clean.
    decode :: Ptr RdKafkaMessageT -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
    decode realPtr = do
      s <- peek realPtr
      let err = err'RdKafkaMessageT s
      if err /= RdKafkaRespErrNoError
        then return (Left (KafkaResponseError err))
        else Right <$> mkRecord s realPtr

    -- The timestamp is read through the foreign pointer, the headers
    -- through the raw pointer.
    mkRecord :: RdKafkaMessageT -> Ptr RdKafkaMessageT -> IO (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))
    mkRecord msg rptr = do
      topic     <- readTopic msg
      key       <- readKey msg
      payload   <- readPayload msg
      timestamp <- readTimestamp ptr
      headers   <- fromRight mempty <$> readHeaders rptr
      return ConsumerRecord
        { crTopic     = TopicName topic
        , crPartition = PartitionId (partition'RdKafkaMessageT msg)
        , crOffset    = Offset (offset'RdKafkaMessageT msg)
        , crTimestamp = timestamp
        , crHeaders   = headers
        , crKey       = key
        , crValue     = payload
        }

-- | Convert an 'OffsetCommit' mode to a flag: 'False' for 'OffsetCommit',
-- 'True' for 'OffsetCommitAsync'.
offsetCommitToBool :: OffsetCommit -> Bool
offsetCommitToBool mode =
  case mode of
    OffsetCommit      -> False
    OffsetCommitAsync -> True