Прочитать запись Kafka с ключом, используя apache Flink?

я использую значение + запись производителя kafka, используя:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

но мне трудно понять, как читать эти записи кафки с помощью потребителя Flink kafka KafkaSource. Я хочу иметь возможность делать такие вещи, как:

record.getValue(), record.getKey(), record.getTimestamp()...

это мой текущий код, который читает только записи без ключей из kafka

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(ip)
            .setTopics("test3")
            .setGroupId("1")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            .build();

    DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();

Могу ли я получить пример того, что я ищу?

Для начала не используйте valueOnly()

OneCricketeer 29.11.2022 04:42

Я использовал только Beam, а не Flink, но глядя на Javadoc, вам нужно использовать его для преобразования необработанного ConsumerRecord в ваш собственный тип. Затем KafkaSource<T> будет использовать это, поскольку он не поддерживает наличие двух дженериков.

OneCricketeer 29.11.2022 04:51
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
2
98
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам нужно реализовать KafkaRecordDeserializationSchema (но не valueOnly), а затем в его методе десериализации у вас будет доступ к ConsumerRecord, и вы можете работать с его ключом, значением, заголовками и т. д. для создания любого типа, который вы хотите.

Пример есть в Чтение заголовков Apache Kafka®, который является частью кулинарной книги Immerok Apache Flink. Обратите внимание, что хотя в этом примере доступ к теме, разделу, смещению и метке времени осуществляется из заголовков записи, он не использует ключ, который доступен как record.key().

Примечание: я работаю на Иммерок.

Отличается ли FlinkSQL? Например, Spark Structured Streaming имеет доступ к ключу и значению?

OneCricketeer 29.11.2022 13:38

Flink SQL имеет концепцию столбцов метаданных, которые раскрывают заголовки Kafka. См. nightlies.apache.org/flink/flink-docs-release-1.16/docs/….

David Anderson 29.11.2022 18:24

@DavidAnderson спасибо за урок!

4 3 2 29.11.2022 20:31

Заголовки не совпадают с ключом... Например, SparkSQL возвращает ключ, значение и, необязательно, заголовки в виде трех отдельных столбцов фрейма данных.

OneCricketeer 30.11.2022 15:04

Правда, надо было быть точнее. Столбцы метаданных Flink SQL для соединителя Kafka предоставляют ряд атрибутов метаданных каждой записи, таких как тема, смещение, заголовки, отметка времени и т. д. IIUC, ключ не включен.

David Anderson 30.11.2022 20:11

Другие вопросы по теме