Мне нужно выполнить 100 задач. Я хочу запускать их параллельно, но только 10 одновременно. Я могу использовать для этого простой
ExecutorService service = Executors.newFixedThreadPool(10);
Затем создайте 10 Runnables и отправьте их в ExecutorService.
Но что, если есть 100_000 задач или 100_000_000 задач? В какой-то момент это может привести к нехватке памяти из-за большого количества Runnables в памяти.
Итак, я хотел что-то вроде этого: я сохраняю параметры всей задачи в базе данных, затем беру из базы данных еще один из них, когда освобождается один поток, и запускаю новую задачу с этими параметрами. Я придумал что-то вроде этого
package com.company;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static class MyRunnable implements Runnable {
private int i; /* there would by actually parameters for task*/
public MyRunnable(int i) {
this.i = i;
}
public void run() {
try {
System.out.println(i); // there would by actually some code
Thread.sleep(1_000); // there would by actually some code
} catch (Exception ex) {
ex.printStackTrace();
} finally {
executorService.execute(new MyRunnable(i + 10 /* there would by actually taking parameters from database for new task*/));
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
executorService.submit(new MyRunnable(i /* there would by actually taking parameters from database for new task*/));
}
}
}
Это хорошее или плохое решение? Какое лучшее решение?
@JoopEggen Учитывая название, проблема заключается в нехватке памяти для создания экземпляров миллиона Runnable объектов. Как Project Loom решает проблему управления памятью ожидающих выполнения?
@BasilBourque прав, и спасибо за ссылку.
Вы, кажется, обеспокоены тем, что слишком много ожидающих объектов задач Runnable могут занять больше памяти, чем у вас будет доступно на вашем компьютере развертывания. Вы хотите избежать ошибок нехватки памяти или перегрузки виртуальной памяти.
Чтобы избежать слишком большого количества задач, подождите, чтобы создать следующую задачу, пока текущая не будет выполнена.
Добавьте вторую обязанность к каждому экземпляру ваших объектов Runnable: извлеките следующий ожидающий параметр из своей базы данных и отправьте задачу службе-исполнителю.
Задача может быть либо новым экземпляром вашей Runnable реализации, либо задачей может быть тот же экземпляр, который в данный момент выполняется Runnable. Я предпочитаю первый подход, если второй подход означает написание кода для очистки «грязного» состояния.
Когда больше не остается входных параметров, последний запущенный экземпляр вашего Runnable просто завершается после закрытия службы-исполнителя. Служба исполнителя остается без дальнейшей работы.
Обязательно в конечном итоге отключите службу-исполнитель. В противном случае его резервный пул потоков может продолжать работать бесконечно, даже после завершения работы вашего приложения, как зомби 🧟♂️.
Давайте определим задачу, которая умножает число, а затем выводит отчет на консоль.
Конструктор этой задачи принимает потокобезопасные Queue целых чисел, которые он использует в качестве входных данных. Каждый объект задачи рисует один элемент, объект Integer, из этой очереди.
Конструктор этой задачи также использует ссылку на существующий ExecutorService. Когда эта задача завершает свою работу, она создает другую задачу. Старая задача отправляет новую задачу этой переданной службе-исполнителю. Служба-исполнитель выполнит новую задачу в будущем, вероятно, в самом ближайшем будущем.
package work.basil.example.bogus;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
public class Multiplier implements Runnable
{
private final Queue < Integer > queue;
private final ExecutorService executorService;
public Multiplier ( final Queue < Integer > queue , final ExecutorService executorService )
{
this.queue = queue;
this.executorService = executorService;
}
@Override
public void run ( )
{
try
{
this.doRun();
}
catch ( Exception e )
{
e.printStackTrace();
}
}
private void doRun ( )
{
Integer input = queue.poll();
if ( Objects.nonNull( input ) ) // If we have another input to process.
{
// Business logic.
// Simulate doing much work by sleeping this thread.
try { Thread.sleep( Duration.ofMillis( ThreadLocalRandom.current().nextInt( 500 , 2_000 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
int result = Math.multiplyExact( input , 2 );
System.out.println( "INFO - input = " + input + " | result = " + result + " at " + Instant.now() );
// Schedule another task.
// Try to avoid needlessly submitting unnecessary tasks. But still may happen because of the interleaving of concurrent threads.
if ( ! this.queue.isEmpty() )
{
Multiplier task = new Multiplier( this.queue , this.executorService );
try { executorService.submit( task ); } catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
System.out.println( "DEBUG - Submitted a subsequent task to executor service. " + Instant.now() );
}
}
}
}
Давайте напишем приложение для выполнения некоторых из этих задач.
Это приложение создает экземпляр службы-исполнителя, поддерживаемой пулом потоков определенного размера. Приложение создает и выполняет столько задач, сколько потоков, чтобы сдвинуться с мертвой точки.
Оттуда каждая выполняемая задача создает экземпляр другой задачи, а затем отправляет ее для последующего выполнения.
Этот скользящий цикл работа-создание экземпляра-отправка/работа-создание экземпляра-отправка продолжается до тех пор, пока мы не исчерпаем очередь входных данных.
package work.basil.example.bogus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
System.out.println( "INFO - Demo starting. " + Instant.now() );
final List < Integer > data = List.of( 10 , 20 , 30 , 40 , 50 , 60 , 70 );
final Queue < Integer > queue = new ArrayBlockingQueue <>( data.size() , false , data );
final int countThreads = 3;
ExecutorService executorService = Executors.newFixedThreadPool( countThreads );
// Te get the ball rolling, start as many tasks as we have threads.
// From there, those initial tasks will instantiate and submit further tasks.
for ( int index = 0 ; index < countThreads ; index++ )
{
Multiplier task = new Multiplier( queue , executorService );
try { executorService.submit( task ); } catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
}
Duration wait = Duration.ofSeconds( 20 );
System.out.println( "DEBUG - Main thread will sleep for " + wait + " at " + Instant.now() );
try { Thread.sleep( wait ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "DEBUG - Main thread waking after waiting " + wait + " at " + Instant.now() );
executorService.close();
if ( ! executorService.isTerminated() )
{
System.out.println( "ERROR - Executor service STILL not yet terminated after waiting for " + wait + " at " + Instant.now() );
}
System.out.println( "INFO - Demo ending. " + Instant.now() );
}
}
При запуске мы ожидаем увидеть вывод, подобный этому:
INFO - Demo starting. 2023-02-23T05:51:19.584253Z
DEBUG - Main thread will sleep for PT20S at 2023-02-23T05:51:19.592739Z
INFO - input = 30 | result = 60 at 2023-02-23T05:51:20.542284Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.549764Z
INFO - input = 20 | result = 40 at 2023-02-23T05:51:20.571571Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.571752Z
INFO - input = 40 | result = 80 at 2023-02-23T05:51:21.180109Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.180650Z
INFO - input = 10 | result = 20 at 2023-02-23T05:51:21.220425Z
DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.221124Z
INFO - input = 50 | result = 100 at 2023-02-23T05:51:21.753899Z
INFO - input = 70 | result = 140 at 2023-02-23T05:51:22.215306Z
INFO - input = 60 | result = 120 at 2023-02-23T05:51:23.023050Z
DEBUG - Main thread waking after waiting PT20S at 2023-02-23T05:51:39.588413Z
INFO - Demo ending. 2023-02-23T05:51:39.592896Z
ИМХО вы гоняетесь за призраками.
Эти задачи обычно не существуют изолированно:
Некоторые проблемы:
Из перечисленных пунктов видно, что использование памяти всеми этими экземплярами задач — лишь одна из нескольких связанных проблем.
Здесь есть несколько вариантов:
Подождите соотв. Посмотрите в проект ткацкий станок.