CompletableFuture: несколько задач

Как я могу асинхронно выполнить 20 задач Runnable (или 1 задачу 20 раз), используя 5 CompletableFutures?

Вот что у меня есть:

Runnable task = () -> {
        long startTime = System.currentTimeMillis();
        Random random = new Random();

        while (System.currentTimeMillis() - startTime < 3000) {
            DoubleStream.generate(() -> random.nextDouble())
                    .limit(random.nextInt(100))
                    .map(n -> Math.cos(n))
                    .sum();
        }
        System.out.println("Done");
    };

    for (int i = 0; i < 4; i++) {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
        CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
        future1.get();
        future2.get();
        future3.get();
        future4.get();
        future5.get();
    }

Если я выполню этот код, я увижу, что он асинхронно запускает только 3 future.get (): 3, а затем 2, которые остались в течение 1 итерации for ()

Итак, я хотел бы выполнить все 20 задач как можно асинхронно.

Может быть поможет следующий пост: Что определяет количество потоков, создаваемых Java ForkJoinPool?

CannedMoose 15.04.2018 09:53
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
1
5 886
3

Ответы 3

Вы можете использовать allOf для одновременного запуска нескольких задач как одной. Сначала я создаю комбинацию из 5 задач (таких же, как в вашем вопросе), но затем я добавил 10 вместо этого (и только дважды) и получил половину времени выполнения.

for (int i = 0; i < 2; i++) {
   CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
   CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
  // and so on until ten  
   CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);

   CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);

   combined.get();
}

Задайте в следующем системном свойстве количество потоков, которые должен использовать общий пул соединений вилки:

java.util.concurrent.ForkJoinPool.common.parallelism 

См. ForkJoinPool

Причина в том, что вы не указываете свой собственный пул соединения вилки при создании завершаемых фьючерсов, поэтому он неявно использует

ForkJoinPool.commonPool()

См. CompletableFurure

Исполнителем по умолчанию для CompletableFuture является общий пул ForkJoinPool, который имеет целевой параллелизм по умолчанию, соответствующий количеству ядер ЦП минус один. Таким образом, если у вас четыре ядра, максимум три задания будут выполняться асинхронно. Поскольку вы вызываете ожидание завершения каждые 5 заданий, вы получите три параллельных выполнения, за которыми следуют два параллельных выполнения на каждой итерации цикла.

Если вы хотите получить определенную стратегию выполнения, такую ​​как параллелизм, по вашему выбору, лучший способ - указать правильно настроенный исполнитель. Затем вы должны позволить исполнителю управлять параллелизмом вместо ожидания в цикле.

ExecutorService pool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 20; i++) {
    CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

Это позволяет пять параллельных заданий, но позволит каждому из пяти потоков выбрать новое задание сразу после его завершения, вместо того, чтобы ждать следующей итерации цикла.

Но когда вы говорите

So, I would like to do all 20 tasks, as asynchronously as possible

Непонятно, почему вы вообще заставляете ждать после пяти заданий. Максимального параллелизма можно добиться за счет

ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
    CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

Это может порождать столько потоков, сколько заданий, если только одно задание не завершится до того, как все будут запланированы, так как в этом случае рабочий поток может получить новое задание.

Но для этой логики вообще не нужен CompletableFuture. Вы также можете использовать:

ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

Но когда ваша работа не связана с вводом-выводом или каким-либо другим видом ожидания, соответственно. освобождая ЦП, нет смысла создавать больше потоков, чем ядер ЦП. Предпочтительно пул, настроенный на количество процессоров.

ExecutorService pool = Executors.newWorkStealingPool(
    Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

В вашем особом случае это, вероятно, выполняется медленнее, так как ваши задания используют системное время, чтобы казаться работающими быстрее, когда у них больше потоков, чем ядер, но на самом деле тогда они делают меньше работы. Но для обычных вычислительных задач это повысит производительность.

Почему вы ждете завершения работы после завершения работы? Разве вам не следует сначала дождаться завершения, прежде чем вызывать завершение работы исполнителя?

Faraz 28.05.2019 12:02

@FarazDurrani, исполнитель, не отключится, пока его об этом не попросят. Таким образом, ожидание завершения, когда никто не запрашивал отключение, будет просто ждать указанное время без каких-либо преимуществ.

Holger 28.05.2019 12:39

Другие вопросы по теме