Как перебрать кортеж с помощью zipWith во Flux?

Я хочу перебрать базовый список, который предлагает referenceIds. Затем на следующем этапе я хочу сделать несколько вызовов службы и объединить результаты в кортеж.

Затем, в качестве последнего шага, я хочу перебрать все мои referenceIds и вернуть каждый сгенерированный JSONObject напрямую через поток событий.

@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
    pages = 1;
    return Flux.range(1, pages)
            .map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
            .flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
            .map(tuple -> {
                Integer number = tuple.getT1();
                Tuple2<List<String>, List<String>> lookup = tuple.getT2();

                JSONObject json = new JSONObject();
                json.put("number", number);
                json.put("someMore", <fromLookup>);
                return json;
            });
}

Для этого примера возвращаемые типы методов a() и b() не имеют значения. Важная часть:

Если просто вернуть Flux.fromIterable(numbers); все работает нормально. НО при агрегировании с использованием .zipWith() я получаю только 1-й элемент списка чисел. Все остальные потеряны. Почему?

Примечание: мне нужно использовать .zipWith() для параллельного выполнения этих вызовов методов (некоторые более длительные транзакции).

Так что мой подход, вероятно, здесь неверен? Имея List<String> numbers, я хочу сгенерировать Tuple<List<String> number, Object anything>, причем все, что является результатом дальнейших вызовов Flux/Mono. Может быть, вы могли бы привести пример?

membersound 29.07.2019 14:04
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
1
4 700
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

All others are lost. Why?

Из документации класса Flux:

Zip this {@link Flux} with another {@link Publisher} source, that is to say wait for both to emit one element and combine these elements once into a {@link Tuple2}. The operator will continue doing so until any of the sources completes.

Вы можете выполнить вызовы a() и b(), заархивировать результаты, а затем раскрутить список numbers в поток и добавить результат следующим образом:

   .flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
                .flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))

Еще один вопрос: если я добавлю это выше, агрегация кортежей вообще работает. Но возвращаемый Server Sent Events, который должен быть отправлен конечной точкой, начинает испускаться, когда все данные обработаны. Я ожидаю, что он будет испускать каждый JSONObject, как только он будет создан. Но это не так? Как я могу изменить окончательный метод map(), чтобы не ждать, пока все данные будут агрегированы?

membersound 29.07.2019 15:32

Это то, что я ожидал, но это не так. События, отправленные сервером, начинаются, когда все данные были обработаны последним методом карты. Я ожидал увидеть немедленные события.

membersound 29.07.2019 16:05

Я приму ваш ответ и начну новый вопрос, так как тема этого вопроса о том, как совместить поток и zip, верна.

membersound 29.07.2019 16:29

Другие вопросы по теме