Мне нужно одновременно выполнять несколько вызовов API, которые не зависят друг от друга:
Mono<Response1> response1= this.webClient
.post()
.uri(requestURI1)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
Mono<Response2> response2= this.webClient
.post()
.uri(requestURI2)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
Mono<Response3> response3= this.webClient
.post()
.uri(requestURI3)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
Как я могу получить ответ на вышеуказанные вызовы API в отдельных объектах и в то же время они должны выполняться параллельно? Кроме того, после выполнения этих вышеуказанных вызовов и помещения данных в отдельные объекты, скажем, Response1, Response2, Response3, я хочу выполнить еще один вызов API, который использует эти ответы Response1, Response2, Response3.
Я пытался использовать Flux.merge, но это объединит ответы в один объект, что неверно. Также прочитайте о Mono.zip, но все они используются для объединения ответов, которые мне не нужны.
Обновлено: Mono.zip работает отлично. У меня есть дополнительный вопрос, который я упомянул в комментариях, но также публикую его здесь. Итак, я реализовал так:
Mono.zip(rs1, rs2, rs3).flatMap(tuples -> {
//do something with responses
return Mono.just(transformedData)
}).flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
Mono<actualData> data= callAPI;
return data;
});
Now this response is propagated to rest layer in form of Mono<actualData> and I am getting this in response: {
"scanAvailable": true
}
Чтобы объединить издателей параллельно, вы можете использовать Mono.zip
, который вернет TupleX<...>
и будет разрешен, когда все издатели будут разрешены.
Mono<Tuple3<Response1, Response2, Response3>> res = Mono.zip(response1, response2, response3)
.map(tuples -> {
//do something with responses
return transformedData;
})
.flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
return callAPI(transformedData);
});
В зависимости от логики обработки ошибок вы можете рассмотреть zipDelayError
Как я упоминал в комментарии, одна из ключевых особенностей реактивного API — различать синхронные и асинхронные операции. В случае преобразования синхронизации свяжите его с .map
и используйте .flatMap
или аналогичный оператор, если вы хотите связать другой асинхронный метод.
Если это просто преобразование, свяжите его с .map
получением Tuple3<Response1, Response2, Response3>
в качестве входных данных и возвратом TestData
.
Итак, я реализовал так: Mono.zip(rs1, rs2, rs3).flatMap(tuples -> { // делаем что-то с ответами return Mono.just(transformedData) }).flatMap(transformedData -> { // вот еще вызов webclient.post, который использует преобразованные данные и возвращает фактические данные в форме Mono<actualData> Mono<actualData> data= callAPI; return data; }); Теперь этот ответ распространяется на оставшийся слой в виде Mono<actualData>, и я получаю это в ответ: { "scanAvailable": true }
Здесь трудно читать код. Я бы предложил обновить исходный вопрос.
Одна из ключевых особенностей реактивного API — различать синхронные и асинхронные операции. Как я упоминал в предыдущем комментарии, в случае преобразования синхронизации свяжите его с .map
и используйте .flatMap
, если вы хотите связать другую асинхронную операцию, аналогичную вашему источнику, как l запросы
@Abhishek Wadhwa проверьте обновленный ответ.
Прочтите ИсполнительСервис. Это пул потоков, который вы можете использовать для отправки задач, которые будут выполняться в отдельных потоках параллельно. Вам нужно будет реализовать интерфейс Вызываемый и получить Будущее в качестве результатов. Это дает вам желаемую функциональность
Спасибо Михаил за ваш ответ. Но мне было интересно, есть ли у веб-клиента встроенная возможность запускать вызовы параллельно и публиковать данные параллельно для сбора. Я также подумал об использовании ExecuterService и отправке внешних вызовов API в каждой задаче с помощью RestTemplate.
В реактивном вы не работаете с ExecutorService
и потоками напрямую. Преимущество заключается в том, что все операции ввода-вывода являются асинхронными, и вы можете создавать очень эффективные приложения, используя небольшое количество потоков. Как я уже упоминал в своем ответе, вам нужно использовать реактивные операторы для объединения и разрешения издателей Mono
/Flux
. Вы можете найти больше примеров здесь stackoverflow.com/questions/71476693/…
Да, это работает. Большое спасибо. Мне просто нужно задать 1 дополнительный вопрос: вышеуказанный метод вызывается внутри службы через вызов API. Теперь, после того, как я получил ответ от всех этих издателей в этом методе, я хочу преобразовать данные в какой-то другой объект, скажем, TestData, и мне нужно вернуть этот объект в качестве ответа на мой API. Как я могу добиться этого неблокирующим способом? Должен ли я возвращать объект Mono<TestData>? Или я могу напрямую вернуть TestData в качестве ответа?