Могу ли я использовать в Loom виртуальные потоки для Recursive[Action/Task]?

Можно ли использовать RecursiveAction, например, в сочетании с - вместо пула fork/join - пулом виртуальных потоков (до того, как я попытаюсь выполнить плохо спроектированное, специальное усилие)?

Мне нужно узнать больше о RecursiveAction, но красный флаг в вашем вопросе связан с объединением виртуальных потоков. Основной темой видео/статей на Loom является то, что виртуальные потоки не должны объединяться. Они дешевы; товар.

Michael Easter 20.11.2022 04:30

это правда, хотя меня интересует функциональность Recursive*, с которой можно работать с большим количеством потоков.

HellishHeat 20.11.2022 12:07
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
2
121
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

RecursiveAction является подклассом ForkJoinTask, который, как следует из названия и даже в документации, буквально

Абстрактный базовый класс для задач, которые выполняются в ForkJoinPool.

Хотя ForkJoinPool можно настроить с помощью фабрики потоков , это не стандартная фабрика потоков , а специальная фабрика для создания экземпляров ForkJoinWorkerThread. Поскольку эти потоки являются подклассами Thread, их нельзя создать с помощью виртуальной фабрики потоков.

Итак, вы не можете использовать RecursiveAction с виртуальными потоками. То же самое относится и к RecursiveTask. Но стоит переосмыслить, что даст вам использование этих классов с виртуальными потоками.

В любом случае, основная проблема, реализовать декомпозицию вашей задачи на подзадачи, лежит на вас. Эти классы предоставляют вам функции, специально предназначенные для работы с пулом Fork/Join и балансировки рабочей нагрузки с доступными потоками платформы. Если вы хотите выполнять каждую подзадачу в отдельном виртуальном потоке, вам это не нужно. Таким образом, вы можете легко реализовать рекурсивную задачу с виртуальными потоками без встроенных классов, например.

record PseudoTask(int from, int to) {
    public static CompletableFuture<Void> run(int from, int to) {
        return CompletableFuture.runAsync(
            new PseudoTask(from, to)::compute, Thread::startVirtualThread);
    }

    protected void compute() {
        int mid = (from + to) >>> 1;
        if (mid == from) {
            // simulate actual processing with potentially blocking operations
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        }
        else {
            CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);
            sub1.join();
            sub2.join();
        }
    }
}

Этот пример просто не заботится об ограничении подразделения или предотвращении блокировки вызовов join(), и он по-прежнему хорошо работает при выполнении, например. PseudoTask.run(0, 1_000).join(); Вы можете заметить, что с большими диапазонами, методы, известные из других реализаций рекурсивных задач, могут быть полезны и здесь, где подзадача довольно дешевая.

Например, вы можете отправить только одну половину диапазона в другой поток и обработать другую половину локально, например

record PseudoTask(int from, int to) {
    public static CompletableFuture<Void> run(int from, int to) {
        return CompletableFuture.runAsync(
            new PseudoTask(from, to)::compute, Thread::startVirtualThread);
    }

    protected void compute() {
        CompletableFuture<Void> f = null;
        for(int from = this.from, mid; ; from = mid) {
            mid = (from + to) >>> 1;
            if (mid == from) {
                // simulate actual processing with potentially blocking operations
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                break;
            } else {
                CompletableFuture<Void> sub1 = run(from, mid);
                if (f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);
            }
        }
        if (f != null) f.join();
    }
}

что имеет заметное значение при работе, например. PseudoTask.run(0, 1_000_000).join(); который будет использовать только 1 миллион потоков во втором примере, а не 2 миллиона. Но, конечно, это обсуждение на другом уровне, чем с тредами платформы, где ни один из подходов не будет работать разумно.


Еще одна предстоящая опция — StructuredTaskScope, которая позволяет создавать подзадачи и ждать их завершения.

record PseudoTask(int from, int to) {
    public static void run(int from, int to) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            new PseudoTask(from, to).compute(scope);
            scope.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    protected Void compute(StructuredTaskScope<Object> scope) {
        for(int from = this.from, mid; ; from = mid) {
            mid = (from + to) >>> 1;
            if (mid == from) {
                // simulate actual processing with potentially blocking operations
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                break;
            } else {
                var sub = new PseudoTask(from, mid);
                scope.fork(() -> sub.compute(scope));
            }
        }
        return null;
    }
}

Здесь задачи не ждут завершения своей подзадачи, а только корневая задача ожидает завершения всех задач. Но эта функция находится в состоянии инкубатора, поэтому может потребоваться даже больше времени, чем функция виртуальных потоков, чтобы стать готовой к работе.

Очень подробно. Большое спасибо.

HellishHeat 02.12.2022 10:15

Другие вопросы по теме