Отправка POJO в Kafka с предопределенной схемой Avro в реестре схем

У нас есть реестр конфлюентных схем, настроенный в облаке, в котором есть схема Avro для следующих тем.

  • Схема dummytopic-key имеет одно поле id, которое является string.
  • Схема dummytopic-value имеет поле name, которое является string, и поле score, которое является double.

Сейчас мы пытаемся реализовать производителя на основе Scala с использованием Confluent kafka-client и avro-сериализатора kafka-avro-serializer. Наши классы определены следующим образом.

class DummyKey (val id: java.lang.String)
class DummyValue (val name: java.lang.String, val score: java.lang.Double)

Сериализация этих классов не работает из коробки, потому что классы не поддерживаются типами Avro. Проблема в том, что решения, которые мы находим в Интернете, либо определяют явную схему для этих классов на стороне производителя, либо пытаются получить схему путем отражения. Нам нужен способ получить схему из реестра схем и использовать эту схему для сериализации наших классов.

Иллюстрация того, как это должно выглядеть

object MyApp {
  val producerProps = new Properties()
  // Omitted: properties for Kafka bootstrap servers, TLS, key/value Avro serializers, etc.
  // Omitted: declare a schemaRegistryClient

  val dummyKeySerializer = new KafkaAvroSerializer(schemaRegistryClient)
  dummyKeySerializer.configure(schemaRegistryConfig, true)
  val dummyValueSerializer = new KafkaAvroSerializer(schemaRegistryClient)
  dummyValueSerializer.configure(schemaRegistryConfig, false)

  val producer = new KafkaProducer(producerProps, dummyKeySerializer, dummyValueSerializer)

  val dummyKey = new DummyKey(checkTransaction.transactionid)
  val dummyValue = new DummyValue("test", 0.5)
  producer.send(new ProducerRecord("dummytopic", dummyKey, dummyValue))
}

Функция producer.send жалуется на Unsupported Avro type, потому что DummyKey и DummyValue не являются экземплярами IndexedRecord. Кажется, есть два варианта.

  • Есть ли способ создать IndexedRecord из POJO, используя схему, определенную в реестре схем?
  • Есть ли какой-либо другой метод сериализации, который загружает схему из реестра схем, не полагаясь на локально созданную схему?

Вы должны использовать Maven или аналогичный инструмент сборки для загрузки схемы из реестра перед компиляцией кода. В противном случае используйте avro4s для преобразования ваших классов в объекты Avro.

OneCricketeer 09.04.2021 06:20
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
1
14
0

Другие вопросы по теме