я использую значение + запись производителя kafka, используя:
bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
но мне трудно понять, как читать эти записи кафки с помощью потребителя Flink kafka KafkaSource. Я хочу иметь возможность делать такие вещи, как:
record.getValue(), record.getKey(), record.getTimestamp()...
это мой текущий код, который читает только записи без ключей из kafka
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(ip)
.setTopics("test3")
.setGroupId("1")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();
Могу ли я получить пример того, что я ищу?
Я использовал только Beam, а не Flink, но глядя на Javadoc, вам нужно использовать его для преобразования необработанного ConsumerRecord в ваш собственный тип. Затем KafkaSource<T> будет использовать это, поскольку он не поддерживает наличие двух дженериков.




Вам нужно реализовать KafkaRecordDeserializationSchema (но не valueOnly), а затем в его методе десериализации у вас будет доступ к ConsumerRecord, и вы можете работать с его ключом, значением, заголовками и т. д. для создания любого типа, который вы хотите.
Пример есть в Чтение заголовков Apache Kafka®, который является частью кулинарной книги Immerok Apache Flink. Обратите внимание, что хотя в этом примере доступ к теме, разделу, смещению и метке времени осуществляется из заголовков записи, он не использует ключ, который доступен как record.key().
Примечание: я работаю на Иммерок.
Отличается ли FlinkSQL? Например, Spark Structured Streaming имеет доступ к ключу и значению?
Flink SQL имеет концепцию столбцов метаданных, которые раскрывают заголовки Kafka. См. nightlies.apache.org/flink/flink-docs-release-1.16/docs/….
@DavidAnderson спасибо за урок!
Заголовки не совпадают с ключом... Например, SparkSQL возвращает ключ, значение и, необязательно, заголовки в виде трех отдельных столбцов фрейма данных.
Правда, надо было быть точнее. Столбцы метаданных Flink SQL для соединителя Kafka предоставляют ряд атрибутов метаданных каждой записи, таких как тема, смещение, заголовки, отметка времени и т. д. IIUC, ключ не включен.
Для начала не используйте
valueOnly()