Ниже приведен реактивный код, использующий поток в активной зоне реактора:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
Как видите, у меня есть обработка обратного давления для внешнего источника моего процесса (FluxSink.OverflowStrategy.LATEST). Однако я также хочу настроить обратное давление для моего процесса на redis (redisHashReactiveCommands.hmset (key, map)), поскольку это может быть более узким местом, чем внешний источник для моего процесса. Я ожидаю, что мне нужно будет создать еще один поток для части redis и связать его с этим потоком, но как мне этого добиться, поскольку .flatMap работает с отдельным элементом, а не с потоком элементов?
Кроме того, я хочу сохранить тот же излучаемый элемент в Kafka, но цепочка flapMap, похоже, не работает ... есть ли простой способ связать все это вместе в одном наборе функциональных вызовов (внешний источник -> мой процесс, мой процесс -> redis, мой процесс -> кафка)?




Если вас не интересуют объекты результатов в главной последовательности, вы можете объединить оба сохранения из flatMap. Вам нужно будет переместить subscribeOn и войти в flatMap, чтобы разместить их во внутренних издателях сохранения:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> Mono.when(
redisHashReactiveCommands.hmset(key, map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),
kafkaReactiveCommand.something(map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
))
//... this results in a Mono<Void>
.doOnComplete(() -> log.debug("Both redis and kafka completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
В качестве альтернативы, если вы Конечно, оба процесса выдают либо элемент результата, либо ошибку, вы можете объединить оба результата в Tuple2, заменив when на zip.
1. Он будет эффективно распараллеливать с коэффициентом параллелизма по умолчанию для flatMap (256). 2. Может тогда нужно 2 отдельных конвейера? Посмотрите на оператор доли, который будет применяться при создании
Я не могу найти ни одного примера на share () .. я могу просто применить его к основному Flux? Flux.create (приемник -> ..., FluxSink.OverflowStrategy.LATEST) .share () .flatMap (карта -> Mono.when (redisHashReactiveCommands.hmset (ключ, карта) ....
Вы храните Flux.create(...).share() в переменной foo. Затем вы определяете 1 конвейер Flux для Redis из foo и один для Kafka (этот с window(100) и т.п.). Вы также можете использовать операторы onBackPressure* для настройки поведения противодавления, тем самым устраняя необходимость иметь дело с ним на уровне create.
Спасибо ... еще один вопрос, как мне сделать так, чтобы излучаемые элементы во внутреннем потоке (т.е. внутри redisHashReactiveCommands.hmset(..)) обрабатывались последовательно, а не параллельно (но все же сделать это асинхронным, а не блокирующим вызовом), поскольку порядок обработки важен? Я предполагаю, что должен быть способ указать один фоновый поток, который будет использоваться для обработки всех излучаемых элементов?
Используйте concatMap вместо flatMap (или flatMapSequential, если вы хотите подписаться раньше, но поддерживать порядок)
Спасибо. У меня есть несколько вопросов по поводу вашего ответа: 1. Что касается использования
MonoиSchedulers.parallel()на внутренних издателях, действительно ли это обрабатывает каждый отправленный элемент последовательно или все элементы параллельно, поскольку они поступают по отдельности? 2. Как справиться с различным обратным давлением для потока Кафки? Должен ли я использоватьFlux.zipвместоMono.when(Reactor Kafka использует Flux)?FluxSink.OverflowStrategy.LATESTподходит как для (внешний источник -> мой процесс), так и (мой процесс -> redis), но я хочу использовать.window(100)для пакетной обработки потока (мой процесс -> kafka).