Какова модель потоков Spring Reactor, и .subscribe(), похоже, не использует более одного потока?

Все еще новичок в концепциях реактивного программирования, поэтому, пожалуйста, потерпите меня. Я провел больше тестов с реактивными цепочками, и ниже приведен следующий код:

                Flux
                        .range(0, 10000)
                        .doOnNext(i -> {
                            System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        })
                        .flatMap(i -> {
                            System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
                            return Mono.just(i);
                        })
                        .doOnError(err -> {
                            throw new RuntimeException(err.getMessage());
                        })
                        .subscribe();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");

Моя главная путаница здесь - это вывод:

start 0 Thread: main
end0 Thread: main
start 1 Thread: main
end1 Thread: main
start 2 Thread: main
end2 Thread: main
start 3 Thread: main
end3 Thread: main
start 4 Thread: main
end4 Thread: main
Thread: main Hello world

Почему по умолчанию не создается/используется threadPool для более эффективной обработки каждого целого числа? Кроме того, исходя из моего понимания использования подписки на издателя:

Subscribe is an asynchronous process. It means that when you call subscribe, it launch processing in the background, then return immediately. 

Тем не менее, мое наблюдение, кажется, отличается, когда я наблюдал здесь поведение блокировки, поскольку печать «Hello World» должна сначала дождаться завершения обработки цепочки Flux Reactive, поскольку эта цепочка использует и блокирует (?) основной поток.

Раскомментирование subscribeOn() имеет другое, «правильное» поведение:

Flux
                            .range(0, 10000)
                            .doOnNext(i -> {
                                System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
                                try {
                                    Thread.sleep(2000);
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            })
                            .flatMap(i -> {
                                System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
                                return Mono.just(i);
                            })
                            .doOnError(err -> {
                                throw new RuntimeException(err.getMessage());
                            })
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe();
    System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");

Thread: main Hello world
start 0 Thread: boundedElastic-1
end0 Thread: boundedElastic-1
start 1 Thread: boundedElastic-1
end1 Thread: boundedElastic-1
start 2 Thread: boundedElastic-1

Я понимаю это так, потому что теперь мы указываем threadPool, который реактивная цепочка должна использовать для своей обработки, так что основной поток, таким образом, может вести себя разблокированным, асинхронно печатая сначала «Hello World».

Точно так же, если бы мы сейчас заменили .subscribe() на .blockLast():

            Flux
                    .range(0, 5)
                    .doOnNext(i -> {
                        System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .flatMap(i -> {
//                        if (i == 100) return Mono.error(new RuntimeException("Died cuz i = 100"));
                        System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
                        return Mono.just(i);
                    })
                    .doOnError(err -> {
                        throw new RuntimeException(err.getMessage());
                    })
                    .subscribeOn(Schedulers.boundedElastic())
                    .blockLast();
            System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");

Результирующее поведение, как ожидается, изменится на блокировку вместо асинхронного, где, если я правильно понимаю, это связано с тем, что даже если мы указали другой пул потоков (не основной), который будет использоваться для обработки реактивной цепочки, основной поток является потоком вызывающей стороны, по-прежнему заблокирован до тех пор, пока реактивная цепочка не вернет сигнал об успехе или ошибке перед освобождением основного потока.

Правильно ли я понимаю? И где я ошибаюсь в своем понимании поведения потоков Project Reactor по умолчанию?

вы сказали в своем первом примере, что onSubscribe должен взять один поток из пула потоков (это основной) и запустить все на нем. Я не понимаю, что вы ожидаете? Вы подписываетесь, и все запускается в этом потоке.

Toerktumlare 15.01.2023 17:03

Извините, я не совсем понимаю. То, что вы описываете, происходит, когда используется только .subscribe()? Потому что это то, что показал мой первый пример кода. Я удалил закомментированный код на случай, если он вызовет путаницу.

BoredPanda 15.01.2023 18:02
Выполнение HTTP-запроса с помощью Spring WebClient: GET
Выполнение HTTP-запроса с помощью Spring WebClient: GET
WebClient - это реактивный веб-клиент, представленный в Spring 5. Это реактивное, неблокирующее решение, работающее по протоколу HTTP/1.1.
0
2
67
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Советую прочитать раздел, посвященный многопоточности в официальной документации. Это должно дать вам хорошее понимание того, что происходит. Постараюсь подытожить, как смогу.

Почему Flux обрабатывается в вызывающем потоке?

Объекты Flux и Mono моделируют приостанавливаемую, возобновляемую цепочку операций. Это означает, что движок может «припарковать» действия, а затем запланировать их выполнение в доступном потоке.

Теперь, когда вы вызываете subscribe на Publisher, вы запускаете цепочку операций. Его первые действия запускаются в вызывающем потоке, как указано в документе:

Если не указано иное, самый верхний оператор (источник) сам выполняется в потоке, в котором был сделан вызов subscribe().

Если ваш поток достаточно прост, есть большая вероятность, что весь поток/моно будет обрабатываться в одном потоке.

Означает ли это, что он вызывается синхронно/на переднем плане программы?

Это может создать иллюзию синхронной обработки, но это не так.

Мы уже видим это в вашем первом примере. Вы создаете диапазон из тысячи значений, но перед сообщением Thread: main Hello world печатаются только 4 значения. Он показывает, что обработка началась, но была «приостановлена», чтобы ваша основная программа могла продолжить работу.

Мы также можем увидеть это более четко в следующем примере:

// Assemble the flow of operations
Flux flow = Flux.range(0, 5)
        .flatMap(i -> Mono
                .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                .delayElement(Duration.ofMillis(50)));

// Launch processing
Disposable execStatus = flow.subscribe(System.out::println);

System.out.println("SUBSCRIPTION DONE");

// Prevent program to stop before the flow is over.
do {
    System.out.println("Waiting for the flow to end...");
    Thread.sleep(50);
} while (!execStatus.isDisposed());

System.out.println("FLUX PROCESSED");

Эта программа печатает:

SUBSCRIPTION DONE
Waiting for the flow to end...
flatMap on [main] -> 0
flatMap on [main] -> 1
flatMap on [main] -> 3
Waiting for the flow to end...
flatMap on [main] -> 4
flatMap on [main] -> 2
FLUX PROCESSED

Если мы посмотрим на него, то увидим, что сообщения от основной программы чередуются с основной программой, поэтому даже если поток выполняется в основном потоке, он все равно выполняется в фоновом режиме.

Это доказывает то, что утверждает subscribe(Consumer) apidoc:

Имейте в виду, что, поскольку последовательность может быть асинхронной, это немедленно вернет управление вызывающему потоку.

Почему никакие другие потоки не задействованы?

Теперь, почему никакая другая нить не использовалась? Ну, это сложный вопрос. В этом случае я бы сказал, что движок решил, что никакой другой поток не нужен для выполнения конвейера с хорошей производительностью. Переключение между потоками имеет свою стоимость, поэтому, если этого можно избежать, я думаю, Reactor избегает этого.

Могут ли быть задействованы другие потоки?

В документации указано:

Некоторые операторы по умолчанию используют определенный планировщик из планировщиков.

Это означает, что в зависимости от конвейера его задачи могут быть отправлены в потоки, предоставленные планировщиком.

На самом деле, flatMap должен это делать. Если мы немного изменим пример, мы увидим, что операции отправляются в параллельный планировщик. Все, что нам нужно сделать, это ограничить параллелизм (да, я знаю, это не очень интуитивно понятно). По умолчанию flatMap использует коэффициент параллелизма 256. Это означает, что он может запускать 256 операций одновременно (грубое объяснение). Давайте ограничим его до 2:

Flux flow = Flux.range(0, 5)
        .flatMap(i -> Mono
                .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                .delayElement(Duration.ofMillis(50)),
                2);

Теперь программа печатает:

SUBSCRIPTION DONE
Waiting for the flow to end...
flatMap on [main] -> 0
Waiting for the flow to end...
flatMap on [main] -> 1
Waiting for the flow to end...
flatMap on [parallel-1] -> 2
flatMap on [parallel-2] -> 3
Waiting for the flow to end...
flatMap on [parallel-3] -> 4
FLUX PROCESSED

Мы видим, что операции 2, 3 и 4 произошли в потоке с именем parallel-x. Это темы, созданные Schedulers.parallel.

ПРИМЕЧАНИЕ. Методы subscribeOn и publishOn можно использовать для более точного управления многопоточностью.

О блокировке

Изменяют ли методы block, blockFirst и blockLast способ планирования/выполнения операций? Ответ - нет.

Когда вы используете блок, внутренне поток подписывается, как и при вызове subscribe(). Однако, как только поток запускается, вместо возврата Reactor внутренне использует вызывающий поток для зацикливания и ожидания завершения потока, как я сделал в моем первом примере выше (но они делают это очень умным способом).

Мы можем попробовать. Используя ограниченную плоскую карту, что будет напечатано, если мы будем использовать блок вместо ручного цикла?

Программа:

Flux.range(0, 5)
        .flatMap(i -> Mono
                .just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
                .delayElement(Duration.ofMillis(50)),
                2)
        .doOnNext(System.out::println)
        .blockLast();

System.out.println("FLUX PROCESSED");

печатает:

flatMap on [main] -> 0
flatMap on [main] -> 1
flatMap on [parallel-1] -> 2
flatMap on [parallel-2] -> 3
flatMap on [parallel-3] -> 4
FLUX PROCESSED

Мы видим, что, как и раньше, поток использовал как основной поток, так и параллельные потоки для обработки своих элементов. Но на этот раз основная программа была «остановлена» до завершения потока. Block предотвратил возобновление работы нашей программы до завершения Flux.

Другие вопросы по теме