Как выполнить операции для пакета элементов в ArrayList с помощью CompletableFuture?

Java-версия: 11

У меня есть список объектов, я хочу выполнить над ними определенные операции, где одна операция зависит от результата другой. Чтобы реализовать этот рабочий процесс в асинхронном режиме, я использую CompletableFuture.

В настоящее время я делаю это, разбивая список на подсписки и предоставляя каждый из подсписков CompletableFuture, поэтому каждый поток в моем пуле потоков может начать работать с этим подсписком.

Код для вышеуказанного подхода, который я использовал и работал:

List<SomeObject> someObjectList // original list

List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);

partitionList.forEach(subList -> {
    CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});

public static List<String> firstOperation(List<SomeObject> subList){
        //perform operation
        return List<String>;
}

public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
        //perform operation
        //print results.
}

Здесь проблема заключается в разбиении исходного списка,

потому что, если мой исходный список имеет 100 тысяч записей и размер раздела скажем, 100 (что означает, что я хочу 100 элементов в каждом подсписке), у меня будет 1000 объектов подсписка, каждый из которых содержит 100 элементов, что может быть не очень хорошо, учитывая такое количество объектов в памяти, более того, если размер раздела контролируется пользователем/конфигурационным контроллером, меньший размер раздела приведет к огромному количеству объектов подсписка.

Вместо разделения исходного списка,

  1. Я хочу взять исходный список (скажем, 100 элементов).
  2. Имейте startIndex и endIndex в исходном списке (скажем, от 0 до 9, от 10 до 19...)
  3. И передайте каждую из этих партий потоку в пуле потоков с помощью CompletableFuture.
  4. Таким образом, этот поток может выполнять две операции.

Я знаю, что SO не место для точного решения, но если бы вы могли подтолкнуть меня в правильном направлении, псевдокод или даже если это возможно с помощью CompletableFuture, было бы очень полезно :)

Различные этапы CompletableFuture могут выполняться в разных потоках. Почему вы хотите, чтобы один поток выполнял обе операции?

Thiyagu 11.11.2022 08:39

Также это выглядит как отличный вариант использования ForkJoinPool. Вы уже посмотрели на это? Вы можете сделать так, чтобы каждый поток брал свой пакет исходного списка, а затем выполнял две операции над элементами.

Lino 11.11.2022 08:48

@ user7, да, все в порядке, даже если это не один поток, но моя главная задача - создавать пакеты и передавать их потоку / потокам.

Nishikant Tayade 11.11.2022 08:59

@Lino, я обновил свой вопрос, чтобы узнать больше о том, почему я хочу избежать разбиения на разделы и перейти к пакетной обработке в ArrayList. Я проверю на ForkJoinPool.

Nishikant Tayade 11.11.2022 09:00
[..]which may not be good considering this many objects in memory - Не могу сказать без измерения производительности. В зависимости от того, какую реализацию subList вы используете, вы можете просто создать представление исходного списка, а не копировать элементы списка. Одним из примеров является раздел Google Guava. И List#subList также возвращает представление части резервного списка
Thiyagu 11.11.2022 09:11

@NishikantTayade Я собрал небольшой пример, используя ForkJoinPool: ideone.com/1UiFJM, возможно, это поможет вам. Дайте мне знать, если это не так

Lino 11.11.2022 09:15
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
Как включить TLS в gRPC-клиенте и сервере : 2
Как включить TLS в gRPC-клиенте и сервере : 2
Здравствуйте! 🙏🏻 Надеюсь, у вас все хорошо и добро пожаловать в мой блог.
Сортировка hashmap по значениям
Сортировка hashmap по значениям
На Leetcode я решал задачу с хэшмапой и подумал, что мне нужно отсортировать хэшмапу по значениям.
4
6
124
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Так как ListUtils.partition не стандартный метод, невозможно сказать, как он работает. Но если он работает «по-умному», он будет использовать subList в исходном списке вместо копирования данных.

Если вы хотите быть в безопасности, вы можете сделать тривиальное разбиение самостоятельно:

IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
    .forEach(p -> {
        int start = p * partionSize,
            end = Math.min(someObjectList.size(), start + partionSize);
        List<SomeObject> subList = someObjectList.subList(start, end);
        CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(r -> secondOperationWithFirstOpResult(r), executorService);
    });

Поскольку эти подсписки не хранят элементы, они потребляют меньше памяти, чем, например, экземпляры CompletableFuture. Так что не о чем беспокоиться.

Но если вы можете жить с использованием пула потоков по умолчанию¹ вместо executorService, вы можете использовать

IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
    .parallel()
    .forEach(p -> {
        int start = p * partionSize,
            end = Math.min(someObjectList.size(), start + partionSize);
        List<SomeObject> subList = someObjectList.subList(start, end);
        secondOperationWithFirstOpResult(firstOperation(subList));
    });

Где каждый подсписок существует только во время обработки.

Это уже будет использовать задачи Fork/Join под капотом. Нет необходимости реализовывать эти операции Fork/Join самостоятельно.

¹ пул по умолчанию не указан, но на практике будет ForkJoinPool.commonPool().

@Holger, ListUtils.partition() в моем коде, взят из Apache Commons IO, в документации которого говорится The inner lists are sublist views of the original list, produced on demand using List.subList(int, int), and are subject to all the usual caveats about modification as explained in that API., я думаю, это то, что вы имели в виду, когда сказали достаточно умно, верно? То же самое было указано @user7

Nishikant Tayade 11.11.2022 09:31

@NishikantTayade точно. Это означает, что вы также можете остаться с исходным кодом, если хотите.

Holger 11.11.2022 09:33

@Holger [], they consume less memory than the CompletableFuture instances, не могли бы вы объяснить это немного подробнее? Я получил первую часть, что подсписок — это просто представление, это более эффективно, но less memory than the CompletableFuture instances это утверждение я не получил

Nishikant Tayade 11.11.2022 10:11

Это было сделано только для того, чтобы напомнить вам, что есть затраты, о которых вы можете даже не думать, потому что в большинстве случаев они незначительны, а представление подсписка даже ниже этой планки. Завершаемое будущее должно отслеживать статус завершения и зависимые действия, после чего фактическое задание инкапсулируется в другой объект, который отправляется службе-исполнителю. В сумме это не так много, просто подсписок еще меньше (в большинстве случаев это зависит от конкретной реализации списка, из которой он был получен).

Holger 11.11.2022 10:24

Другие вопросы по теме