-- | Helpers shared across the Kafka bindings: event polling, conversions
-- between error representations, and readers for the fields of an
-- 'RdKafkaMessageT'.
module Kafka.Internal.Shared
  ( pollEvents
  , word8PtrToBS
  , kafkaRespErr
  , throwOnError
  , hasError
  , rdKafkaErrorToEither
  , kafkaErrorToEither
  , kafkaErrorToMaybe
  , maybeToLeft
  , readPayload
  , readTopic
  , readKey
  , readTimestamp
  , readBS
  )
where

import           Control.Exception        (throw)
import           Control.Monad            (void)
import qualified Data.ByteString          as BS
import qualified Data.ByteString.Internal as BSI
import           Data.Text                (Text)
import qualified Data.Text                as Text
import           Data.Word                (Word8)
import           Foreign.C.Error          (Errno (..))
import           Foreign.ForeignPtr       (newForeignPtr_)
import           Foreign.Marshal.Alloc    (alloca)
import           Foreign.Ptr              (Ptr, nullPtr)
import           Foreign.Storable         (Storable (peek))
import           Kafka.Consumer.Types     (Timestamp (..))
import           Kafka.Internal.RdKafka   (RdKafkaMessageT (..),
                                           RdKafkaMessageTPtr,
                                           RdKafkaRespErrT (..),
                                           RdKafkaTimestampTypeT (..),
                                           Word8Ptr, rdKafkaErrno2err,
                                           rdKafkaMessageTimestamp,
                                           rdKafkaPoll, rdKafkaTopicName)
import           Kafka.Internal.Setup     (HasKafka (..), Kafka (..))
import           Kafka.Types              (KafkaError (..), Millis (..),
                                           Timeout (..))

-- | Call 'rdKafkaPoll' on the Kafka handle obtained from the argument.
--
-- The poll timeout is taken from the supplied 'Timeout'; when no timeout is
-- given, @0@ is used. The value returned by 'rdKafkaPoll' is discarded.
pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents a tm =
  let timeout = maybe 0 unTimeout tm
      Kafka k = getKafka a
  in  void (rdKafkaPoll k timeout)

-- | Build a 'BS.ByteString' of the given length by copying that many bytes
-- from the given pointer into a newly created buffer.
word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
word8PtrToBS len ptr =
  BSI.create len $ \bsptr ->
    BSI.memcpy bsptr ptr len

-- | Translate an 'Errno' into a 'KafkaResponseError' using
-- 'rdKafkaErrno2err'.
kafkaRespErr :: Errno -> KafkaError
kafkaRespErr (Errno num) =
  KafkaResponseError $ rdKafkaErrno2err (fromIntegral num)
{-# INLINE kafkaRespErr #-}

-- | Run the action and, if it yields @Just e@, throw @KafkaError e@ as an
-- exception. A result of 'Nothing' is treated as success.
throwOnError :: IO (Maybe Text) -> IO ()
throwOnError action = do
  m <- action
  case m of
    Just e  -> throw $ KafkaError e
    Nothing -> return ()

-- | 'False' only for @KafkaResponseError RdKafkaRespErrNoError@; every other
-- 'KafkaError' value counts as an error.
hasError :: KafkaError -> Bool
hasError err = case err of
  KafkaResponseError RdKafkaRespErrNoError -> False
  _                                        -> True
{-# INLINE hasError #-}

-- | Map 'RdKafkaRespErrNoError' to @Right ()@ and any other response code to
-- a 'Left' carrying it wrapped in 'KafkaResponseError'.
rdKafkaErrorToEither :: RdKafkaRespErrT -> Either KafkaError ()
rdKafkaErrorToEither err = case err of
  RdKafkaRespErrNoError -> Right ()
  _                     -> Left (KafkaResponseError err)
{-# INLINE rdKafkaErrorToEither #-}

-- | Map @KafkaResponseError RdKafkaRespErrNoError@ to @Right ()@ and any
-- other 'KafkaError' to 'Left' of itself.
kafkaErrorToEither :: KafkaError -> Either KafkaError ()
kafkaErrorToEither err = case err of
  KafkaResponseError RdKafkaRespErrNoError -> Right ()
  _                                        -> Left err
{-# INLINE kafkaErrorToEither #-}

-- | Map @KafkaResponseError RdKafkaRespErrNoError@ to 'Nothing' and any
-- other 'KafkaError' to 'Just' of itself.
kafkaErrorToMaybe :: KafkaError -> Maybe KafkaError
kafkaErrorToMaybe err = case err of
  KafkaResponseError RdKafkaRespErrNoError -> Nothing
  _                                        -> Just err
{-# INLINE kafkaErrorToMaybe #-}

-- | Turn @Just a@ into @Left a@ and 'Nothing' into @Right ()@.
maybeToLeft :: Maybe a -> Either a ()
maybeToLeft = maybe (Right ()) Left
{-# INLINE maybeToLeft #-}

-- | Read the message payload via 'readBS', using the message's @len@ and
-- @payload@ fields.
readPayload :: RdKafkaMessageT -> IO (Maybe BS.ByteString)
readPayload = readBS len'RdKafkaMessageT payload'RdKafkaMessageT

-- | Read the topic name of a message.
--
-- The message's topic pointer is wrapped with 'newForeignPtr_' (which
-- attaches no finalizer) and passed to 'rdKafkaTopicName'; the resulting
-- 'String' is packed into 'Text'.
readTopic :: RdKafkaMessageT -> IO Text
readTopic msg =
  newForeignPtr_ (topic'RdKafkaMessageT msg)
    >>= (fmap Text.pack . rdKafkaTopicName)

-- | Read the message key via 'readBS', using the message's @keyLen@ and
-- @key@ fields.
readKey :: RdKafkaMessageT -> IO (Maybe BSI.ByteString)
readKey = readBS keyLen'RdKafkaMessageT key'RdKafkaMessageT

-- | Read the timestamp of a message.
--
-- A temporary 'RdKafkaTimestampTypeT' cell is allocated with 'alloca' and
-- handed to 'rdKafkaMessageTimestamp' as the out-parameter for the timestamp
-- type; the cell is then read back with 'peek' to decide which 'Timestamp'
-- constructor to return.
readTimestamp :: RdKafkaMessageTPtr -> IO Timestamp
readTimestamp msg =
  alloca $ \p -> do
    -- No finalizer is attached: the cell is owned by 'alloca'.
    typeP  <- newForeignPtr_ p
    ts     <- fromIntegral <$> rdKafkaMessageTimestamp msg typeP
    tsType <- peek p
    return $ case tsType of
      RdKafkaTimestampCreateTime    -> CreateTime (Millis ts)
      RdKafkaTimestampLogAppendTime -> LogAppendTime (Millis ts)
      RdKafkaTimestampNotAvailable  -> NoTimestamp

-- | Read a byte string out of a structure, given accessors for its length
-- and its data pointer.
--
-- Returns 'Nothing' when the data pointer is 'nullPtr'; otherwise the bytes
-- are copied with 'word8PtrToBS' and returned in 'Just'.
readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
readBS flen fdata s =
  if fdata s == nullPtr
    then return Nothing
    else Just <$> word8PtrToBS (flen s) (fdata s)