-- | Kafka specific functions
module Network.Flink.Kafka (kafkaRecord) where

import Data.ProtoLens (defMessage)
import Data.Text (Text)
import Lens.Family2 ((&), (.~))
import qualified Proto.Kafka as Kafka
import qualified Proto.Kafka_Fields as Kafka
import Data.ByteString (ByteString)

-- | Takes a `topic`, `key`, and `value` to construct 'KafkaProducerRecord's for egress
kafkaRecord ::
  -- | Kafka topic
  Text ->
  -- | Kafka key
  Text ->
  -- | Kafka value
  ByteString ->
  Kafka.KafkaProducerRecord
kafkaRecord :: Text -> Text -> ByteString -> KafkaProducerRecord
kafkaRecord topic :: Text
topic k :: Text
k v :: ByteString
v =
  KafkaProducerRecord
forall msg. Message msg => msg
defMessage
    KafkaProducerRecord
-> (KafkaProducerRecord -> KafkaProducerRecord)
-> KafkaProducerRecord
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f KafkaProducerRecord Text
forall (f :: * -> *) s a.
(Functor f, HasField s "topic" a) =>
LensLike' f s a
Kafka.topic (forall (f :: * -> *).
 Identical f =>
 LensLike' f KafkaProducerRecord Text)
-> Text -> KafkaProducerRecord -> KafkaProducerRecord
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
topic
    KafkaProducerRecord
-> (KafkaProducerRecord -> KafkaProducerRecord)
-> KafkaProducerRecord
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f KafkaProducerRecord Text
forall (f :: * -> *) s a.
(Functor f, HasField s "key" a) =>
LensLike' f s a
Kafka.key (forall (f :: * -> *).
 Identical f =>
 LensLike' f KafkaProducerRecord Text)
-> Text -> KafkaProducerRecord -> KafkaProducerRecord
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
k
    KafkaProducerRecord
-> (KafkaProducerRecord -> KafkaProducerRecord)
-> KafkaProducerRecord
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f KafkaProducerRecord ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "valueBytes" a) =>
LensLike' f s a
Kafka.valueBytes (forall (f :: * -> *).
 Identical f =>
 LensLike' f KafkaProducerRecord ByteString)
-> ByteString -> KafkaProducerRecord -> KafkaProducerRecord
forall s t a b. Setter s t a b -> b -> s -> t
.~ ByteString
v