Мое приложение Flink считывает данные из одного источника кафки, сопоставляет их с объектом и записывает в другую тему кафки. Все работает нормально, если я использую MapFunction
для преобразования, но как только я использую реализацию, которая использует классы extends ProcessFunction or RichFlatMapFunction
, приемник вообще не вызывается (запись в код темы kafka не выполняется) вообще. Причина, по которой я использую ProcessFunction
или RichFlatMapFunction
, заключается в том, что мне нужно RuntimeConext() для чтения и записи ValueState. Как мне добиться вызова раковины в этом случае?
env.addSource(новый FlinkKafkaConsumer<>("READ_FROM_TOPIC", новый Десериализатор(), плюсы)).keyBy(Order::getId) .process(new StatefulOrderMapper()).addSink(new FlinkKafkaProducer<>("WRITE_TO_TOPIC", new Serializer(), реквизиты)); // StatefulOrderMapper расширяет ProcessFunction
env.addSource(новый FlinkKafkaConsumer<>("READ_FROM_TOPIC", новый Десериализатор(), плюсы)).keyBy(Order::getId) .map(новый DoSomeMapping()).addSink(новый FlinkKafkaProducer<>("WRITE_TO_TOPIC", новый OrderSerializer(), реквизит)); //DoSomeMapping расширяет MapFunction
Одно большое различие между картой и плоской картой или процессом заключается в том, что карта выдает возвращаемое значение MapFunction, тогда как FlatMap или ProcessFunction использует Collector для выдачи событий.
В качестве рабочего примера, который вы можете использовать в качестве отправной точки, я предлагаю взглянуть на https://docs.immerok.cloud/docs/how-to-guides/development/batch-and-streaming-with-the- apache-flink-table-and-datastream-apis/#the-datastream-workflow . Код вы найдете в https://github.com/immerok/recipes/tree/main/latest-transaction.
Примечание: я работаю на Иммерок.
Flink имеет библиотеку CEP для сопоставления шаблонов с FSM. См. docs.immerok.cloud/docs/how-to-guides/development/… для ознакомления. Его также можно использовать из Flink SQL: nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/…
Андесон, спасибо, сработало! могу ли я задать другой вопрос: когда я сопоставляю исходное сообщение с объектом Order, я хотел бы передать его другим функциям сопоставления в зависимости от его статуса, поэтому все семь средств сопоставления действуют как конечный автомат (FSM). Например, у меня есть еще 6 функций, таких как класс StatefulOrderMapper, и я бы Flink рассматривал их как 7 состояний/узлов FSM. Единственный способ, который я мог придумать, заключался в том, чтобы поместить все 7 в последовательную цепочку и иметь логику внутри каждого узла, чтобы решить, должен ли он его обрабатывать или нет. Хотя выглядит не элегантно.