Можно ли использовать RecursiveAction, например, в сочетании с - вместо пула fork/join - пулом виртуальных потоков (до того, как я попытаюсь выполнить плохо спроектированное, специальное усилие)?
это правда, хотя меня интересует функциональность Recursive*, с которой можно работать с большим количеством потоков.
RecursiveAction является подклассом ForkJoinTask, который, как следует из названия и даже в документации, буквально
Абстрактный базовый класс для задач, которые выполняются в ForkJoinPool.
Хотя ForkJoinPool можно настроить с помощью фабрики потоков , это не стандартная фабрика потоков , а специальная фабрика для создания экземпляров ForkJoinWorkerThread. Поскольку эти потоки являются подклассами Thread
, их нельзя создать с помощью виртуальной фабрики потоков.
Итак, вы не можете использовать RecursiveAction с виртуальными потоками. То же самое относится и к RecursiveTask
. Но стоит переосмыслить, что даст вам использование этих классов с виртуальными потоками.
В любом случае, основная проблема, реализовать декомпозицию вашей задачи на подзадачи, лежит на вас. Эти классы предоставляют вам функции, специально предназначенные для работы с пулом Fork/Join и балансировки рабочей нагрузки с доступными потоками платформы. Если вы хотите выполнять каждую подзадачу в отдельном виртуальном потоке, вам это не нужно. Таким образом, вы можете легко реализовать рекурсивную задачу с виртуальными потоками без встроенных классов, например.
record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
int mid = (from + to) >>> 1;
if (mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
}
else {
CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);
sub1.join();
sub2.join();
}
}
}
Этот пример просто не заботится об ограничении подразделения или предотвращении блокировки вызовов join()
, и он по-прежнему хорошо работает при выполнении, например. PseudoTask.run(0, 1_000).join();
Вы можете заметить, что с большими диапазонами, методы, известные из других реализаций рекурсивных задач, могут быть полезны и здесь, где подзадача довольно дешевая.
Например, вы можете отправить только одну половину диапазона в другой поток и обработать другую половину локально, например
record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
CompletableFuture<Void> f = null;
for(int from = this.from, mid; ; from = mid) {
mid = (from + to) >>> 1;
if (mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break;
} else {
CompletableFuture<Void> sub1 = run(from, mid);
if (f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);
}
}
if (f != null) f.join();
}
}
что имеет заметное значение при работе, например. PseudoTask.run(0, 1_000_000).join();
который будет использовать только 1 миллион потоков во втором примере, а не 2 миллиона. Но, конечно, это обсуждение на другом уровне, чем с тредами платформы, где ни один из подходов не будет работать разумно.
Еще одна предстоящая опция — StructuredTaskScope, которая позволяет создавать подзадачи и ждать их завершения.
record PseudoTask(int from, int to) {
public static void run(int from, int to) {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
new PseudoTask(from, to).compute(scope);
scope.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
protected Void compute(StructuredTaskScope<Object> scope) {
for(int from = this.from, mid; ; from = mid) {
mid = (from + to) >>> 1;
if (mid == from) {
// simulate actual processing with potentially blocking operations
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
break;
} else {
var sub = new PseudoTask(from, mid);
scope.fork(() -> sub.compute(scope));
}
}
return null;
}
}
Здесь задачи не ждут завершения своей подзадачи, а только корневая задача ожидает завершения всех задач. Но эта функция находится в состоянии инкубатора, поэтому может потребоваться даже больше времени, чем функция виртуальных потоков, чтобы стать готовой к работе.
Очень подробно. Большое спасибо.
Мне нужно узнать больше о RecursiveAction, но красный флаг в вашем вопросе связан с объединением виртуальных потоков. Основной темой видео/статей на Loom является то, что виртуальные потоки не должны объединяться. Они дешевы; товар.