Я хочу перебрать базовый список, который предлагает referenceIds. Затем на следующем этапе я хочу сделать несколько вызовов службы и объединить результаты в кортеж.
Затем, в качестве последнего шага, я хочу перебрать все мои referenceIds и вернуть каждый сгенерированный JSONObject напрямую через поток событий.
@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
pages = 1;
return Flux.range(1, pages)
.map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
.flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
.map(tuple -> {
Integer number = tuple.getT1();
Tuple2<List<String>, List<String>> lookup = tuple.getT2();
JSONObject json = new JSONObject();
json.put("number", number);
json.put("someMore", <fromLookup>);
return json;
});
}
Для этого примера возвращаемые типы методов a() и b() не имеют значения. Важная часть:
Если просто вернуть Flux.fromIterable(numbers); все работает нормально.
НО при агрегировании с использованием .zipWith() я получаю только 1-й элемент списка чисел. Все остальные потеряны. Почему?
Примечание: мне нужно использовать .zipWith() для параллельного выполнения этих вызовов методов (некоторые более длительные транзакции).




All others are lost. Why?
Из документации класса Flux:
Zip this {@link Flux} with another {@link Publisher} source, that is to say wait for both to emit one element and combine these elements once into a {@link Tuple2}. The operator will continue doing so until any of the sources completes.
Вы можете выполнить вызовы a() и b(), заархивировать результаты, а затем раскрутить список numbers в поток и добавить результат следующим образом:
.flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
.flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))
Еще один вопрос: если я добавлю это выше, агрегация кортежей вообще работает. Но возвращаемый Server Sent Events, который должен быть отправлен конечной точкой, начинает испускаться, когда все данные обработаны. Я ожидаю, что он будет испускать каждый JSONObject, как только он будет создан. Но это не так? Как я могу изменить окончательный метод map(), чтобы не ждать, пока все данные будут агрегированы?
Это то, что я ожидал, но это не так. События, отправленные сервером, начинаются, когда все данные были обработаны последним методом карты. Я ожидал увидеть немедленные события.
Я приму ваш ответ и начну новый вопрос, так как тема этого вопроса о том, как совместить поток и zip, верна.
Так что мой подход, вероятно, здесь неверен? Имея
List<String> numbers, я хочу сгенерироватьTuple<List<String> number, Object anything>, причем все, что является результатом дальнейших вызовов Flux/Mono. Может быть, вы могли бы привести пример?