Как предотвратить нехватку памяти из-за многих Runnables

Мне нужно выполнить 100 задач. Я хочу запускать их параллельно, но только 10 одновременно. Я могу использовать для этого простой

ExecutorService service = Executors.newFixedThreadPool(10);

Затем создайте 10 Runnables и отправьте их в ExecutorService.

Но что, если есть 100_000 задач или 100_000_000 задач? В какой-то момент это может привести к нехватке памяти из-за большого количества Runnables в памяти.

Итак, я хотел что-то вроде этого: я сохраняю параметры всей задачи в базе данных, затем беру из базы данных еще один из них, когда освобождается один поток, и запускаю новую задачу с этими параметрами. Я придумал что-то вроде этого

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    private static class MyRunnable implements Runnable {

        private int i;  /* there would by actually parameters for task*/

        public MyRunnable(int i) {
            this.i = i;
        }

        public void run() {
            try {
                System.out.println(i); // there would by actually some code
                Thread.sleep(1_000);   // there would by actually some code
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                executorService.execute(new MyRunnable(i + 10 /* there would by actually taking parameters from database for new task*/));
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.submit(new MyRunnable(i /* there would by actually taking parameters from database for new task*/));
        }
    }
}

Это хорошее или плохое решение? Какое лучшее решение?

Подождите соотв. Посмотрите в проект ткацкий станок.

Joop Eggen 21.02.2023 18:11

@JoopEggen Учитывая название, проблема заключается в нехватке памяти для создания экземпляров миллиона Runnable объектов. Как Project Loom решает проблему управления памятью ожидающих выполнения?

Basil Bourque 21.02.2023 20:13

@BasilBourque прав, и спасибо за ссылку.

Joop Eggen 22.02.2023 01:31
Лучшая компания по разработке спортивных приложений
Лучшая компания по разработке спортивных приложений
Ищете лучшую компанию по разработке спортивных приложений? Этот список, несомненно, облегчит вашу работу!
Blibli Automation Journey - Как захватить сетевой трафик с помощью утилиты HAR в Selenium 4
Blibli Automation Journey - Как захватить сетевой трафик с помощью утилиты HAR в Selenium 4
Если вы являетесь веб-разработчиком или тестировщиком, вы можете быть знакомы с Selenium, популярным инструментом для автоматизации работы...
Фото ️🔁 Radek Jedynak 🔃 on ️🔁 Unsplash 🔃
Фото ️🔁 Radek Jedynak 🔃 on ️🔁 Unsplash 🔃
Что такое Java 8 Streams API? Java 8 Stream API
Деревья поиска (Алгоритм4 Заметки к учебнику)
Деревья поиска (Алгоритм4 Заметки к учебнику)
(1) Двоичные деревья поиска: среднее lgN, наихудшее N для вставки и поиска.
0
3
62
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Вы, кажется, обеспокоены тем, что слишком много ожидающих объектов задач Runnable могут занять больше памяти, чем у вас будет доступно на вашем компьютере развертывания. Вы хотите избежать ошибок нехватки памяти или перегрузки виртуальной памяти.

Чтобы избежать слишком большого количества задач, подождите, чтобы создать следующую задачу, пока текущая не будет выполнена.

Завершите каждую задачу отправкой другой задачи

Добавьте вторую обязанность к каждому экземпляру ваших объектов Runnable: извлеките следующий ожидающий параметр из своей базы данных и отправьте задачу службе-исполнителю.

Задача может быть либо новым экземпляром вашей Runnable реализации, либо задачей может быть тот же экземпляр, который в данный момент выполняется Runnable. Я предпочитаю первый подход, если второй подход означает написание кода для очистки «грязного» состояния.

Когда больше не остается входных параметров, последний запущенный экземпляр вашего Runnable просто завершается после закрытия службы-исполнителя. Служба исполнителя остается без дальнейшей работы.

Обязательно в конечном итоге отключите службу-исполнитель. В противном случае его резервный пул потоков может продолжать работать бесконечно, даже после завершения работы вашего приложения, как зомби 🧟‍♂️.

Пример кода

Задача

Давайте определим задачу, которая умножает число, а затем выводит отчет на консоль.

Конструктор этой задачи принимает потокобезопасные Queue целых чисел, которые он использует в качестве входных данных. Каждый объект задачи рисует один элемент, объект Integer, из этой очереди.

Конструктор этой задачи также использует ссылку на существующий ExecutorService. Когда эта задача завершает свою работу, она создает другую задачу. Старая задача отправляет новую задачу этой переданной службе-исполнителю. Служба-исполнитель выполнит новую задачу в будущем, вероятно, в самом ближайшем будущем.

package work.basil.example.bogus;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;

public class Multiplier implements Runnable
{
    private final Queue < Integer > queue;
    private final ExecutorService executorService;

    public Multiplier ( final Queue < Integer > queue , final ExecutorService executorService )
    {
        this.queue = queue;
        this.executorService = executorService;
    }

    @Override
    public void run ( )
    {
        try
        {
            this.doRun();
        }
        catch ( Exception e )
        {
            e.printStackTrace();
        }
    }

    private void doRun ( )
    {
        Integer input = queue.poll();
        if ( Objects.nonNull( input ) )  // If we have another input to process.
        {
            // Business logic.
            // Simulate doing much work by sleeping this thread.
            try { Thread.sleep( Duration.ofMillis( ThreadLocalRandom.current().nextInt( 500 , 2_000 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
            int result = Math.multiplyExact( input , 2 );
            System.out.println( "INFO - input = " + input + " | result = " + result + " at " + Instant.now() );

            // Schedule another task.
            // Try to avoid needlessly submitting unnecessary tasks. But still may happen because of the interleaving of concurrent threads.
            if ( ! this.queue.isEmpty() )
            {
                Multiplier task = new Multiplier( this.queue , this.executorService );
                try { executorService.submit( task ); } catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
                System.out.println( "DEBUG - Submitted a subsequent task to executor service. " + Instant.now() );
            }
        }
    }
}

Приложение

Давайте напишем приложение для выполнения некоторых из этих задач.

Это приложение создает экземпляр службы-исполнителя, поддерживаемой пулом потоков определенного размера. Приложение создает и выполняет столько задач, сколько потоков, чтобы сдвинуться с мертвой точки.

Оттуда каждая выполняемая задача создает экземпляр другой задачи, а затем отправляет ее для последующего выполнения.

Этот скользящий цикл работа-создание экземпляра-отправка/работа-создание экземпляра-отправка продолжается до тех пор, пока мы не исчерпаем очередь входных данных.

package work.basil.example.bogus;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        System.out.println( "INFO - Demo starting. " + Instant.now() );

        final List < Integer > data = List.of( 10 , 20 , 30 , 40 , 50 , 60 , 70 );
        final Queue < Integer > queue = new ArrayBlockingQueue <>( data.size() , false , data );
        final int countThreads = 3;
        ExecutorService executorService = Executors.newFixedThreadPool( countThreads );

        // Te get the ball rolling, start as many tasks as we have threads.
        // From there, those initial tasks will instantiate and submit further tasks.
        for ( int index = 0 ; index < countThreads ; index++ )
        {
            Multiplier task = new Multiplier( queue , executorService );
            try { executorService.submit( task ); } catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
        }

        Duration wait = Duration.ofSeconds( 20 );
        System.out.println( "DEBUG - Main thread will sleep for " + wait + " at " + Instant.now() );
        try { Thread.sleep( wait ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
        System.out.println( "DEBUG - Main thread waking after waiting " + wait + " at " + Instant.now() );

        executorService.close();
        if ( ! executorService.isTerminated() )
        {
            System.out.println( "ERROR - Executor service STILL not yet terminated after waiting for " + wait + " at " + Instant.now() );
        }

        System.out.println( "INFO - Demo ending. " + Instant.now() );
    }
}

Исполнение

При запуске мы ожидаем увидеть вывод, подобный этому:

INFO - Demo starting. 2023-02-23T05:51:19.584253Z
DEBUG - Main thread will sleep for PT20S at 2023-02-23T05:51:19.592739Z
INFO - input = 30 | result = 60 at 2023-02-23T05:51:20.542284Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.549764Z
INFO - input = 20 | result = 40 at 2023-02-23T05:51:20.571571Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.571752Z
INFO - input = 40 | result = 80 at 2023-02-23T05:51:21.180109Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.180650Z
INFO - input = 10 | result = 20 at 2023-02-23T05:51:21.220425Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.221124Z
INFO - input = 50 | result = 100 at 2023-02-23T05:51:21.753899Z
INFO - input = 70 | result = 140 at 2023-02-23T05:51:22.215306Z
INFO - input = 60 | result = 120 at 2023-02-23T05:51:23.023050Z
DEBUG - Main thread waking after waiting PT20S at 2023-02-23T05:51:39.588413Z
INFO - Demo ending. 2023-02-23T05:51:39.592896Z

ИМХО вы гоняетесь за призраками.

Эти задачи обычно не существуют изолированно:

  • что-то (другой процесс, пользователь, подключение по сети) вызывает создание новых задач
  • задачи требуют некоторого конечного времени для выполнения
  • обычно есть и то, что заинтересовано в исходе задачи (если никого не волнует: зачем вы выполняете задачу?)

Некоторые проблемы:

  • если то, что вызывает создание задачи, может сделать это быстрее, чем вы можете обработать задачи, то ваша система рано или поздно будет перегружена задачами - даже база данных не имеет неограниченной емкости
  • если каждой задаче требуется только одна секунда, и вы можете обрабатывать 10 задач параллельно, это означает, что вам нужно более 100 дней для обработки 100_000_000 задач - если в это время не поступают другие задачи. Даже с 100_000 задач вам понадобится около 3 часов
  • если вам нужно 100 дней для обработки задач: как вы можете убедиться, что ваше приложение может работать так долго без перерывов?
  • если каждой задаче требуется всего несколько миллисекунд, тогда обращение к базе данных может стать вашим узким местом
    • если создается новая задача, вам нужно сохранить ее в базе данных
    • вам нужно затем прочитать эту задачу из базы данных
    • если задача завершена, вам нужно удалить ее из базы данных
  • что вы делаете с результатами заданий?
    • Вывести их на консоль? Кто это будет читать?
    • хранить их в каком-то файле? Этот файл скоро станет большим
    • хранить их в базе данных?
    • отправить их обратно по сети запрашивающей стороне?

Из перечисленных пунктов видно, что использование памяти всеми этими экземплярами задач — лишь одна из нескольких связанных проблем.

Здесь есть несколько вариантов:

  • Вы можете использовать очередь задач фиксированной длины. В этом случае, когда очередь заполнена, метод отправки блокируется до тех пор, пока не освободится слот.
  • Вы можете хранить свои задачи в базе данных, как вы упомянули
  • Вы можете использовать систему обмена сообщениями, такую ​​как JMS или Kafka, с постоянно хранимой очередью.

Другие вопросы по теме