Я пытаюсь написать потоковое приложение Flink, в котором есть KafkaSource для чтения из темы, для данных которой определена схема AVRO.
Я хотел бы знать, как в этом случае работает автоматическое кэширование схем локально по аналогии с документацией Confluent здесь.
По сути, вариант использования заключается в том, что потребитель не должен знать схему заранее. После создания экземпляра потребителя URL-адрес реестра схемы следует использовать в качестве параметра, и потребитель должен прочитать схему для этой конкретной темы.
Это возможно? Любые указатели приветствуются!
Да, реестр схемы Glue в основном является копией сливающегося реестра. FlinkKafkaConsumer
требует, чтобы потребители знали схему заранее. Мне было интересно, можно ли этого избежать, заставив схему для потребителя считываться напрямую из реестра схем и кэшироваться локально — как они описали здесь: docs.confluent.io/platform/current/schema-registry/…. Моя проблема заключается в том, что наши потребители не будут знать схему заранее, и они должны иметь возможность создать клиент FlinkKafkaConsumer
, используя URL-адрес реестра схемы Glue или Confluent.
Нет, этого нельзя избежать, потому что для Avro требуется две схемы. Данные, потребляемые в теме, являются «схемой записи». Если у вас есть конечная точка реестра, вы можете просто использовать HTTP-клиент для получения «схемы чтения». Проблема не уникальна для Flink; вы увидите, как любой другой потребитель Avro Kafka предварительно скомпилирует/загрузит отдельную «схему чтения» Avro локально для десериализации записей. Это то, что называется «эволюцией схемы» — вы предоставляете одну схему локально для сопоставления ранее записанных данных.
Итак, после первоначальной загрузки схемы любые обновления схемы, которые обновляются в реестре схем (клей или что-то другое), будут работать с автоматическим обновлением схемы?
Это должно быть возможно.
Вы можете протестировать с помощью инструмента CLI Kafka, такого как kcat
, следующим образом:
kcat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080
Если вы используете kafka-avro-console-consumer
:
kafka-avro-console-consumer --topic topicX --bootstrap-server kafka:9092 \ --property schema.registry.url = "http://schema-registry:8081"
Какое это имеет отношение к Flink или AWS Glue?
Можно через обычный кафка авро консьюмер, это я знаю. Я хочу иметь возможность делать то же самое с FlinkKafkaConsumer
— предоставляя только URL-адрес реестра схемы Glue или URL-адрес реестра Confluent Schema.
После создания экземпляра потребителя URL-адрес реестра схемы следует использовать в качестве параметра, и потребитель должен прочитать схему для этой конкретной темы.
Это будет, и это будет кэшировано. Это известно как «схема писателя».
потребитель не должен знать схему заранее
Это необходимо, поскольку Avro требует «схемы чтения» для десериализации данных, определенных «схемой записи».
Без схемы чтения вам остается обрабатывать типы Avro GenericRecord
.
Я немного читал и нашел этот ресурс, где фрагмент кода Python Kafka Consumer (в описании видео) не принимает схему в качестве входных данных. Не уверен, что я что-то упускаю здесь. Видео: youtube.com/watch?v=q7XFcfE_TJ0
Не уверен о чем ты. Они используют schema_file.read()
Кроме того, Python, в частности, нетипизирован, так что это относится к типу GenericRecord
JVM, где вам не нужен локальный файл схемы.
Думаю, они делают это со стороны продюсера.
О, если у нас все в порядке с GenericRecord
, возможно ли также использование Flink Consumer + Glue?
Должно быть. На данный момент это просто Avro API. Любая конкретная запись может быть преобразована в общую.
Давайте продолжим обсуждение в чате.
Я действительно использовал только Beam. У меня нет примера кода Flink
Я также попытался запустить приложение, используя реестр схемы Confluent. Я вижу неожиданные результаты, которые не имеют для меня полного смысла. У меня есть 2 записи в исходной теме. r1 со схемой v1, а r2 со схемой v2. Я предоставил s1 как часть метода forGeneric
, а также предоставил URL-адрес реестра схемы. Похоже, что реестр схемы даже не читается клиентом-потребителем. Почему это?
Я не понимаю, как автоматическая выборка схемы / кэшированная схема работает с Flink или реестром схем в целом. Если он всегда использует схему чтения, даже если схема записи является более новой версией и совместима, какой смысл иметь реестр схем для использования клиентами? Не могли бы вы помочь мне понять, что мне здесь не хватает?
Как я уже говорил, я не использую Flink, и дело даже не в этом. Flink делегирует классы десериализатора Avro, не относящиеся к Flink Kafka... Вы получаете кэшированную схему только в том случае, если используете GenericRecord
, как уже упоминалось. В противном случае у вас есть скомпилированная схема Avro как SpecificRecord
, поэтому «схема записи» (отправляемая производителем, которая устанавливает идентификатор в байтах) преобразуется в нее с помощью политик эволюции Avro. В документации есть пример с использованием GenericRecord
спасибо за всю вашу помощь - ценю это!
Библиотеки AWS SerDe для Glue используют проводной формат, который содержит uuid схемы (версии), с которой сериализовано сообщение. Приложение-потребитель считывает идентификатор схемы из сообщения и загружает его из реестра схемы Glue, если его еще нет в локальном кеше. Вы можете найти описание формата провода в нижней части файла readme для этой библиотеки javascript serde: https://github.com/meinestadt/glue-schema-registry .
Спасибо, Феликс, это то, чего мне удалось добиться с помощью Confluent. Но для Glue я не вижу CachedScenaRegistryClient для кэширования полученной схемы. Мое приложение использует Java-клиент. Позвольте мне проверить еще раз.
Я попытался следовать формату массива байтов, но получил исключение. Похоже, байты со 2 по 18 не имеют идентификатора схемы. Я делаю следующее: ByteBuffer schemaIdBuffer = ByteBuffer.wrap(value, 2, 16);
Что-то я пропустил?
Почему нельзя использовать официальные библиотеки Glue SerDe для Java от AWS? github.com/awslabs/aws-glue-schema-registry ?
Или вы можете поделиться одним примером двоичного сообщения, которое вы пытаетесь расшифровать? Тогда я могу взглянуть на формат. Например, вы можете поделиться им как шестнадцатеричной строкой.
Я использовал официальный SerDe от aws, и это сработало :)
Десериализаторы клея имеют открытый исходный код... В этом отношении они работают очень похоже на Confluent. Какие конкретно проблемы у вас возникают?