{-# LANGUAGE TypeApplications #-}
module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where
import Control.Monad (void)
import Control.Exception (bracket)
import Control.Concurrent (forkIO)
import Foreign.C.Error (getErrno)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (Storable(peek))
import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr, freeStablePtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Types (Offset(..))
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
import Kafka.Internal.Setup (KafkaConf(..), getRdKafkaConf, Callback(..))
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload)
import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
import Kafka.Types (KafkaError(..), TopicName(..))
-- | Wrap a delivery-report handler in a 'Callback'.
--
-- When the resulting 'Callback' is applied to a 'KafkaConf', it registers a
-- low-level handler through 'rdKafkaConfSetDrMsgCb'. That handler turns each
-- incoming message pointer into a 'DeliveryReport' and passes it to the
-- supplied @callback@.
--
-- * A null message pointer is reported as 'NoMessageError', built from the
--   current @errno@ via 'kafkaRespErr'.
-- * Otherwise the message is read with 'peek' and reported as a failure or a
--   success depending on its @err'RdKafkaMessageT@ field.
--
-- If the message's opaque pointer is non-null it is treated as a stable
-- pointer to an additional @DeliveryReport -> IO ()@ action. That action is
-- run on a new thread with the same report, and the stable pointer is freed.
-- NOTE(review): presumably the opaque pointer is set by the produce path for
-- per-message callbacks; confirm against the producer code.
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback callback =
  Callback (\conf -> rdKafkaConfSetDrMsgCb (getRdKafkaConf conf) onDelivery)
  where
    -- Low-level handler; the first argument is ignored.
    onDelivery :: t -> Ptr RdKafkaMessageT -> IO ()
    onDelivery _ msgPtr
      | msgPtr == nullPtr = do
          errno <- getErrno
          callback (NoMessageError (kafkaRespErr errno))
      | otherwise = do
          msg <- peek msgPtr
          report <-
            if err'RdKafkaMessageT msg /= RdKafkaRespErrNoError
              then mkErrorReport msg
              else mkSuccessReport msg
          dispatch (opaque'RdKafkaMessageT msg) report

    -- Run the global callback, then (if present) the per-message one.
    dispatch :: Ptr () -> DeliveryReport -> IO ()
    dispatch opaquePtr report
      | opaquePtr == nullPtr = callback report
      | otherwise = do
          callback report
          -- 'bracket' guarantees the stable pointer is released once the
          -- per-message action has been extracted and scheduled.
          bracket (pure (castPtrToStablePtr opaquePtr)) freeStablePtr $ \handle -> do
            perMessage <- deRefStablePtr @(DeliveryReport -> IO ()) handle
            -- Forked so this handler does not wait for the per-message action.
            void (forkIO (perMessage report))
-- | Build a 'DeliveryFailure' report for a message.
--
-- The producer record is reconstructed with 'mkProdRec'; the error is the
-- message's @err'RdKafkaMessageT@ field wrapped in 'KafkaResponseError'.
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
mkErrorReport msg = failureFor <$> mkProdRec msg
  where
    failureFor record =
      DeliveryFailure record (KafkaResponseError (err'RdKafkaMessageT msg))
-- | Build a 'DeliverySuccess' report for a message.
--
-- The producer record is reconstructed with 'mkProdRec'; the offset is the
-- message's @offset'RdKafkaMessageT@ field wrapped in 'Offset'.
mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport
mkSuccessReport msg = successFor <$> mkProdRec msg
  where
    successFor record =
      DeliverySuccess record (Offset (offset'RdKafkaMessageT msg))
-- | Reconstruct a 'ProducerRecord' from a message.
--
-- Topic, key and payload are read (in that order) with 'readTopic',
-- 'readKey' and 'readPayload'; the partition is taken from the message's
-- @partition'RdKafkaMessageT@ field as a 'SpecifiedPartition'.
mkProdRec :: RdKafkaMessageT -> IO ProducerRecord
mkProdRec msg =
  assemble <$> readTopic msg <*> readKey msg <*> readPayload msg
  where
    assemble topicText msgKey msgValue =
      ProducerRecord
        { prTopic     = TopicName topicText
        , prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
        , prKey       = msgKey
        , prValue     = msgValue
        }