Как использовать Spring WebClient для одновременного выполнения нескольких вызовов и получения ответа по отдельности?

Мне нужно одновременно выполнять несколько вызовов API, которые не зависят друг от друга:

Mono<Response1> response1= this.webClient
               .post()
               .uri(requestURI1)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

Mono<Response2> response2= this.webClient
               .post()
               .uri(requestURI2)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

Mono<Response3> response3= this.webClient
               .post()
               .uri(requestURI3)
               .body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
               .exchangeToMono(response -> {
                   return response.statusCode().equals(HttpStatus.OK)
                            ? response.bodyToMono(ParameterizedTypeReference<T>)
                            : response.createException().flatMap(Mono::error);
                });

Как я могу получить ответ на вышеуказанные вызовы API в отдельных объектах и ​​в то же время они должны выполняться параллельно? Кроме того, после выполнения этих вышеуказанных вызовов и помещения данных в отдельные объекты, скажем, Response1, Response2, Response3, я хочу выполнить еще один вызов API, который использует эти ответы Response1, Response2, Response3.

Я пытался использовать Flux.merge, но это объединит ответы в один объект, что неверно. Также прочитайте о Mono.zip, но все они используются для объединения ответов, которые мне не нужны.

Обновлено: Mono.zip работает отлично. У меня есть дополнительный вопрос, который я упомянул в комментариях, но также публикую его здесь. Итак, я реализовал так:

Mono.zip(rs1, rs2, rs3).flatMap(tuples -> {
//do something with responses
return Mono.just(transformedData)
}).flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>

Mono<actualData> data= callAPI;
return data;
});

Now this response is propagated to rest layer in form of Mono<actualData> and I am getting this in response: {
    "scanAvailable": true
}
0
0
79
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Чтобы объединить издателей параллельно, вы можете использовать Mono.zip, который вернет TupleX<...> и будет разрешен, когда все издатели будут разрешены.

Mono<Tuple3<Response1, Response2, Response3>> res = Mono.zip(response1, response2, response3)
        .map(tuples -> {
            //do something with responses
            return transformedData;
        })
        .flatMap(transformedData -> {
            //here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
            return callAPI(transformedData);
        });

В зависимости от логики обработки ошибок вы можете рассмотреть zipDelayError

Как я упоминал в комментарии, одна из ключевых особенностей реактивного API — различать синхронные и асинхронные операции. В случае преобразования синхронизации свяжите его с .map и используйте .flatMap или аналогичный оператор, если вы хотите связать другой асинхронный метод.

Да, это работает. Большое спасибо. Мне просто нужно задать 1 дополнительный вопрос: вышеуказанный метод вызывается внутри службы через вызов API. Теперь, после того, как я получил ответ от всех этих издателей в этом методе, я хочу преобразовать данные в какой-то другой объект, скажем, TestData, и мне нужно вернуть этот объект в качестве ответа на мой API. Как я могу добиться этого неблокирующим способом? Должен ли я возвращать объект Mono<TestData>? Или я могу напрямую вернуть TestData в качестве ответа?

Abhishek Wadhwa 22.03.2022 16:03

Если это просто преобразование, свяжите его с .map получением Tuple3<Response1, Response2, Response3> в качестве входных данных и возвратом TestData.

Alex 22.03.2022 16:31

Итак, я реализовал так: Mono.zip(rs1, rs2, rs3).flatMap(tuples -> { // делаем что-то с ответами return Mono.just(transformedData) }).flatMap(transformedData -> { // вот еще вызов webclient.post, который использует преобразованные данные и возвращает фактические данные в форме Mono<actualData> Mono<actualData> data= callAPI; return data; }); Теперь этот ответ распространяется на оставшийся слой в виде Mono<actualData>, и я получаю это в ответ: { "scanAvailable": true }

Abhishek Wadhwa 22.03.2022 16:34

Здесь трудно читать код. Я бы предложил обновить исходный вопрос.

Alex 22.03.2022 16:43

Одна из ключевых особенностей реактивного API — различать синхронные и асинхронные операции. Как я упоминал в предыдущем комментарии, в случае преобразования синхронизации свяжите его с .map и используйте .flatMap, если вы хотите связать другую асинхронную операцию, аналогичную вашему источнику, как l запросы

Alex 22.03.2022 16:48

@Abhishek Wadhwa проверьте обновленный ответ.

Alex 22.03.2022 17:30

Прочтите ИсполнительСервис. Это пул потоков, который вы можете использовать для отправки задач, которые будут выполняться в отдельных потоках параллельно. Вам нужно будет реализовать интерфейс Вызываемый и получить Будущее в качестве результатов. Это дает вам желаемую функциональность

Спасибо Михаил за ваш ответ. Но мне было интересно, есть ли у веб-клиента встроенная возможность запускать вызовы параллельно и публиковать данные параллельно для сбора. Я также подумал об использовании ExecuterService и отправке внешних вызовов API в каждой задаче с помощью RestTemplate.

Abhishek Wadhwa 16.03.2022 15:49

В реактивном вы не работаете с ExecutorService и потоками напрямую. Преимущество заключается в том, что все операции ввода-вывода являются асинхронными, и вы можете создавать очень эффективные приложения, используя небольшое количество потоков. Как я уже упоминал в своем ответе, вам нужно использовать реактивные операторы для объединения и разрешения издателей Mono/Flux. Вы можете найти больше примеров здесь stackoverflow.com/questions/71476693/…

Alex 16.03.2022 18:11

Другие вопросы по теме