{-# LANGUAGE TypeApplications #-}
module Kafka.Producer.Callbacks
( deliveryCallback
, module X
)
where

import           Control.Monad          (void)
import           Control.Exception      (bracket)
import           Control.Concurrent     (forkIO)
import           Foreign.C.Error        (getErrno)
import           Foreign.Ptr            (Ptr, nullPtr)
import           Foreign.Storable       (Storable(peek))
import           Foreign.StablePtr      (castPtrToStablePtr, deRefStablePtr, freeStablePtr)
import           Kafka.Callbacks        as X
import           Kafka.Consumer.Types   (Offset(..))
import           Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb)
import           Kafka.Internal.Setup   (KafkaConf(..), getRdKafkaConf, Callback(..))
import           Kafka.Internal.Shared  (kafkaRespErr, readTopic, readKey, readPayload)
import           Kafka.Producer.Types   (ProducerRecord(..), DeliveryReport(..), ProducePartition(..))
import           Kafka.Types            (KafkaError(..), TopicName(..))

-- | Sets the callback for delivery reports.
--
--   The resulting 'Callback' registers a delivery-report handler on the
--   given configuration via 'rdKafkaConfSetDrMsgCb'. For every report the
--   handler first invokes the supplied @callback@ and then, if the message
--   carries a non-null opaque pointer, the per-message callback stored
--   behind that pointer.
--
--   /Note: A callback should not be a long-running process as it blocks
--   librdkafka from continuing on the thread that handles the delivery
--   callbacks. For callbacks to individual messages see
--   'Kafka.Producer.produceMessage\''./
--
deliveryCallback :: (DeliveryReport -> IO ()) -> Callback
deliveryCallback callback =
  Callback $ \kc -> rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb
  where
    -- Handler given to librdkafka. The first argument is ignored; the
    -- second is the pointer to the delivered (or failed) message.
    realCb :: t -> Ptr RdKafkaMessageT -> IO ()
    realCb _ mptr
      -- No message at all: report the current errno as a 'NoMessageError'.
      | mptr == nullPtr =
          getErrno >>= (callback . NoMessageError . kafkaRespErr)
      | otherwise = do
          s <- peek mptr
          let cbPtr = opaque'RdKafkaMessageT s
          report <-
            if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
              then mkErrorReport s
              else mkSuccessReport s
          callbacks cbPtr report

    -- Runs the global callback and then, when the message's opaque pointer
    -- is non-null, the per-message callback it refers to.
    -- NOTE(review): the opaque pointer is presumably a 'StablePtr' created
    -- when the message was produced -- confirm against the producer code.
    callbacks :: Ptr () -> DeliveryReport -> IO ()
    callbacks cbPtr rep = do
      callback rep
      if cbPtr == nullPtr
        then pure ()
        -- 'bracket' frees the stable pointer once the per-message callback
        -- has been dereferenced and forked, even if that step throws.
        else bracket (pure $ castPtrToStablePtr cbPtr) freeStablePtr $ \stablePtr -> do
          msgCb <- deRefStablePtr @(DeliveryReport -> IO ()) stablePtr
          -- Here we fork the callback since it might be a longer action and
          -- blocking here would block librdkafka from continuing its execution
          void . forkIO $ msgCb rep

-- | Builds a 'DeliveryFailure' report for a message that was not delivered.
--
--   The producer record is rebuilt from the message with 'mkProdRec', and
--   the message's error code is wrapped in 'KafkaResponseError'.
mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport
mkErrorReport msg = do
  prodRec <- mkProdRec msg
  pure $ DeliveryFailure prodRec (KafkaResponseError (err'RdKafkaMessageT msg))

-- | Builds a 'DeliverySuccess' report for a delivered message.
--
--   The producer record is rebuilt from the message with 'mkProdRec', and
--   the message's offset field is wrapped in 'Offset'.
mkSuccessReport :: RdKafkaMessageT -> IO DeliveryReport
mkSuccessReport msg = do
  prodRec <- mkProdRec msg
  pure $ DeliverySuccess prodRec (Offset $ offset'RdKafkaMessageT msg)

-- | Reconstructs the 'ProducerRecord' described by a raw message.
--
--   Topic, key and payload are read with 'readTopic', 'readKey' and
--   'readPayload'. The partition is taken from the message's partition
--   field and reported as a 'SpecifiedPartition'.
mkProdRec :: RdKafkaMessageT -> IO ProducerRecord
mkProdRec msg = do
  topic   <- readTopic msg
  key     <- readKey msg
  payload <- readPayload msg
  pure ProducerRecord
    { prTopic     = TopicName topic
    , prPartition = SpecifiedPartition (partition'RdKafkaMessageT msg)
    , prKey       = key
    , prValue     = payload
    }