Могу ли я использовать в Loom виртуальные потоки для Recursive[Action/Task]?

Можно ли использовать RecursiveAction, например, в сочетании с - вместо пула fork/join - пулом виртуальных потоков (до того, как я попытаюсь выполнить плохо спроектированное, специальное усилие)?

Мне нужно узнать больше о RecursiveAction, но красный флаг в вашем вопросе связан с объединением виртуальных потоков. Основной темой видео/статей на Loom является то, что виртуальные потоки не должны объединяться. Они дешевы; товар.

Michael Easter 20.11.2022 04:30

это правда, хотя меня интересует функциональность Recursive*, с которой можно работать с большим количеством потоков.

HellishHeat 20.11.2022 12:07
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
Как включить TLS в gRPC-клиенте и сервере : 2
Как включить TLS в gRPC-клиенте и сервере : 2
Здравствуйте! 🙏🏻 Надеюсь, у вас все хорошо и добро пожаловать в мой блог.
Сортировка hashmap по значениям
Сортировка hashmap по значениям
На Leetcode я решал задачу с хэшмапой и подумал, что мне нужно отсортировать хэшмапу по значениям.
Принципы SOLID - лучшие практики
Принципы SOLID - лучшие практики
SOLID - это аббревиатура, обозначающая пять ключевых принципов проектирования: принцип единой ответственности, принцип "открыто-закрыто", принцип...
gRPC на Android с использованием TLS
gRPC на Android с использованием TLS
gRPC - это относительно новая концепция взаимодействия между клиентом и сервером, но не более.
2
2
121
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

RecursiveAction является подклассом ForkJoinTask, который, как следует из названия и даже в документации, буквально

Абстрактный базовый класс для задач, которые выполняются в ForkJoinPool.

Хотя ForkJoinPool можно настроить с помощью фабрики потоков , это не стандартная фабрика потоков , а специальная фабрика для создания экземпляров ForkJoinWorkerThread. Поскольку эти потоки являются подклассами Thread, их нельзя создать с помощью виртуальной фабрики потоков.

Итак, вы не можете использовать RecursiveAction с виртуальными потоками. То же самое относится и к RecursiveTask. Но стоит переосмыслить, что даст вам использование этих классов с виртуальными потоками.

В любом случае, основная проблема, реализовать декомпозицию вашей задачи на подзадачи, лежит на вас. Эти классы предоставляют вам функции, специально предназначенные для работы с пулом Fork/Join и балансировки рабочей нагрузки с доступными потоками платформы. Если вы хотите выполнять каждую подзадачу в отдельном виртуальном потоке, вам это не нужно. Таким образом, вы можете легко реализовать рекурсивную задачу с виртуальными потоками без встроенных классов, например.

record PseudoTask(int from, int to) {
    public static CompletableFuture<Void> run(int from, int to) {
        return CompletableFuture.runAsync(
            new PseudoTask(from, to)::compute, Thread::startVirtualThread);
    }

    protected void compute() {
        int mid = (from + to) >>> 1;
        if(mid == from) {
            // simulate actual processing with potentially blocking operations
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        }
        else {
            CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);
            sub1.join();
            sub2.join();
        }
    }
}

Этот пример просто не заботится об ограничении подразделения или предотвращении блокировки вызовов join(), и он по-прежнему хорошо работает при выполнении, например. PseudoTask.run(0, 1_000).join(); Вы можете заметить, что с большими диапазонами, методы, известные из других реализаций рекурсивных задач, могут быть полезны и здесь, где подзадача довольно дешевая.

Например, вы можете отправить только одну половину диапазона в другой поток и обработать другую половину локально, например

record PseudoTask(int from, int to) {
    public static CompletableFuture<Void> run(int from, int to) {
        return CompletableFuture.runAsync(
            new PseudoTask(from, to)::compute, Thread::startVirtualThread);
    }

    protected void compute() {
        CompletableFuture<Void> f = null;
        for(int from = this.from, mid; ; from = mid) {
            mid = (from + to) >>> 1;
            if (mid == from) {
                // simulate actual processing with potentially blocking operations
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                break;
            } else {
                CompletableFuture<Void> sub1 = run(from, mid);
                if(f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);
            }
        }
        if(f != null) f.join();
    }
}

что имеет заметное значение при работе, например. PseudoTask.run(0, 1_000_000).join(); который будет использовать только 1 миллион потоков во втором примере, а не 2 миллиона. Но, конечно, это обсуждение на другом уровне, чем с тредами платформы, где ни один из подходов не будет работать разумно.


Еще одна предстоящая опция — StructuredTaskScope, которая позволяет создавать подзадачи и ждать их завершения.

record PseudoTask(int from, int to) {
    public static void run(int from, int to) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            new PseudoTask(from, to).compute(scope);
            scope.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    protected Void compute(StructuredTaskScope<Object> scope) {
        for(int from = this.from, mid; ; from = mid) {
            mid = (from + to) >>> 1;
            if (mid == from) {
                // simulate actual processing with potentially blocking operations
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                break;
            } else {
                var sub = new PseudoTask(from, mid);
                scope.fork(() -> sub.compute(scope));
            }
        }
        return null;
    }
}

Здесь задачи не ждут завершения своей подзадачи, а только корневая задача ожидает завершения всех задач. Но эта функция находится в состоянии инкубатора, поэтому может потребоваться даже больше времени, чем функция виртуальных потоков, чтобы стать готовой к работе.

Очень подробно. Большое спасибо.

HellishHeat 02.12.2022 10:15

Другие вопросы по теме