Как связать поток с другим потоком / моно и применить другое противодавление?

Ниже приведен реактивный код, использующий поток в активной зоне реактора:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> redisHashReactiveCommands.hmset(key, map))
    //.flatMap(... //want to store same data async into kafka with its own back pressure handling)
    .subscribeOn(Schedulers.parallel())
    .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
    .doOnComplete(() -> log.debug("On completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

Как видите, у меня есть обработка обратного давления для внешнего источника моего процесса (FluxSink.OverflowStrategy.LATEST). Однако я также хочу настроить обратное давление для моего процесса на redis (redisHashReactiveCommands.hmset (key, map)), поскольку это может быть более узким местом, чем внешний источник для моего процесса. Я ожидаю, что мне нужно будет создать еще один поток для части redis и связать его с этим потоком, но как мне этого добиться, поскольку .flatMap работает с отдельным элементом, а не с потоком элементов?

Кроме того, я хочу сохранить тот же излучаемый элемент в Kafka, но цепочка flapMap, похоже, не работает ... есть ли простой способ связать все это вместе в одном наборе функциональных вызовов (внешний источник -> мой процесс, мой процесс -> redis, мой процесс -> кафка)?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
5
0
5 918
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Если вас не интересуют объекты результатов в главной последовательности, вы можете объединить оба сохранения из flatMap. Вам нужно будет переместить subscribeOn и войти в flatMap, чтобы разместить их во внутренних издателях сохранения:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
    .flatMap(map -> Mono.when(
        redisHashReactiveCommands.hmset(key, map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),

        kafkaReactiveCommand.something(map)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
    ))
    //... this results in a Mono<Void>
    .doOnComplete(() -> log.debug("Both redis and kafka completed."))
    .doOnError(exception -> log.error("Error occurred while consuming message", exception))
    .subscribe();

В качестве альтернативы, если вы Конечно, оба процесса выдают либо элемент результата, либо ошибку, вы можете объединить оба результата в Tuple2, заменив when на zip.

Спасибо. У меня есть несколько вопросов по поводу вашего ответа: 1. Что касается использования Mono и Schedulers.parallel() на внутренних издателях, действительно ли это обрабатывает каждый отправленный элемент последовательно или все элементы параллельно, поскольку они поступают по отдельности? 2. Как справиться с различным обратным давлением для потока Кафки? Должен ли я использовать Flux.zip вместо Mono.when (Reactor Kafka использует Flux)? FluxSink.OverflowStrategy.LATEST подходит как для (внешний источник -> мой процесс), так и (мой процесс -> redis), но я хочу использовать .window(100) для пакетной обработки потока (мой процесс -> kafka).

Thomas Lee 06.11.2018 06:57

1. Он будет эффективно распараллеливать с коэффициентом параллелизма по умолчанию для flatMap (256). 2. Может тогда нужно 2 отдельных конвейера? Посмотрите на оператор доли, который будет применяться при создании

Simon Baslé 06.11.2018 09:15

Я не могу найти ни одного примера на share () .. я могу просто применить его к основному Flux? Flux.create (приемник -> ..., FluxSink.OverflowStrategy.LATEST) .share () .flatMap (карта -> Mono.when (redisHashReactiveCommands.hmset (ключ, карта) ....

Thomas Lee 06.11.2018 09:59

Вы храните Flux.create(...).share() в переменной foo. Затем вы определяете 1 конвейер Flux для Redis из foo и один для Kafka (этот с window(100) и т.п.). Вы также можете использовать операторы onBackPressure* для настройки поведения противодавления, тем самым устраняя необходимость иметь дело с ним на уровне create.

Simon Baslé 06.11.2018 10:06

Спасибо ... еще один вопрос, как мне сделать так, чтобы излучаемые элементы во внутреннем потоке (т.е. внутри redisHashReactiveCommands.hmset(..)) обрабатывались последовательно, а не параллельно (но все же сделать это асинхронным, а не блокирующим вызовом), поскольку порядок обработки важен? Я предполагаю, что должен быть способ указать один фоновый поток, который будет использоваться для обработки всех излучаемых элементов?

Thomas Lee 07.11.2018 04:20

Используйте concatMap вместо flatMap (или flatMapSequential, если вы хотите подписаться раньше, но поддерживать порядок)

Simon Baslé 07.11.2018 21:36

Другие вопросы по теме