{-# LANGUAGE LambdaCase #-}
module Kafka.Internal.Shared
( pollEvents
, word8PtrToBS
, kafkaRespErr
, throwOnError
, hasError
, rdKafkaErrorToEither
, kafkaErrorToEither
, kafkaErrorToMaybe
, maybeToLeft
, readHeaders
, readPayload
, readTopic
, readKey
, readTimestamp
, readBS
)
where
import Control.Exception (throw)
import Control.Monad (void)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BSI
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Word (Word8)
import Foreign.C.Error (Errno (..))
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (Storable (peek))
import Kafka.Consumer.Types (Timestamp (..))
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName, rdKafkaHeaderGetAll, rdKafkaMessageHeaders)
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..), Headers, headersFromList)
pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents :: a -> Maybe Timeout -> IO ()
pollEvents a :: a
a tm :: Maybe Timeout
tm =
let timeout :: Int
timeout = Int -> (Timeout -> Int) -> Maybe Timeout -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe 0 Timeout -> Int
unTimeout Maybe Timeout
tm
Kafka k :: RdKafkaTPtr
k = a -> Kafka
forall a. HasKafka a => a -> Kafka
getKafka a
a
in IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (RdKafkaTPtr -> Int -> IO Int
rdKafkaPoll RdKafkaTPtr
k Int
timeout)
word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
word8PtrToBS :: Int -> Word8Ptr -> IO ByteString
word8PtrToBS len :: Int
len ptr :: Word8Ptr
ptr = Int -> (Word8Ptr -> IO ()) -> IO ByteString
BSI.create Int
len ((Word8Ptr -> IO ()) -> IO ByteString)
-> (Word8Ptr -> IO ()) -> IO ByteString
forall a b. (a -> b) -> a -> b
$ \bsptr :: Word8Ptr
bsptr ->
Word8Ptr -> Word8Ptr -> Int -> IO ()
BSI.memcpy Word8Ptr
bsptr Word8Ptr
ptr Int
len
kafkaRespErr :: Errno -> KafkaError
kafkaRespErr :: Errno -> KafkaError
kafkaRespErr (Errno num :: CInt
num) = RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaRespErrT -> KafkaError) -> RdKafkaRespErrT -> KafkaError
forall a b. (a -> b) -> a -> b
$ Int -> RdKafkaRespErrT
rdKafkaErrno2err (CInt -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral CInt
num)
{-# INLINE kafkaRespErr #-}
throwOnError :: IO (Maybe Text) -> IO ()
throwOnError :: IO (Maybe Text) -> IO ()
throwOnError action :: IO (Maybe Text)
action = do
Maybe Text
m <- IO (Maybe Text)
action
case Maybe Text
m of
Just e :: Text
e -> KafkaError -> IO ()
forall a e. Exception e => e -> a
throw (KafkaError -> IO ()) -> KafkaError -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> KafkaError
KafkaError Text
e
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
hasError :: KafkaError -> Bool
hasError :: KafkaError -> Bool
hasError err :: KafkaError
err = case KafkaError
err of
KafkaResponseError RdKafkaRespErrNoError -> Bool
False
_ -> Bool
True
{-# INLINE hasError #-}
rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither err :: RdKafkaRespErrT
err = case RdKafkaRespErrT
err of
RdKafkaRespErrNoError -> () -> Either KafkaError ()
forall a b. b -> Either a b
Right ()
_ -> KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> KafkaError
KafkaResponseError RdKafkaRespErrT
err)
{-# INLINE rdKafkaErrorToEither #-}
kafkaErrorToEither :: KafkaError -> Either KafkaError ()
kafkaErrorToEither :: KafkaError -> Either KafkaError ()
kafkaErrorToEither err :: KafkaError
err = case KafkaError
err of
KafkaResponseError RdKafkaRespErrNoError -> () -> Either KafkaError ()
forall a b. b -> Either a b
Right ()
_ -> KafkaError -> Either KafkaError ()
forall a b. a -> Either a b
Left KafkaError
err
{-# INLINE kafkaErrorToEither #-}
kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError
kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError
kafkaErrorToMaybe err :: KafkaError
err = case KafkaError
err of
KafkaResponseError RdKafkaRespErrNoError -> Maybe KafkaError
forall a. Maybe a
Nothing
_ -> KafkaError -> Maybe KafkaError
forall a. a -> Maybe a
Just KafkaError
err
{-# INLINE kafkaErrorToMaybe #-}
maybeToLeft :: Maybe a -> Either a ()
maybeToLeft :: Maybe a -> Either a ()
maybeToLeft = Either a () -> (a -> Either a ()) -> Maybe a -> Either a ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> Either a ()
forall a b. b -> Either a b
Right ()) a -> Either a ()
forall a b. a -> Either a b
Left
{-# INLINE maybeToLeft #-}
readPayload :: RdKafkaMessageT -> IO (Maybe BS.ByteString)
readPayload :: RdKafkaMessageT -> IO (Maybe ByteString)
readPayload = (RdKafkaMessageT -> Int)
-> (RdKafkaMessageT -> Word8Ptr)
-> RdKafkaMessageT
-> IO (Maybe ByteString)
forall t.
(t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS RdKafkaMessageT -> Int
len'RdKafkaMessageT RdKafkaMessageT -> Word8Ptr
payload'RdKafkaMessageT
readTopic :: RdKafkaMessageT -> IO Text
readTopic :: RdKafkaMessageT -> IO Text
readTopic msg :: RdKafkaMessageT
msg = Ptr RdKafkaTopicT -> IO (ForeignPtr RdKafkaTopicT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ (RdKafkaMessageT -> Ptr RdKafkaTopicT
topic'RdKafkaMessageT RdKafkaMessageT
msg) IO (ForeignPtr RdKafkaTopicT)
-> (ForeignPtr RdKafkaTopicT -> IO Text) -> IO Text
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ((String -> Text) -> IO String -> IO Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap String -> Text
Text.pack (IO String -> IO Text)
-> (ForeignPtr RdKafkaTopicT -> IO String)
-> ForeignPtr RdKafkaTopicT
-> IO Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ForeignPtr RdKafkaTopicT -> IO String
rdKafkaTopicName)
readKey :: RdKafkaMessageT -> IO (Maybe BSI.ByteString)
readKey :: RdKafkaMessageT -> IO (Maybe ByteString)
readKey = (RdKafkaMessageT -> Int)
-> (RdKafkaMessageT -> Word8Ptr)
-> RdKafkaMessageT
-> IO (Maybe ByteString)
forall t.
(t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS RdKafkaMessageT -> Int
keyLen'RdKafkaMessageT RdKafkaMessageT -> Word8Ptr
key'RdKafkaMessageT
readTimestamp :: RdKafkaMessageTPtr -> IO Timestamp
readTimestamp :: RdKafkaMessageTPtr -> IO Timestamp
readTimestamp msg :: RdKafkaMessageTPtr
msg =
(Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp)
-> (Ptr RdKafkaTimestampTypeT -> IO Timestamp) -> IO Timestamp
forall a b. (a -> b) -> a -> b
$ \p :: Ptr RdKafkaTimestampTypeT
p -> do
ForeignPtr RdKafkaTimestampTypeT
typeP <- Ptr RdKafkaTimestampTypeT -> IO (ForeignPtr RdKafkaTimestampTypeT)
forall a. Ptr a -> IO (ForeignPtr a)
newForeignPtr_ Ptr RdKafkaTimestampTypeT
p
Int64
ts <- CInt64T -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (CInt64T -> Int64) -> IO CInt64T -> IO Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaMessageTPtr
-> ForeignPtr RdKafkaTimestampTypeT -> IO CInt64T
rdKafkaMessageTimestamp RdKafkaMessageTPtr
msg ForeignPtr RdKafkaTimestampTypeT
typeP
RdKafkaTimestampTypeT
tsType <- Ptr RdKafkaTimestampTypeT -> IO RdKafkaTimestampTypeT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaTimestampTypeT
p
Timestamp -> IO Timestamp
forall (m :: * -> *) a. Monad m => a -> m a
return (Timestamp -> IO Timestamp) -> Timestamp -> IO Timestamp
forall a b. (a -> b) -> a -> b
$ case RdKafkaTimestampTypeT
tsType of
RdKafkaTimestampCreateTime -> Millis -> Timestamp
CreateTime (Int64 -> Millis
Millis Int64
ts)
RdKafkaTimestampLogAppendTime -> Millis -> Timestamp
LogAppendTime (Int64 -> Millis
Millis Int64
ts)
RdKafkaTimestampNotAvailable -> Timestamp
NoTimestamp
readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
msg :: Ptr RdKafkaMessageT
msg = do
(err :: RdKafkaRespErrT
err, headersPtr :: RdKafkaHeadersTPtr
headersPtr) <- Ptr RdKafkaMessageT -> IO (RdKafkaRespErrT, RdKafkaHeadersTPtr)
rdKafkaMessageHeaders Ptr RdKafkaMessageT
msg
case RdKafkaRespErrT
err of
RdKafkaRespErrNoent -> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers))
-> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall a b. (a -> b) -> a -> b
$ Headers -> Either RdKafkaRespErrT Headers
forall a b. b -> Either a b
Right Headers
forall a. Monoid a => a
mempty
RdKafkaRespErrNoError -> ([(ByteString, ByteString)] -> Headers)
-> Either RdKafkaRespErrT [(ByteString, ByteString)]
-> Either RdKafkaRespErrT Headers
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [(ByteString, ByteString)] -> Headers
headersFromList (Either RdKafkaRespErrT [(ByteString, ByteString)]
-> Either RdKafkaRespErrT Headers)
-> IO (Either RdKafkaRespErrT [(ByteString, ByteString)])
-> IO (Either RdKafkaRespErrT Headers)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RdKafkaHeadersTPtr
-> IO (Either RdKafkaRespErrT [(ByteString, ByteString)])
forall a.
RdKafkaHeadersTPtr -> IO (Either a [(ByteString, ByteString)])
extractHeaders RdKafkaHeadersTPtr
headersPtr
e :: RdKafkaRespErrT
e -> Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers)
forall (m :: * -> *) a. Monad m => a -> m a
return (Either RdKafkaRespErrT Headers
-> IO (Either RdKafkaRespErrT Headers))
-> (RdKafkaRespErrT -> Either RdKafkaRespErrT Headers)
-> RdKafkaRespErrT
-> IO (Either RdKafkaRespErrT Headers)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RdKafkaRespErrT -> Either RdKafkaRespErrT Headers
forall a b. a -> Either a b
Left (RdKafkaRespErrT -> IO (Either RdKafkaRespErrT Headers))
-> RdKafkaRespErrT -> IO (Either RdKafkaRespErrT Headers)
forall a b. (a -> b) -> a -> b
$ RdKafkaRespErrT
e
where extractHeaders :: RdKafkaHeadersTPtr -> IO (Either a [(ByteString, ByteString)])
extractHeaders ptHeaders :: RdKafkaHeadersTPtr
ptHeaders =
(Ptr CString -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr CString -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)]))
-> (Ptr CString -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \nptr :: Ptr CString
nptr ->
(Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)]))
-> (Ptr Word8Ptr -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \vptr :: Ptr Word8Ptr
vptr ->
(Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)]))
-> (Ptr CSize -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ \szptr :: Ptr CSize
szptr ->
let go :: [(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go acc :: [(ByteString, ByteString)]
acc idx :: CSize
idx = RdKafkaHeadersTPtr
-> CSize
-> Ptr CString
-> Ptr Word8Ptr
-> Ptr CSize
-> IO RdKafkaRespErrT
rdKafkaHeaderGetAll RdKafkaHeadersTPtr
ptHeaders CSize
idx Ptr CString
nptr Ptr Word8Ptr
vptr Ptr CSize
szptr IO RdKafkaRespErrT
-> (RdKafkaRespErrT -> IO (Either a [(ByteString, ByteString)]))
-> IO (Either a [(ByteString, ByteString)])
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
RdKafkaRespErrNoent -> Either a [(ByteString, ByteString)]
-> IO (Either a [(ByteString, ByteString)])
forall (m :: * -> *) a. Monad m => a -> m a
return (Either a [(ByteString, ByteString)]
-> IO (Either a [(ByteString, ByteString)]))
-> Either a [(ByteString, ByteString)]
-> IO (Either a [(ByteString, ByteString)])
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString)] -> Either a [(ByteString, ByteString)]
forall a b. b -> Either a b
Right [(ByteString, ByteString)]
acc
RdKafkaRespErrNoError -> do
CString
cstr <- Ptr CString -> IO CString
forall a. Storable a => Ptr a -> IO a
peek Ptr CString
nptr
Word8Ptr
wptr <- Ptr Word8Ptr -> IO Word8Ptr
forall a. Storable a => Ptr a -> IO a
peek Ptr Word8Ptr
vptr
CSize
csize <- Ptr CSize -> IO CSize
forall a. Storable a => Ptr a -> IO a
peek Ptr CSize
szptr
ByteString
hn <- CString -> IO ByteString
BS.packCString CString
cstr
ByteString
hv <- Int -> Word8Ptr -> IO ByteString
word8PtrToBS (CSize -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral CSize
csize) Word8Ptr
wptr
[(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go ((ByteString
hn, ByteString
hv) (ByteString, ByteString)
-> [(ByteString, ByteString)] -> [(ByteString, ByteString)]
forall a. a -> [a] -> [a]
: [(ByteString, ByteString)]
acc) (CSize
idx CSize -> CSize -> CSize
forall a. Num a => a -> a -> a
+ 1)
_ -> String -> IO (Either a [(ByteString, ByteString)])
forall a. HasCallStack => String -> a
error "Unexpected error code while extracting headers"
in [(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
forall a.
[(ByteString, ByteString)]
-> CSize -> IO (Either a [(ByteString, ByteString)])
go [] 0
readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
readBS :: (t -> Int) -> (t -> Word8Ptr) -> t -> IO (Maybe ByteString)
readBS flen :: t -> Int
flen fdata :: t -> Word8Ptr
fdata s :: t
s = if t -> Word8Ptr
fdata t
s Word8Ptr -> Word8Ptr -> Bool
forall a. Eq a => a -> a -> Bool
== Word8Ptr
forall a. Ptr a
nullPtr
then Maybe ByteString -> IO (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ByteString
forall a. Maybe a
Nothing
else ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> Word8Ptr -> IO ByteString
word8PtrToBS (t -> Int
flen t
s) (t -> Word8Ptr
fdata t
s)