У меня есть приложение Spring Flux, где в какой-то момент мне нужно выполнить какую-то тяжелую задачу в фоновом режиме, вызывающей стороне (запрос HTTP) не нужно ждать, пока эта задача завершится.
Без реактора я бы, вероятно, использовал аннотацию Асинхронный, выполняя этот метод в другом потоке. С реактором я не уверен, следует ли мне продолжать этот подход или уже есть встроенный механизм, который позволяет мне это сделать.
Например, для Контроллер, который принимает объект Ресурс:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
И класс Процессор:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
HTTP-вызывающему для /create не нужно ждать завершения метода run.
У меня работает аннотация @Async, я спрашиваю, следует ли мне использовать ее с Reactor или нет.
Я верю этот должен ответить в ваш вопрос.




Я провел некоторое тестирование, и я думаю, что даже при использовании subscribe() в качестве «выстрелил и забыл» будет ждать завершения запроса, прежде чем возвращать ответ веб-браузеру или REST-клиенту (по крайней мере, в моих простых тестах это выглядит так). Итак, вам нужно сделать то же самое, что и @Async, создать еще один поток:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
return processor.run(r)
.subscribeOn(Schedulers.elastic()) // put eveything above this line on another thread
.doOnNext(string -> repository.save(r)); // persist "r", not changing it, though
}
И класс процессора:
Mono<String> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class);
}
Не заставит ли это вызывающую сторону ждать, пока метод processor#run не будет выполнен и не завершится?
Если вы ищете реализацию шаблона выстрелил-забыл, вы можете просто подписаться на своего издателя.
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
run(r).subscribe();
return repository.save(r);
}
Mono<Void> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.then();
}
Если ваш издатель выполняет блокирующие операции, он должен быть подписан на другой поток с эластичным или параллельным планировщиком.
Будет ли «запуск» всегда завершен? Даже если вызывающий поток/поток завершается много раньше?
Мне не ясно, что "поток завершается". Если вы используете потоки netty NIO, они завершаются только при завершении работы/сбое приложения. В таком случае вы можете потерять завершение (с @Async ситуация такая же). У меня есть небольшая демонстрация, чтобы проиллюстрировать, что выполнение завершается после того, как контроллер вернет результат. Запустите StackoverflowDemo1ApplicationTests и посмотрите логи.
вы добавили @EnableAsync в класс конфигурации?