Java-версия: 11
У меня есть список объектов, я хочу выполнить над ними определенные операции, где одна операция зависит от результата другой. Чтобы реализовать этот рабочий процесс в асинхронном режиме, я использую CompletableFuture.
В настоящее время я делаю это, разбивая список на подсписки и предоставляя каждый из подсписков CompletableFuture, поэтому каждый поток в моем пуле потоков может начать работать с этим подсписком.
Код для вышеуказанного подхода, который я использовал и работал:
List<SomeObject> someObjectList // original list
List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);
partitionList.forEach(subList -> {
CompletableFuture.supplyAsync(() -> firstOperation(subList), executorService)
.thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});
public static List<String> firstOperation(List<SomeObject> subList){
//perform operation
return List<String>;
}
public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
//perform operation
//print results.
}
Здесь проблема заключается в разбиении исходного списка,
потому что, если мой исходный список имеет 100 тысяч записей и размер раздела скажем, 100 (что означает, что я хочу 100 элементов в каждом подсписке), у меня будет 1000 объектов подсписка, каждый из которых содержит 100 элементов, что может быть не очень хорошо, учитывая такое количество объектов в памяти, более того, если размер раздела контролируется пользователем/конфигурационным контроллером, меньший размер раздела приведет к огромному количеству объектов подсписка.
Вместо разделения исходного списка,
Я знаю, что SO не место для точного решения, но если бы вы могли подтолкнуть меня в правильном направлении, псевдокод или даже если это возможно с помощью CompletableFuture, было бы очень полезно :)
Также это выглядит как отличный вариант использования ForkJoinPool. Вы уже посмотрели на это? Вы можете сделать так, чтобы каждый поток брал свой пакет исходного списка, а затем выполнял две операции над элементами.
@ user7, да, все в порядке, даже если это не один поток, но моя главная задача - создавать пакеты и передавать их потоку / потокам.
@Lino, я обновил свой вопрос, чтобы узнать больше о том, почему я хочу избежать разбиения на разделы и перейти к пакетной обработке в ArrayList. Я проверю на ForkJoinPool.
[..]which may not be good considering this many objects in memory
- Не могу сказать без измерения производительности. В зависимости от того, какую реализацию subList вы используете, вы можете просто создать представление исходного списка, а не копировать элементы списка. Одним из примеров является раздел Google Guava. И List#subList
также возвращает представление части резервного списка
@NishikantTayade Я собрал небольшой пример, используя ForkJoinPool: ideone.com/1UiFJM, возможно, это поможет вам. Дайте мне знать, если это не так
Так как ListUtils.partition не стандартный метод, невозможно сказать, как он работает. Но если он работает «по-умному», он будет использовать subList в исходном списке вместо копирования данных.
Если вы хотите быть в безопасности, вы можете сделать тривиальное разбиение самостоятельно:
IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
.forEach(p -> {
int start = p * partionSize,
end = Math.min(someObjectList.size(), start + partionSize);
List<SomeObject> subList = someObjectList.subList(start, end);
CompletableFuture.supplyAsync(() -> firstOperation(subList), executorService)
.thenAcceptAsync(r -> secondOperationWithFirstOpResult(r), executorService);
});
Поскольку эти подсписки не хранят элементы, они потребляют меньше памяти, чем, например, экземпляры CompletableFuture. Так что не о чем беспокоиться.
Но если вы можете жить с использованием пула потоков по умолчанию¹ вместо executorService, вы можете использовать
IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
.parallel()
.forEach(p -> {
int start = p * partionSize,
end = Math.min(someObjectList.size(), start + partionSize);
List<SomeObject> subList = someObjectList.subList(start, end);
secondOperationWithFirstOpResult(firstOperation(subList));
});
Где каждый подсписок существует только во время обработки.
Это уже будет использовать задачи Fork/Join под капотом. Нет необходимости реализовывать эти операции Fork/Join самостоятельно.
¹ пул по умолчанию не указан, но на практике будет ForkJoinPool.commonPool().
@Holger, ListUtils.partition() в моем коде, взят из Apache Commons IO, в документации которого говорится The inner lists are sublist views of the original list, produced on demand using List.subList(int, int), and are subject to all the usual caveats about modification as explained in that API., я думаю, это то, что вы имели в виду, когда сказали достаточно умно, верно? То же самое было указано @user7
@NishikantTayade точно. Это означает, что вы также можете остаться с исходным кодом, если хотите.
@Holger [], they consume less memory than the CompletableFuture instances, не могли бы вы объяснить это немного подробнее? Я получил первую часть, что подсписок — это просто представление, это более эффективно, но less memory than the CompletableFuture instances это утверждение я не получил
Это было сделано только для того, чтобы напомнить вам, что есть затраты, о которых вы можете даже не думать, потому что в большинстве случаев они незначительны, а представление подсписка даже ниже этой планки. Завершаемое будущее должно отслеживать статус завершения и зависимые действия, после чего фактическое задание инкапсулируется в другой объект, который отправляется службе-исполнителю. В сумме это не так много, просто подсписок еще меньше (в большинстве случаев это зависит от конкретной реализации списка, из которой он был получен).
Различные этапы CompletableFuture могут выполняться в разных потоках. Почему вы хотите, чтобы один поток выполнял обе операции?