Все еще новичок в концепциях реактивного программирования, поэтому, пожалуйста, потерпите меня. Я провел больше тестов с реактивными цепочками, и ниже приведен следующий код:
Flux
.range(0, 10000)
.doOnNext(i -> {
System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.flatMap(i -> {
System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
return Mono.just(i);
})
.doOnError(err -> {
throw new RuntimeException(err.getMessage());
})
.subscribe();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");
Моя главная путаница здесь - это вывод:
start 0 Thread: main
end0 Thread: main
start 1 Thread: main
end1 Thread: main
start 2 Thread: main
end2 Thread: main
start 3 Thread: main
end3 Thread: main
start 4 Thread: main
end4 Thread: main
Thread: main Hello world
Почему по умолчанию не создается/используется threadPool для более эффективной обработки каждого целого числа? Кроме того, исходя из моего понимания использования подписки на издателя:
Subscribe is an asynchronous process. It means that when you call subscribe, it launch processing in the background, then return immediately.
Тем не менее, мое наблюдение, кажется, отличается, когда я наблюдал здесь поведение блокировки, поскольку печать «Hello World» должна сначала дождаться завершения обработки цепочки Flux Reactive, поскольку эта цепочка использует и блокирует (?) основной поток.
Раскомментирование subscribeOn()
имеет другое, «правильное» поведение:
Flux
.range(0, 10000)
.doOnNext(i -> {
System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.flatMap(i -> {
System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
return Mono.just(i);
})
.doOnError(err -> {
throw new RuntimeException(err.getMessage());
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");
Thread: main Hello world
start 0 Thread: boundedElastic-1
end0 Thread: boundedElastic-1
start 1 Thread: boundedElastic-1
end1 Thread: boundedElastic-1
start 2 Thread: boundedElastic-1
Я понимаю это так, потому что теперь мы указываем threadPool, который реактивная цепочка должна использовать для своей обработки, так что основной поток, таким образом, может вести себя разблокированным, асинхронно печатая сначала «Hello World».
Точно так же, если бы мы сейчас заменили .subscribe()
на .blockLast()
:
Flux
.range(0, 5)
.doOnNext(i -> {
System.out.println("start " + i+ " Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.flatMap(i -> {
// if (i == 100) return Mono.error(new RuntimeException("Died cuz i = 100"));
System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
return Mono.just(i);
})
.doOnError(err -> {
throw new RuntimeException(err.getMessage());
})
.subscribeOn(Schedulers.boundedElastic())
.blockLast();
System.out.println("Thread: " + Thread.currentThread().getName() + " Hello world");
Результирующее поведение, как ожидается, изменится на блокировку вместо асинхронного, где, если я правильно понимаю, это связано с тем, что даже если мы указали другой пул потоков (не основной), который будет использоваться для обработки реактивной цепочки, основной поток является потоком вызывающей стороны, по-прежнему заблокирован до тех пор, пока реактивная цепочка не вернет сигнал об успехе или ошибке перед освобождением основного потока.
Правильно ли я понимаю? И где я ошибаюсь в своем понимании поведения потоков Project Reactor по умолчанию?
Извините, я не совсем понимаю. То, что вы описываете, происходит, когда используется только .subscribe()? Потому что это то, что показал мой первый пример кода. Я удалил закомментированный код на случай, если он вызовет путаницу.
Советую прочитать раздел, посвященный многопоточности в официальной документации. Это должно дать вам хорошее понимание того, что происходит. Постараюсь подытожить, как смогу.
Объекты Flux и Mono моделируют приостанавливаемую, возобновляемую цепочку операций. Это означает, что движок может «припарковать» действия, а затем запланировать их выполнение в доступном потоке.
Теперь, когда вы вызываете subscribe
на Publisher, вы запускаете цепочку операций. Его первые действия запускаются в вызывающем потоке, как указано в документе:
Если не указано иное, самый верхний оператор (источник) сам выполняется в потоке, в котором был сделан вызов subscribe().
Если ваш поток достаточно прост, есть большая вероятность, что весь поток/моно будет обрабатываться в одном потоке.
Это может создать иллюзию синхронной обработки, но это не так.
Мы уже видим это в вашем первом примере. Вы создаете диапазон из тысячи значений, но перед сообщением Thread: main Hello world
печатаются только 4 значения. Он показывает, что обработка началась, но была «приостановлена», чтобы ваша основная программа могла продолжить работу.
Мы также можем увидеть это более четко в следующем примере:
// Assemble the flow of operations
Flux flow = Flux.range(0, 5)
.flatMap(i -> Mono
.just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
.delayElement(Duration.ofMillis(50)));
// Launch processing
Disposable execStatus = flow.subscribe(System.out::println);
System.out.println("SUBSCRIPTION DONE");
// Prevent program to stop before the flow is over.
do {
System.out.println("Waiting for the flow to end...");
Thread.sleep(50);
} while (!execStatus.isDisposed());
System.out.println("FLUX PROCESSED");
Эта программа печатает:
SUBSCRIPTION DONE
Waiting for the flow to end...
flatMap on [main] -> 0
flatMap on [main] -> 1
flatMap on [main] -> 3
Waiting for the flow to end...
flatMap on [main] -> 4
flatMap on [main] -> 2
FLUX PROCESSED
Если мы посмотрим на него, то увидим, что сообщения от основной программы чередуются с основной программой, поэтому даже если поток выполняется в основном потоке, он все равно выполняется в фоновом режиме.
Это доказывает то, что утверждает subscribe(Consumer) apidoc:
Имейте в виду, что, поскольку последовательность может быть асинхронной, это немедленно вернет управление вызывающему потоку.
Теперь, почему никакая другая нить не использовалась? Ну, это сложный вопрос. В этом случае я бы сказал, что движок решил, что никакой другой поток не нужен для выполнения конвейера с хорошей производительностью. Переключение между потоками имеет свою стоимость, поэтому, если этого можно избежать, я думаю, Reactor избегает этого.
В документации указано:
Некоторые операторы по умолчанию используют определенный планировщик из планировщиков.
Это означает, что в зависимости от конвейера его задачи могут быть отправлены в потоки, предоставленные планировщиком.
На самом деле, flatMap должен это делать. Если мы немного изменим пример, мы увидим, что операции отправляются в параллельный планировщик. Все, что нам нужно сделать, это ограничить параллелизм (да, я знаю, это не очень интуитивно понятно). По умолчанию flatMap использует коэффициент параллелизма 256. Это означает, что он может запускать 256 операций одновременно (грубое объяснение). Давайте ограничим его до 2:
Flux flow = Flux.range(0, 5)
.flatMap(i -> Mono
.just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
.delayElement(Duration.ofMillis(50)),
2);
Теперь программа печатает:
SUBSCRIPTION DONE
Waiting for the flow to end...
flatMap on [main] -> 0
Waiting for the flow to end...
flatMap on [main] -> 1
Waiting for the flow to end...
flatMap on [parallel-1] -> 2
flatMap on [parallel-2] -> 3
Waiting for the flow to end...
flatMap on [parallel-3] -> 4
FLUX PROCESSED
Мы видим, что операции 2, 3 и 4 произошли в потоке с именем parallel-x. Это темы, созданные Schedulers.parallel.
ПРИМЕЧАНИЕ. Методы subscribeOn и publishOn можно использовать для более точного управления многопоточностью.
Изменяют ли методы block
, blockFirst
и blockLast
способ планирования/выполнения операций? Ответ - нет.
Когда вы используете блок, внутренне поток подписывается, как и при вызове subscribe()
. Однако, как только поток запускается, вместо возврата Reactor внутренне использует вызывающий поток для зацикливания и ожидания завершения потока, как я сделал в моем первом примере выше (но они делают это очень умным способом).
Мы можем попробовать. Используя ограниченную плоскую карту, что будет напечатано, если мы будем использовать блок вместо ручного цикла?
Программа:
Flux.range(0, 5)
.flatMap(i -> Mono
.just("flatMap on [" + Thread.currentThread().getName() + "] -> " + i)
.delayElement(Duration.ofMillis(50)),
2)
.doOnNext(System.out::println)
.blockLast();
System.out.println("FLUX PROCESSED");
печатает:
flatMap on [main] -> 0
flatMap on [main] -> 1
flatMap on [parallel-1] -> 2
flatMap on [parallel-2] -> 3
flatMap on [parallel-3] -> 4
FLUX PROCESSED
Мы видим, что, как и раньше, поток использовал как основной поток, так и параллельные потоки для обработки своих элементов. Но на этот раз основная программа была «остановлена» до завершения потока. Block предотвратил возобновление работы нашей программы до завершения Flux.
вы сказали в своем первом примере, что onSubscribe должен взять один поток из пула потоков (это основной) и запустить все на нем. Я не понимаю, что вы ожидаете? Вы подписываетесь, и все запускается в этом потоке.