Запуск Mono в фоновом режиме при возврате ответа при использовании Spring Webflux

Этот вопрос связан с Немедленно вернитесь в Spring Web Flux, но я не думаю, что это одно и то же (по крайней мере, ответ меня не устраивает).

У меня есть функция, возвращающая Mono, которая при вызове запускает длительную работу. Эта функция вызывается при вызове HTTP API Spring Webflux. Вот пример:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}

Проблема с приведенным выше кодом заключается в том, что создается «201 Created» после, длительное задание завершено. Я хочу запустить longRunningJob в фоновом режиме и немедленно вернуть «201 Created».

Возможно, я мог бы сделать что-то вроде этого:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}

Но мне не кажется очень идиоматичным вызывать subscribe() вручную (например, intellij жалуется, что я вызываю subscribe() в неблокирующей области). Разве нет лучшего способа составить два «потока» без использования явного subscribe? Если да, то как мне изменить функцию startNewJob выше, чтобы добиться этого?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
6
0
4 140
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Насколько я знаю, использование одного из методов subscribe — это единственный способ действительно запустить задание в фоновом режиме с собственным жизненным циклом (не привязанным к возвращаемому издателю).

Если бы вы использовали один из операторов для объединения издателя заданий и издателя ответов (например, zip или merge), то жизненный цикл издателя заданий был бы привязан к издателю ответов, а это не то, что вам нужно для фонового задания. .

Одна вещь, которую вы, возможно, захотите рассмотреть, это начало фоновое задание в потоке публикации ответа, а не непосредственно в теле метода. например через doOnSubscibe или от оператора перед ответом.

Это свяжет запуск фонового задания с событиями onSubscribe издателя ответа, но позволит ему завершиться в фоновом режиме.

Также обратите внимание, что если вы хотите иметь возможность отменить фоновое задание (например, во время закрытия приложения), вам нужно сохранить Disposable, возвращенное из subscribe, чтобы вы могли позже вызвать dispose для него. Это может быть лучше сделано с помощью некоторого типа BackgroundJobManager, который мог бы отслеживать все запущенные задания.

блокируются ли какие-либо потоки в первом варианте, когда он возвращает 201 после завершения длительной работы?

user1955934 28.01.2020 03:28

Если ваше задание вызывает код блокировки, то этот код блокировки заблокирует поток. Вы можете убедиться, что блокирующий код работает на Scheduler, предназначенном для блокирующего кода (например, Schedulers.boundedElastic()), используя .publishOn или .subscribeOn соответствующим образом.

Phil Clay 28.01.2020 03:33

предполагая, что «длительное задание» является реактивным от начала до конца, клиент, вызывающий API, будет блокироваться в ожидании ответа, но потоки тоже всегда работают в это время?

user1955934 28.01.2020 03:53

В общем, да, если вы не вызываете код блокировки. Но более конкретно... операторы, которые вы используете для сборки своего потока, будут определять, какие потоки используются для выполнения работы. У вас есть полный контроль над тем, какие потоки будет использовать поток. См. projectreactor.io/docs/core/release/reference/#schedulers. Некоторые операторы (например, Flux.delayElements) по умолчанию используют определенные планировщики, чтобы предотвратить блокировку потока.

Phil Clay 28.01.2020 05:56
private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());

Хотя этот фрагмент кода может решить проблему, включая объяснение действительно помогает улучшить качество вашего сообщения. Помните, что вы отвечаете на вопрос для будущих читателей, и эти люди могут не знать причин вашего предложения кода.

Clijsters 23.07.2020 17:16

Другие вопросы по теме