{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE LambdaCase #-}
module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where
import Control.Monad (void)
import Control.Exception (bracket)
import Control.Concurrent (forkIO)
import Foreign.C.Error (getErrno)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (Storable(peek))
import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr, freeStablePtr)
import Kafka.Callbacks as X
import Kafka.Consumer.Types (Offset(..))
import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
import Kafka.Internal.Setup (getRdKafkaConf, Callback(..))
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload, readHeaders)
import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
import Kafka.Types (KafkaError(..), TopicName(..))
import Data.Either (fromRight)
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback callback :: DeliveryReport -> IO ()
callback = (KafkaConf -> IO ()) -> Callback
Callback ((KafkaConf -> IO ()) -> Callback)
-> (KafkaConf -> IO ()) -> Callback
forall a b. (a -> b) -> a -> b
$ \kc :: KafkaConf
kc -> RdKafkaConfTPtr -> DeliveryCallback -> IO ()
rdKafkaConfSetDrMsgCb (KafkaConf -> RdKafkaConfTPtr
forall k. HasKafkaConf k => k -> RdKafkaConfTPtr
getRdKafkaConf KafkaConf
kc) DeliveryCallback
forall t. t -> Ptr RdKafkaMessageT -> IO ()
realCb
where
realCb :: t -> Ptr RdKafkaMessageT -> IO ()
realCb :: t -> Ptr RdKafkaMessageT -> IO ()
realCb _ mptr :: Ptr RdKafkaMessageT
mptr =
if Ptr RdKafkaMessageT
mptr Ptr RdKafkaMessageT -> Ptr RdKafkaMessageT -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr RdKafkaMessageT
forall a. Ptr a
nullPtr
then IO Errno
getErrno IO Errno -> (Errno -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (DeliveryReport -> IO ()
callback (DeliveryReport -> IO ())
-> (Errno -> DeliveryReport) -> Errno -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. KafkaError -> DeliveryReport
NoMessageError (KafkaError -> DeliveryReport)
-> (Errno -> KafkaError) -> Errno -> DeliveryReport
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Errno -> KafkaError
kafkaRespErr)
else do
RdKafkaMessageT
s <- Ptr RdKafkaMessageT -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaMessageT
mptr
ProducerRecord
prodRec <- Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec Ptr RdKafkaMessageT
mptr
let cbPtr :: Ptr ()
cbPtr = RdKafkaMessageT -> Ptr ()
opaque'RdKafkaMessageT RdKafkaMessageT
s
Ptr () -> DeliveryReport -> IO ()
callbacks Ptr ()
cbPtr (DeliveryReport -> IO ()) -> DeliveryReport -> IO ()
forall a b. (a -> b) -> a -> b
$
if RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
s RdKafkaRespErrT -> RdKafkaRespErrT -> Bool
forall a. Eq a => a -> a -> Bool
/= RdKafkaRespErrT
RdKafkaRespErrNoError
then RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport RdKafkaMessageT
s ProducerRecord
prodRec
else RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport RdKafkaMessageT
s ProducerRecord
prodRec
callbacks :: Ptr () -> DeliveryReport -> IO ()
callbacks cbPtr :: Ptr ()
cbPtr rep :: DeliveryReport
rep = do
DeliveryReport -> IO ()
callback DeliveryReport
rep
if Ptr ()
cbPtr Ptr () -> Ptr () -> Bool
forall a. Eq a => a -> a -> Bool
== Ptr ()
forall a. Ptr a
nullPtr then
() -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
else IO (StablePtr (DeliveryReport -> IO ()))
-> (StablePtr (DeliveryReport -> IO ()) -> IO ())
-> (StablePtr (DeliveryReport -> IO ()) -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (StablePtr (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StablePtr (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ())))
-> StablePtr (DeliveryReport -> IO ())
-> IO (StablePtr (DeliveryReport -> IO ()))
forall a b. (a -> b) -> a -> b
$ Ptr () -> StablePtr (DeliveryReport -> IO ())
forall a. Ptr () -> StablePtr a
castPtrToStablePtr Ptr ()
cbPtr) StablePtr (DeliveryReport -> IO ()) -> IO ()
forall a. StablePtr a -> IO ()
freeStablePtr ((StablePtr (DeliveryReport -> IO ()) -> IO ()) -> IO ())
-> (StablePtr (DeliveryReport -> IO ()) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \stablePtr :: StablePtr (DeliveryReport -> IO ())
stablePtr -> do
DeliveryReport -> IO ()
msgCb <- StablePtr (DeliveryReport -> IO ()) -> IO (DeliveryReport -> IO ())
forall a. StablePtr a -> IO a
deRefStablePtr @(DeliveryReport -> IO ()) StablePtr (DeliveryReport -> IO ())
stablePtr
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ DeliveryReport -> IO ()
msgCb DeliveryReport
rep
mkErrorReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkErrorReport msg :: RdKafkaMessageT
msg prodRec :: ProducerRecord
prodRec = ProducerRecord -> KafkaError -> DeliveryReport
DeliveryFailure ProducerRecord
prodRec (RdKafkaRespErrT -> KafkaError
KafkaResponseError (RdKafkaMessageT -> RdKafkaRespErrT
err'RdKafkaMessageT RdKafkaMessageT
msg))
mkSuccessReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport :: RdKafkaMessageT -> ProducerRecord -> DeliveryReport
mkSuccessReport msg :: RdKafkaMessageT
msg prodRec :: ProducerRecord
prodRec = ProducerRecord -> Offset -> DeliveryReport
DeliverySuccess ProducerRecord
prodRec (Int64 -> Offset
Offset (Int64 -> Offset) -> Int64 -> Offset
forall a b. (a -> b) -> a -> b
$ RdKafkaMessageT -> Int64
offset'RdKafkaMessageT RdKafkaMessageT
msg)
mkProdRec :: Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec :: Ptr RdKafkaMessageT -> IO ProducerRecord
mkProdRec pmsg :: Ptr RdKafkaMessageT
pmsg = do
RdKafkaMessageT
msg <- Ptr RdKafkaMessageT -> IO RdKafkaMessageT
forall a. Storable a => Ptr a -> IO a
peek Ptr RdKafkaMessageT
pmsg
Text
topic <- RdKafkaMessageT -> IO Text
readTopic RdKafkaMessageT
msg
Maybe ByteString
key <- RdKafkaMessageT -> IO (Maybe ByteString)
readKey RdKafkaMessageT
msg
Maybe ByteString
payload <- RdKafkaMessageT -> IO (Maybe ByteString)
readPayload RdKafkaMessageT
msg
((Headers -> ProducerRecord) -> IO Headers -> IO ProducerRecord)
-> IO Headers -> (Headers -> ProducerRecord) -> IO ProducerRecord
forall a b c. (a -> b -> c) -> b -> a -> c
flip (Headers -> ProducerRecord) -> IO Headers -> IO ProducerRecord
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Headers -> Either RdKafkaRespErrT Headers -> Headers
forall b a. b -> Either a b -> b
fromRight Headers
forall a. Monoid a => a
mempty (Either RdKafkaRespErrT Headers -> Headers)
-> IO (Either RdKafkaRespErrT Headers) -> IO Headers
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders Ptr RdKafkaMessageT
pmsg) ((Headers -> ProducerRecord) -> IO ProducerRecord)
-> (Headers -> ProducerRecord) -> IO ProducerRecord
forall a b. (a -> b) -> a -> b
$ \headers :: Headers
headers ->
$WProducerRecord :: TopicName
-> ProducePartition
-> Maybe ByteString
-> Maybe ByteString
-> Headers
-> ProducerRecord
ProducerRecord
{ prTopic :: TopicName
prTopic = Text -> TopicName
TopicName Text
topic
, prPartition :: ProducePartition
prPartition = Int -> ProducePartition
SpecifiedPartition (RdKafkaMessageT -> Int
partition'RdKafkaMessageT RdKafkaMessageT
msg)
, prKey :: Maybe ByteString
prKey = Maybe ByteString
key
, prValue :: Maybe ByteString
prValue = Maybe ByteString
payload
, prHeaders :: Headers
prHeaders = Headers
headers
}