-- | This module provides a synchronous interface on top of the hw-kafka-client
--
-- It works by using MVars managed in two different queues. Each request is
-- sent as soon as there are no other effectively equal Kafka records
-- in-flight. This is done in order to make sure that there is no ambiguity
-- as to which MVar to resolve.
--
-- Currently, this implements fair sending. For all requests, the oldest
-- pending request should be sent first.
--
module Kafka.Producer.Sync
  ( -- * Sync producer
    SyncKafkaProducer
  , newSyncProducer
  , closeSyncProducer
  , produceRecord
    -- * Re-exports
    -- ** Record datatypes
  , ProducerRecord(..)
  , TopicName(..)
  , ProducePartition(..)
    -- ** Errors
  , KafkaError(..)
    -- ** Producer configuration
  , ProducerProperties(..)
    -- ** Configuration helpers
  , KP.brokersList -- | Set brokers for producer
  , KP.logLevel -- | Set log-level for producer
  , KP.compression -- | Set compression level for producer
  , KP.topicCompression -- | Set topic compression for producer
  , KP.sendTimeout -- | Set send timeout for producer
  , KP.extraProps -- | Set extra properties for producer
  , KP.suppressDisconnectLogs -- | Suppress disconnect log lines
  , KP.extraTopicProps -- | Configure extra topic properties
  , KP.debugOptions -- | Add 'KafkaDebug' options
    -- ** Other datatypes
  , BrokerAddress(..)
  , KafkaCompressionCodec(..)
  , KafkaDebug(..)
  , KafkaLogLevel(..)
  , Timeout(..)
  ) where

import Prelude

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, newEmptyMVar, putMVar)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Foldable (find)
import Data.Functor ((<&>))
import Data.Maybe (isJust)
import Data.Sequence (Seq(..), (<|), (|>))
import qualified Kafka.Producer as KP (deliveryCallback, flushProducer, newProducer)
import qualified Kafka.Producer as KP (closeProducer, produceMessage, setCallback)
import Kafka.Producer.ProducerProperties (ProducerProperties(..))
import qualified Kafka.Producer.ProducerProperties as KP (brokersList, logLevel, compression, topicCompression)
import qualified Kafka.Producer.ProducerProperties as KP (sendTimeout, extraProps, suppressDisconnectLogs)
import qualified Kafka.Producer.ProducerProperties as KP (extraTopicProps, debugOptions)
import Kafka.Producer.Types (KafkaProducer, ProducerRecord(..))
import Kafka.Producer.Types (DeliveryReport(..), ProducePartition(..))
import Kafka.Types (KafkaLogLevel(..), KafkaError(..), TopicName(..), Timeout(..))
import Kafka.Types (KafkaDebug(..), BrokerAddress(..), KafkaCompressionCodec(..))

-- | Produce a record with the given producer and block until its outcome is
-- known
--
-- The result is whatever 'sendProducerRecord' yields for the record. Once
-- that has returned, one queued record (if any) is handed to the producer
-- via 'sendPending' before the result is given back to the caller.
--
produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ())
produceRecord syncProducer record = liftIO $ do
  -- Our own record first; this blocks until its 'ResultVar' is filled:
  outcome <- sendProducerRecord record syncProducer
  -- Then give a queued record its turn:
  sendPending syncProducer
  pure outcome

-- | A producer for sending messages to Kafka and waiting for the 'DeliveryReport'
--
-- A single producer may be used for the entire application. The underlying
-- library, @librdkafka@, deals very well with concurrent use - this
-- implementation supports that as well.
--
data SyncKafkaProducer = SyncKafkaProducer
  { requests :: MVar Requests
  , producer :: KafkaProducer
  }

-- | The 'MVar' a blocked caller waits on; it is filled with the outcome of
-- the request so that the caller can proceed
--
type ResultVar = MVar (Either KafkaError ())

data Requests = Requests
  { pending :: Seq (ResultVar, ProducerRecord)
  -- ^ Records that were effectively equal to something already in @sent@
  -- when they were submitted. Keeping them apart from @sent@ is what lets a
  -- 'DeliveryReport' be matched to a 'ResultVar' without ambiguity.
  --
  -- Entries leave this queue when they are handed to the 'KafkaProducer';
  -- a record that was produced is then tracked in @sent@.
  , sent :: Seq (ResultVar, ProducerRecord)
  -- ^ In-flight records: handed to the 'KafkaProducer' but not yet
  -- acknowledged. The delivery report callback removes the matching entry
  -- and fills its 'ResultVar'.
  }

instance Show Requests where
  -- Only the records are rendered; the 'ResultVar's are dropped with 'snd'.
  show reqs = concat
    [ "Requests { pending = "
    , show (snd <$> pending reqs)
    , ", sent = "
    , show (snd <$> sent reqs)
    , " }"
    ]

-- | Create a new 'SyncKafkaProducer'
--
-- /Note/: since this library wraps the regular hw-kafka-client, please be
-- aware that you should not set the delivery report callback, as it is set
-- internally.
--
newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer)
newSyncProducer props = liftIO $ do
  reqs <- newMVar Requests { pending = mempty, sent = mempty }
  let
    -- Delivery reports are routed to the handler that removes the record
    -- from @sent@ and fills the waiting caller's 'ResultVar':
    callbackProps = KP.setCallback (KP.deliveryCallback (handleDeliveryReport reqs))
  -- The regular hw-kafka-client producer, wrapped together with the shared
  -- request state on success:
  KP.newProducer (props <> callbackProps) <&> fmap (SyncKafkaProducer reqs)

-- | Close the 'SyncKafkaProducer'
--
-- After invoking this function, the producer should not be used anymore
--
-- Ideally, you would use 'Control.Exception.bracket' in order to make sure
-- that a producer is not re-used once closed.
--
closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m ()
closeSyncProducer = KP.closeProducer . producer

-- | Sends at most one pending producer record
--
-- The oldest pending record that has no effectively equal record in @sent@
-- is handed to the producer. Pending records that still have an effectively
-- equal record in flight are left queued, so that two such records are never
-- in @sent@ at the same time and a 'DeliveryReport' always identifies exactly
-- one 'ResultVar'.
--
-- If the producer rejects the record immediately, the waiting caller is
-- given the error and the record is /not/ added to @sent@: no delivery
-- report will ever arrive for it, so tracking it would block every later
-- effectively equal record.
--
-- Every caller of 'produceRecord' invokes this once its own request has
-- completed, which eventually empties the pending queue.
--
sendPending :: SyncKafkaProducer -> IO ()
sendPending SyncKafkaProducer{..} = do
  reqs <- takeMVar requests
  case takeFirstSendable (sent reqs) (pending reqs) of
    Nothing ->
      -- Nothing queued, or everything queued is still blocked by an
      -- in-flight record:
      putMVar requests reqs
    Just ((var, record), stillPending) ->
      KP.produceMessage producer record >>= \case
        Just err -> do
          putMVar requests reqs { pending = stillPending }
          putMVar var (Left err)
        Nothing -> do
          putMVar requests reqs
            { pending = stillPending
            , sent = sent reqs |> (var, record)
            }
          KP.flushProducer producer

-- | Removes the oldest queued entry that is not effectively equal to any
-- in-flight record
--
-- The first argument is the in-flight sequence, the second the queue to
-- search (oldest first). Returns the chosen entry together with the queue
-- without it (relative order of the remaining entries is kept), or 'Nothing'
-- when no entry qualifies.
--
takeFirstSendable
  :: Seq (ResultVar, ProducerRecord)
  -> Seq (ResultVar, ProducerRecord)
  -> Maybe ((ResultVar, ProducerRecord), Seq (ResultVar, ProducerRecord))
takeFirstSendable inFlight = go Empty
  where
    -- @skipped@ holds the blocked entries already passed over, oldest first.
    go skipped = \case
      Empty -> Nothing
      candidate :<| rest
        | hasEffectivelyEqual (snd candidate) inFlight -> go (skipped |> candidate) rest
        | otherwise -> Just (candidate, skipped <> rest)

-- | Sends a producer record and waits for its delivery report
--
-- If an effectively equal record is already in @sent@, the record is queued
-- in @pending@ and the caller blocks until some later 'sendPending' produces
-- it and its outcome is known. Otherwise the record is produced right away:
-- an immediate producer error is returned as 'Left', and on success the
-- caller blocks until the delivery report fills its 'ResultVar'.
--
sendProducerRecord :: ProducerRecord -> SyncKafkaProducer -> IO (Either KafkaError ())
sendProducerRecord record SyncKafkaProducer{..} = do
  reqs <- takeMVar requests
  if hasEffectivelyEqual record (sent reqs)
    then do
      var <- newEmptyMVar
      putMVar requests reqs { pending = pending reqs |> (var, record) }
      takeMVar var
    else
      KP.produceMessage producer record >>= \case
        Just err ->
          -- Rejected up front; the shared state is put back unchanged:
          Left err <$ putMVar requests reqs
        Nothing -> do
          var <- newEmptyMVar
          putMVar requests reqs { sent = sent reqs |> (var, record) }
          KP.flushProducer producer
          takeMVar var

-- | Whether the sequence holds a record that is 'effectivelyEqual' to the
-- given one
--
hasEffectivelyEqual :: ProducerRecord -> Seq (a, ProducerRecord) -> Bool
hasEffectivelyEqual record = isJust . find (effectivelyEqual record . snd)

-- | Builds the delivery report callback for the given request state
--
-- Success and failure reports are handled on a freshly forked thread: the
-- matching entry is removed from @sent@ and its 'ResultVar' is filled with
-- @Right ()@ or @Left err@ respectively.
--
-- A report for a record that is not in @sent@ is an illegal state and raises
-- an 'error' on the forked thread. The request state is put back first, so
-- that the failure does not leave the 'MVar' empty and block every later
-- request. A 'NoMessageError' raises an 'error' directly.
--
handleDeliveryReport :: MVar Requests -> (DeliveryReport -> IO ())
handleDeliveryReport mvarRequests = \case
  DeliverySuccess record _offset -> resolveAsync record (Right ())
  DeliveryFailure record err -> resolveAsync record (Left err)
  NoMessageError err ->
    error $ "Illegal state ocurred, NoMessageError received: " <> show err
  where
    resolveAsync record outcome = void . forkIO $ do
      reqs <- takeMVar mvarRequests
      case getAndRemove record (sent reqs) of
        Just (var, remaining) -> do
          putMVar mvarRequests reqs { sent = remaining }
          putMVar var outcome
        Nothing -> do
          -- Release the state before failing; see the note above.
          putMVar mvarRequests reqs
          error $ "Illegal state ocurred, record was not in sent: " <> show reqs

-- | Finds the entry whose record is 'effectivelyEqual' to the given one and
-- removes it
--
-- The sequence is searched from its right (newest) end. Returns the entry's
-- 'ResultVar' and the sequence without that entry, in the original order, or
-- 'Nothing' if there is no such entry.
--
getAndRemove
  :: ProducerRecord
  -> Seq (ResultVar, ProducerRecord)
  -> Maybe (ResultVar, Seq (ResultVar, ProducerRecord))
getAndRemove record = go Empty
  where
    -- @newer@ collects the entries already inspected, kept in order.
    go newer = \case
      Empty -> Nothing
      older :|> (var, candidate)
        | candidate `effectivelyEqual` record -> Just (var, older <> newer)
        | otherwise -> go ((var, candidate) <| newer) older

-- | Two records are effectively equal when topic, key and value all agree
--
-- No other field of the record is compared.
--
effectivelyEqual :: ProducerRecord -> ProducerRecord -> Bool
effectivelyEqual this other =
  and
    [ prTopic this == prTopic other
    , prKey this == prKey other
    , prValue this == prValue other
    ]