Приемник Apache Flink не вызывается при использовании ProcessFunction или RichFlatMapFunction

Мое приложение Flink считывает данные из одного источника кафки, сопоставляет их с объектом и записывает в другую тему кафки. Все работает нормально, если я использую MapFunction для преобразования, но как только я использую реализацию, которая использует классы extends ProcessFunction or RichFlatMapFunction, приемник вообще не вызывается (запись в код темы kafka не выполняется) вообще. Причина, по которой я использую ProcessFunction или RichFlatMapFunction, заключается в том, что мне нужно RuntimeConext() для чтения и записи ValueState. Как мне добиться вызова раковины в этом случае?

env.addSource(новый FlinkKafkaConsumer<>("READ_FROM_TOPIC", новый Десериализатор(), плюсы)).keyBy(Order::getId) .process(new StatefulOrderMapper()).addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new Serializer(), реквизиты)); // StatefulOrderMapper расширяет ProcessFunction

env.addSource(новый FlinkKafkaConsumer<>("READ_FROM_TOPIC", новый Десериализатор(), плюсы)).keyBy(Order::getId) .map(новый DoSomeMapping()).addSink(новый FlinkKafkaProducer<>("WRITE_TO_TOPIC", новый OrderSerializer(), реквизит)); //DoSomeMapping расширяет MapFunction

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
103
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Одно большое различие между картой и плоской картой или процессом заключается в том, что карта выдает возвращаемое значение MapFunction, тогда как FlatMap или ProcessFunction использует Collector для выдачи событий.

В качестве рабочего примера, который вы можете использовать в качестве отправной точки, я предлагаю взглянуть на https://docs.immerok.cloud/docs/how-to-guides/development/batch-and-streaming-with-the- apache-flink-table-and-datastream-apis/#the-datastream-workflow . Код вы найдете в https://github.com/immerok/recipes/tree/main/latest-transaction.

Примечание: я работаю на Иммерок.

Андесон, спасибо, сработало! могу ли я задать другой вопрос: когда я сопоставляю исходное сообщение с объектом Order, я хотел бы передать его другим функциям сопоставления в зависимости от его статуса, поэтому все семь средств сопоставления действуют как конечный автомат (FSM). Например, у меня есть еще 6 функций, таких как класс StatefulOrderMapper, и я бы Flink рассматривал их как 7 состояний/узлов FSM. Единственный способ, который я мог придумать, заключался в том, чтобы поместить все 7 в последовательную цепочку и иметь логику внутри каждого узла, чтобы решить, должен ли он его обрабатывать или нет. Хотя выглядит не элегантно.

Abidi 26.11.2022 21:55

Другие вопросы по теме