Этот вопрос связан с Немедленно вернитесь в Spring Web Flux, но я не думаю, что это одно и то же (по крайней мере, ответ меня не устраивает).
У меня есть функция, возвращающая Mono
, которая при вызове запускает длительную работу. Эта функция вызывается при вызове HTTP API Spring Webflux. Вот пример:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
val longRunningJob : Mono<Job> = startNewJob(jobId)
longRunningJob.map { job ->
val jobUri = generateJobUri(request, job.id)
ResponseEntity.created(jobURI).build<Unit>()
}
}
Проблема с приведенным выше кодом заключается в том, что создается «201 Created» после, длительное задание завершено. Я хочу запустить longRunningJob
в фоновом режиме и немедленно вернуть «201 Created».
Возможно, я мог бы сделать что-то вроде этого:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
startNewJob(jobId)
.subscribeOn(Schedulers.newSingle("thread"))
.subscribe()
val jobUri = generateJobUri(request, job.id)
val response = ResponseEntity.created(jobURI).build<Unit>()
Mono.just(response)
}
Но мне не кажется очень идиоматичным вызывать subscribe()
вручную (например, intellij жалуется, что я вызываю subscribe()
в неблокирующей области). Разве нет лучшего способа составить два «потока» без использования явного subscribe
? Если да, то как мне изменить функцию startNewJob
выше, чтобы добиться этого?
Насколько я знаю, использование одного из методов subscribe
— это единственный способ действительно запустить задание в фоновом режиме с собственным жизненным циклом (не привязанным к возвращаемому издателю).
Если бы вы использовали один из операторов для объединения издателя заданий и издателя ответов (например, zip
или merge
), то жизненный цикл издателя заданий был бы привязан к издателю ответов, а это не то, что вам нужно для фонового задания. .
Одна вещь, которую вы, возможно, захотите рассмотреть, это начало фоновое задание в потоке публикации ответа, а не непосредственно в теле метода. например через doOnSubscibe
или от оператора перед ответом.
Это свяжет запуск фонового задания с событиями onSubscribe издателя ответа, но позволит ему завершиться в фоновом режиме.
Также обратите внимание, что если вы хотите иметь возможность отменить фоновое задание (например, во время закрытия приложения), вам нужно сохранить Disposable
, возвращенное из subscribe
, чтобы вы могли позже вызвать dispose
для него. Это может быть лучше сделано с помощью некоторого типа BackgroundJobManager, который мог бы отслеживать все запущенные задания.
Если ваше задание вызывает код блокировки, то этот код блокировки заблокирует поток. Вы можете убедиться, что блокирующий код работает на Scheduler
, предназначенном для блокирующего кода (например, Schedulers.boundedElastic()
), используя .publishOn
или .subscribeOn
соответствующим образом.
предполагая, что «длительное задание» является реактивным от начала до конца, клиент, вызывающий API, будет блокироваться в ожидании ответа, но потоки тоже всегда работают в это время?
В общем, да, если вы не вызываете код блокировки. Но более конкретно... операторы, которые вы используете для сборки своего потока, будут определять, какие потоки используются для выполнения работы. У вас есть полный контроль над тем, какие потоки будет использовать поток. См. projectreactor.io/docs/core/release/reference/#schedulers. Некоторые операторы (например, Flux.delayElements
) по умолчанию используют определенные планировщики, чтобы предотвратить блокировку потока.
private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());
Хотя этот фрагмент кода может решить проблему, включая объяснение действительно помогает улучшить качество вашего сообщения. Помните, что вы отвечаете на вопрос для будущих читателей, и эти люди могут не знать причин вашего предложения кода.
блокируются ли какие-либо потоки в первом варианте, когда он возвращает 201 после завершения длительной работы?