Как я могу асинхронно выполнить 20 задач Runnable (или 1 задачу 20 раз), используя 5 CompletableFutures?
Вот что у меня есть:
Runnable task = () -> {
long startTime = System.currentTimeMillis();
Random random = new Random();
while (System.currentTimeMillis() - startTime < 3000) {
DoubleStream.generate(() -> random.nextDouble())
.limit(random.nextInt(100))
.map(n -> Math.cos(n))
.sum();
}
System.out.println("Done");
};
for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
future1.get();
future2.get();
future3.get();
future4.get();
future5.get();
}
Если я выполню этот код, я увижу, что он асинхронно запускает только 3 future.get (): 3, а затем 2, которые остались в течение 1 итерации for ()
Итак, я хотел бы выполнить все 20 задач как можно асинхронно.




Вы можете использовать allOf для одновременного запуска нескольких задач как одной. Сначала я создаю комбинацию из 5 задач (таких же, как в вашем вопросе), но затем я добавил 10 вместо этого (и только дважды) и получил половину времени выполнения.
for (int i = 0; i < 2; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
// and so on until ten
CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);
combined.get();
}
Задайте в следующем системном свойстве количество потоков, которые должен использовать общий пул соединений вилки:
java.util.concurrent.ForkJoinPool.common.parallelism
См. ForkJoinPool
Причина в том, что вы не указываете свой собственный пул соединения вилки при создании завершаемых фьючерсов, поэтому он неявно использует
ForkJoinPool.commonPool()
Исполнителем по умолчанию для CompletableFuture является общий пул ForkJoinPool, который имеет целевой параллелизм по умолчанию, соответствующий количеству ядер ЦП минус один. Таким образом, если у вас четыре ядра, максимум три задания будут выполняться асинхронно. Поскольку вы вызываете ожидание завершения каждые 5 заданий, вы получите три параллельных выполнения, за которыми следуют два параллельных выполнения на каждой итерации цикла.
Если вы хотите получить определенную стратегию выполнения, такую как параллелизм, по вашему выбору, лучший способ - указать правильно настроенный исполнитель. Затем вы должны позволить исполнителю управлять параллелизмом вместо ожидания в цикле.
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
Это позволяет пять параллельных заданий, но позволит каждому из пяти потоков выбрать новое задание сразу после его завершения, вместо того, чтобы ждать следующей итерации цикла.
Но когда вы говорите
So, I would like to do all 20 tasks, as asynchronously as possible
Непонятно, почему вы вообще заставляете ждать после пяти заданий. Максимального параллелизма можно добиться за счет
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
Это может порождать столько потоков, сколько заданий, если только одно задание не завершится до того, как все будут запланированы, так как в этом случае рабочий поток может получить новое задание.
Но для этой логики вообще не нужен CompletableFuture. Вы также можете использовать:
ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
Но когда ваша работа не связана с вводом-выводом или каким-либо другим видом ожидания, соответственно. освобождая ЦП, нет смысла создавать больше потоков, чем ядер ЦП. Предпочтительно пул, настроенный на количество процессоров.
ExecutorService pool = Executors.newWorkStealingPool(
Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
В вашем особом случае это, вероятно, выполняется медленнее, так как ваши задания используют системное время, чтобы казаться работающими быстрее, когда у них больше потоков, чем ядер, но на самом деле тогда они делают меньше работы. Но для обычных вычислительных задач это повысит производительность.
Почему вы ждете завершения работы после завершения работы? Разве вам не следует сначала дождаться завершения, прежде чем вызывать завершение работы исполнителя?
@FarazDurrani, исполнитель, не отключится, пока его об этом не попросят. Таким образом, ожидание завершения, когда никто не запрашивал отключение, будет просто ждать указанное время без каких-либо преимуществ.
Может быть поможет следующий пост: Что определяет количество потоков, создаваемых Java ForkJoinPool?