kafka-client-sync-0.1.1.1: Synchronous Kafka Client

Safe HaskellNone
LanguageHaskell2010

Kafka.Producer.Sync

Contents

Description

This module provides a synchronous interface on top of the hw-kafka-client library.

It works by using MVars managed in two different queues. Each request is sent as soon as there are no other effectively equal Kafka records in-flight. This is done in order to make sure that there is no ambiguity as to which MVar to resolve.

Currently, this implements fair sending. For all requests, the oldest pending request should be sent first.

Synopsis

Sync producer

data SyncKafkaProducer Source #

A producer for sending messages to Kafka and waiting for the DeliveryReport

A single producer may be used for the entire application. The underlying library, librdkafka, deals very well with concurrent use - this implementation supports that as well.

newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer) Source #

Create a new SyncKafkaProducer

Note: since this library wraps the regular hw-kafka-client, please be aware that you should not set the delivery report callback, as it is set internally.

closeSyncProducer :: MonadIO m => SyncKafkaProducer -> m () Source #

Close the SyncKafkaProducer

After invoking this function, the producer should no longer be used.

Ideally, you would use bracket in order to make sure that a producer is not re-used once closed.

produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ()) Source #

Synchronously produce a record using the specified producer

Re-exports

Record datatypes

newtype TopicName #

Constructors

TopicName 

Fields

Instances
Eq TopicName 
Instance details

Defined in Kafka.Types

Ord TopicName 
Instance details

Defined in Kafka.Types

Read TopicName 
Instance details

Defined in Kafka.Types

Show TopicName 
Instance details

Defined in Kafka.Types

Generic TopicName 
Instance details

Defined in Kafka.Types

Associated Types

type Rep TopicName :: Type -> Type #

type Rep TopicName 
Instance details

Defined in Kafka.Types

type Rep TopicName = D1 (MetaData "TopicName" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" True) (C1 (MetaCons "TopicName" PrefixI True) (S1 (MetaSel (Just "unTopicName") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

data ProducePartition #

Instances
Eq ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Ord ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Show ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Generic ProducePartition 
Instance details

Defined in Kafka.Producer.Types

Associated Types

type Rep ProducePartition :: Type -> Type #

type Rep ProducePartition 
Instance details

Defined in Kafka.Producer.Types

type Rep ProducePartition = D1 (MetaData "ProducePartition" "Kafka.Producer.Types" "hw-kfk-clnt-3.0.0-9c515fc4" False) (C1 (MetaCons "SpecifiedPartition" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) SourceUnpack SourceStrict DecidedUnpack) (Rec0 Int)) :+: C1 (MetaCons "UnassignedPartition" PrefixI False) (U1 :: Type -> Type))

Errors

data KafkaError #

Instances
Eq KafkaError 
Instance details

Defined in Kafka.Types

Show KafkaError 
Instance details

Defined in Kafka.Types

Generic KafkaError 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaError :: Type -> Type #

Exception KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError 
Instance details

Defined in Kafka.Types

type Rep KafkaError = D1 (MetaData "KafkaError" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" False) ((C1 (MetaCons "KafkaError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)) :+: (C1 (MetaCons "KafkaInvalidReturnValue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "KafkaBadSpecification" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))) :+: ((C1 (MetaCons "KafkaResponseError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 RdKafkaRespErrT)) :+: C1 (MetaCons "KafkaInvalidConfigurationValue" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) :+: (C1 (MetaCons "KafkaUnknownConfigurationKey" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)) :+: C1 (MetaCons "KafkaBadConfiguration" PrefixI False) (U1 :: Type -> Type))))

Producer configuration

Configuration helpers

Set brokers for producer

Set log-level for producer

Set compression level for producer

Set topic compression for producer

Set send timeout for producer

Set extra properties for producer

Suppress disconnect log lines

Configure extra topic properties

Add KafkaDebug options

Other datatypes

newtype BrokerAddress #

Constructors

BrokerAddress 
Instances
Eq BrokerAddress 
Instance details

Defined in Kafka.Types

Show BrokerAddress 
Instance details

Defined in Kafka.Types

Generic BrokerAddress 
Instance details

Defined in Kafka.Types

Associated Types

type Rep BrokerAddress :: Type -> Type #

type Rep BrokerAddress 
Instance details

Defined in Kafka.Types

type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text)))

data KafkaCompressionCodec #

Constructors

NoCompression 
Gzip 
Snappy 
Lz4 
Instances
Eq KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Show KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Generic KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaCompressionCodec :: Type -> Type #

type Rep KafkaCompressionCodec 
Instance details

Defined in Kafka.Types

type Rep KafkaCompressionCodec = D1 (MetaData "KafkaCompressionCodec" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" False) ((C1 (MetaCons "NoCompression" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Gzip" PrefixI False) (U1 :: Type -> Type)) :+: (C1 (MetaCons "Snappy" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "Lz4" PrefixI False) (U1 :: Type -> Type)))

data KafkaDebug #

Instances
Eq KafkaDebug 
Instance details

Defined in Kafka.Types

Show KafkaDebug 
Instance details

Defined in Kafka.Types

Generic KafkaDebug 
Instance details

Defined in Kafka.Types

Associated Types

type Rep KafkaDebug :: Type -> Type #

type Rep KafkaDebug 
Instance details

Defined in Kafka.Types

type Rep KafkaDebug = D1 (MetaData "KafkaDebug" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" False) (((C1 (MetaCons "DebugGeneric" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugBroker" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugTopic" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugMetadata" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugQueue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugMsg" PrefixI False) (U1 :: Type -> Type)))) :+: ((C1 (MetaCons "DebugProtocol" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugCgrp" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugSecurity" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugFetch" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugFeature" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugAll" PrefixI False) (U1 :: Type -> Type)))))

newtype Timeout #

Constructors

Timeout 

Fields

Instances
Eq Timeout 
Instance details

Defined in Kafka.Types

Methods

(==) :: Timeout -> Timeout -> Bool #

(/=) :: Timeout -> Timeout -> Bool #

Read Timeout 
Instance details

Defined in Kafka.Types

Show Timeout 
Instance details

Defined in Kafka.Types

Generic Timeout 
Instance details

Defined in Kafka.Types

Associated Types

type Rep Timeout :: Type -> Type #

Methods

from :: Timeout -> Rep Timeout x #

to :: Rep Timeout x -> Timeout #

type Rep Timeout 
Instance details

Defined in Kafka.Types

type Rep Timeout = D1 (MetaData "Timeout" "Kafka.Types" "hw-kfk-clnt-3.0.0-9c515fc4" True) (C1 (MetaCons "Timeout" PrefixI True) (S1 (MetaSel (Just "unTimeout") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Int)))