FlinkKafkaConsumer / KafkaSource с реестром AWS Glue Schema Registry или Confluent Schema Registry

Я пытаюсь написать потоковое приложение Flink, в котором есть KafkaSource для чтения из темы, для данных которой определена схема AVRO.

Я хотел бы знать, как в этом случае работает автоматическое кэширование схем локально по аналогии с документацией Confluent здесь.

По сути, вариант использования заключается в том, что потребитель не должен знать схему заранее. После создания экземпляра потребителя URL-адрес реестра схемы следует использовать в качестве параметра, и потребитель должен прочитать схему для этой конкретной темы.

Это возможно? Любые указатели приветствуются!

Десериализаторы клея имеют открытый исходный код... В этом отношении они работают очень похоже на Confluent. Какие конкретно проблемы у вас возникают?

OneCricketeer 11.01.2023 08:46

Да, реестр схемы Glue в основном является копией сливающегося реестра. FlinkKafkaConsumer требует, чтобы потребители знали схему заранее. Мне было интересно, можно ли этого избежать, заставив схему для потребителя считываться напрямую из реестра схем и кэшироваться локально — как они описали здесь: docs.confluent.io/platform/current/schema-registry/…. Моя проблема заключается в том, что наши потребители не будут знать схему заранее, и они должны иметь возможность создать клиент FlinkKafkaConsumer, используя URL-адрес реестра схемы Glue или Confluent.

guru 11.01.2023 18:47

Нет, этого нельзя избежать, потому что для Avro требуется две схемы. Данные, потребляемые в теме, являются «схемой записи». Если у вас есть конечная точка реестра, вы можете просто использовать HTTP-клиент для получения «схемы чтения». Проблема не уникальна для Flink; вы увидите, как любой другой потребитель Avro Kafka предварительно скомпилирует/загрузит отдельную «схему чтения» Avro локально для десериализации записей. Это то, что называется «эволюцией схемы» — вы предоставляете одну схему локально для сопоставления ранее записанных данных.

OneCricketeer 11.01.2023 19:01

Итак, после первоначальной загрузки схемы любые обновления схемы, которые обновляются в реестре схем (клей или что-то другое), будут работать с автоматическим обновлением схемы?

guru 11.01.2023 20:25
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
4
78
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Это должно быть возможно.

Вы можете протестировать с помощью инструмента CLI Kafka, такого как kcat, следующим образом:

kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080

Если вы используете kafka-avro-console-consumer:

kafka-avro-console-consumer --topic topicX --bootstrap-server kafka:9092 \ --property schema.registry.url = "http://schema-registry:8081"

Какое это имеет отношение к Flink или AWS Glue?

OneCricketeer 11.01.2023 08:45

Можно через обычный кафка авро консьюмер, это я знаю. Я хочу иметь возможность делать то же самое с FlinkKafkaConsumer — предоставляя только URL-адрес реестра схемы Glue или URL-адрес реестра Confluent Schema.

guru 11.01.2023 18:48

После создания экземпляра потребителя URL-адрес реестра схемы следует использовать в качестве параметра, и потребитель должен прочитать схему для этой конкретной темы.

Это будет, и это будет кэшировано. Это известно как «схема писателя».

потребитель не должен знать схему заранее

Это необходимо, поскольку Avro требует «схемы чтения» для десериализации данных, определенных «схемой записи».

Без схемы чтения вам остается обрабатывать типы Avro GenericRecord.

Я немного читал и нашел этот ресурс, где фрагмент кода Python Kafka Consumer (в описании видео) не принимает схему в качестве входных данных. Не уверен, что я что-то упускаю здесь. Видео: youtube.com/watch?v=q7XFcfE_TJ0

guru 11.01.2023 20:32

Не уверен о чем ты. Они используют schema_file.read()

OneCricketeer 11.01.2023 20:39

Кроме того, Python, в частности, нетипизирован, так что это относится к типу GenericRecord JVM, где вам не нужен локальный файл схемы.

OneCricketeer 11.01.2023 20:46

Думаю, они делают это со стороны продюсера.

guru 11.01.2023 20:51

О, если у нас все в порядке с GenericRecord, возможно ли также использование Flink Consumer + Glue?

guru 11.01.2023 20:52

Должно быть. На данный момент это просто Avro API. Любая конкретная запись может быть преобразована в общую.

OneCricketeer 12.01.2023 15:41

Давайте продолжим обсуждение в чате.

guru 12.01.2023 18:05

Я действительно использовал только Beam. У меня нет примера кода Flink

OneCricketeer 13.01.2023 00:34

Я также попытался запустить приложение, используя реестр схемы Confluent. Я вижу неожиданные результаты, которые не имеют для меня полного смысла. У меня есть 2 записи в исходной теме. r1 со схемой v1, а r2 со схемой v2. Я предоставил s1 как часть метода forGeneric, а также предоставил URL-адрес реестра схемы. Похоже, что реестр схемы даже не читается клиентом-потребителем. Почему это?

guru 17.01.2023 23:46

Я не понимаю, как автоматическая выборка схемы / кэшированная схема работает с Flink или реестром схем в целом. Если он всегда использует схему чтения, даже если схема записи является более новой версией и совместима, какой смысл иметь реестр схем для использования клиентами? Не могли бы вы помочь мне понять, что мне здесь не хватает?

guru 18.01.2023 18:13

Как я уже говорил, я не использую Flink, и дело даже не в этом. Flink делегирует классы десериализатора Avro, не относящиеся к Flink Kafka... Вы получаете кэшированную схему только в том случае, если используете GenericRecord, как уже упоминалось. В противном случае у вас есть скомпилированная схема Avro как SpecificRecord, поэтому «схема записи» (отправляемая производителем, которая устанавливает идентификатор в байтах) преобразуется в нее с помощью политик эволюции Avro. В документации есть пример с использованием GenericRecord

OneCricketeer 18.01.2023 22:22

спасибо за всю вашу помощь - ценю это!

guru 18.01.2023 22:32
Ответ принят как подходящий

Библиотеки AWS SerDe для Glue используют проводной формат, который содержит uuid схемы (версии), с которой сериализовано сообщение. Приложение-потребитель считывает идентификатор схемы из сообщения и загружает его из реестра схемы Glue, если его еще нет в локальном кеше. Вы можете найти описание формата провода в нижней части файла readme для этой библиотеки javascript serde: https://github.com/meinestadt/glue-schema-registry .

Спасибо, Феликс, это то, чего мне удалось добиться с помощью Confluent. Но для Glue я не вижу CachedScenaRegistryClient для кэширования полученной схемы. Мое приложение использует Java-клиент. Позвольте мне проверить еще раз.

guru 21.01.2023 16:31

Я попытался следовать формату массива байтов, но получил исключение. Похоже, байты со 2 по 18 не имеют идентификатора схемы. Я делаю следующее: ByteBuffer schemaIdBuffer = ByteBuffer.wrap(value, 2, 16); Что-то я пропустил?

guru 23.01.2023 22:14

Почему нельзя использовать официальные библиотеки Glue SerDe для Java от AWS? github.com/awslabs/aws-glue-schema-registry ?

Felix R 28.01.2023 08:58

Или вы можете поделиться одним примером двоичного сообщения, которое вы пытаетесь расшифровать? Тогда я могу взглянуть на формат. Например, вы можете поделиться им как шестнадцатеричной строкой.

Felix R 28.01.2023 09:01

Я использовал официальный SerDe от aws, и это сработало :)

guru 07.02.2023 20:30

Другие вопросы по теме