Правильна ли эта реализация ThreadPool?

Я новичок в параллельном программировании и пытаюсь реализовать простое ThreadPool самостоятельно. Я нашел эту реализацию на обучающем веб-сайте (jenkov.com), и, похоже, она работает нормально.

Тем не менее, я думаю, что thread и isStopped должны быть как минимум volatile в классе PoolThreadRunnable или переменных Atomic, поскольку они используются двумя потоками: тот, который запускает метод для объекта (где вызываются isStopped=true и this.thread.interrupt()), и тот, в котором фактический код запуска запускается (где мы делаем this.thread=Thread.currentThread() и while(!isStopped()).

Правильно ли я понимаю или я что-то упускаю?

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThreadRunnable> runnables = new ArrayList<>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new ArrayBlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            PoolThreadRunnable poolThreadRunnable =
                    new PoolThreadRunnable(taskQueue);

            runnables.add(poolThreadRunnable);
        }
        for(PoolThreadRunnable runnable : runnables){
            new Thread(runnable).start();
        }
    }

    public synchronized void  execute(Runnable task) throws Exception{
        if (this.isStopped) throw
                new IllegalStateException("ThreadPool is stopped");

        this.taskQueue.offer(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThreadRunnable runnable : runnables){
            runnable.doStop();
        }
    }

    public synchronized void waitUntilAllTasksFinished() {
        while(this.taskQueue.size() > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class PoolThreadRunnable implements Runnable {

    private Thread        thread    = null;
    private BlockingQueue taskQueue = null;
    private boolean       isStopped = false;

    public PoolThreadRunnable(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        this.thread = Thread.currentThread();
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.take();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        //break pool thread out of dequeue() call.
        this.thread.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}

Вместо просмотра и/или поддержки случайного кода, найденного в Интернете, я бы предложил использовать ThreadPoolExecutor, когда необходим пул потоков Java.

Elliott Frisch 06.07.2024 20:18

@ElliottFrisch – Что касается производственного кода, я согласен. Но для целей обучения нет ничего лучше, чем реализовать подобные вещи самостоятельно.

tquadrat 06.07.2024 20:30

Поля isStopped, похоже, должным образом охраняются методами synchronized (обратите внимание: while (!isStopped()) вызывает метод, а не читает поле напрямую). Однако поле thread имеет сомнительное назначение (поле не является изменчивым, и запись в него run не охраняется блоком synchronized (this), чтобы соответствовать синхронизированному методу doStop, в котором поле считывается).

Slaw 06.07.2024 20:32

Спасибо @ElliottFrisch за предложение! Я знаю API-интерфейс Executors в Java и определенно буду использовать его в производственном коде. Однако этот вопрос предназначен для обучения, и я фактически не использую его ни в каком производстве.

Siddharth 06.07.2024 20:33

@Slaw Спасибо за ответ! Я пропустил часть синхронизированного доступа для флага isStopped. Но, как вы также сказали, назначение переменных потока по-прежнему не защищено, и не будет никакой гарантии видимости без использования летучих или атомарных переменных. Я объявлю переменную потока как изменчивую.

Siddharth 06.07.2024 21:50

Учитывая то, как реализована остальная часть класса, я думаю, было бы лучше сделать synchronized (this) { if (isStopped) { return; } thread = Thread.currentThread(); }. И метод doStop, вероятно, должен проверить, имеет ли thread значение null перед вызовом interrupt.

Slaw 06.07.2024 23:00

@Slaw Согласен с проверкой нуля для потока, но не может использовать проверку if на isStopped и возврат. Таким образом, рабочий поток будет вызывать только один запуск, а затем он завершится. Итак, цикл необходим.

Siddharth 09.07.2024 13:29

Код в моем комментарии будет перед циклом (в методе run).

Slaw 09.07.2024 23:52

Спасибо за ответ @Slaw! Это должно сработать. У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.

Siddharth 10.07.2024 11:59
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
9
124
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Оба поля isStopped выглядят должным образом защищенными синхронизированными методами. Обратите внимание, что цикл while в методе run() вызывает метод isStopped() вместо прямого чтения поля. Но у поля thread сомнительное назначение. Это поле не является энергозависимым, и запись не охраняется синхронизированным блоком, соответствующим синхронизированному методу doStop().

Учитывая то, как реализована остальная часть класса, хорошим решением было бы следующее:

@Override
public void run(){
    synchronized (this) {
        if (isStopped) {
            return;
        }
        this.thread = Thread.currentThread();
    }

    while(!isStopped()){
        try{
            Runnable runnable = (Runnable) taskQueue.take();
            runnable.run();
        } catch(Exception e){
            //log or otherwise report exception,
            //but keep pool thread alive.
        }
    }
}

Note: It's recommended to always use @Override when you think you are overriding a method. Adding that annotation forces the compiler to verify the method actually overrides something.

Зачем синхронизироваться на this? Потому что синхронизированный метод экземпляра неявно синхронизируется по this, и важно синхронизировать один и тот же объект, если вы хотите создать правильную связь «происходит до». Кроме того, чтобы избежать потенциального NullPointerException, вы, вероятно, захотите изменить doStop() на:

public synchronized void doStop(){
    isStopped = true;
    if (thread != null) {
        //break pool thread out of dequeue() call.
        thread.interrupt();
    }
}

У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.

Я не знаю вводного урока, который пришел мне в голову. Но цель Callable — вернуть результат, чего Runnable сделать не может. Вам понадобится пул потоков, чтобы возвращать некоторый объект, представляющий асинхронный результат. В Java Executor Framework это Будущее . Вы можете посмотреть реализации ThreadPoolExecutor и FutureTask, чтобы увидеть, как они это делают.

Итак, используя для вдохновения Java Executor Framework, сначала вам следует создать интерфейс, аналогичный Future. Этот интерфейс может быть проще, поскольку вы пытаетесь только изучить, а не реализовать полнофункциональный API. Вы можете добавлять функции по своему желанию.

import java.util.concurrent.ExecutionException;

public interface Result<V> {

  /**
   * Gets the value of this {@code Result}, blocking until the background task
   * is done if necessary.
   *
   * @return the value of the background task
   * @throws ExecutionException if the background task throws an exception; the
   * thrown exception will be the cause of the {@code ExecutionException}
   * @throws InterruptedException if the calling thread is interrupted
   */
  V get() throws ExecutionException, InterruptedException;

  /**
   * Tests if the background task is complete.
   *
   * @return {@code true} if the background task is complete, {@code false}
   * otherwise
   */
  boolean isDone();
}

Затем вам следует создать реализацию, которая также реализует Runnable. Эта реализация примет Callable и будет отвечать за его вызов.

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ResultTask<V> implements Result<V>, Runnable {

  private final AtomicBoolean executed = new AtomicBoolean();

  private final Lock lock = new ReentrantLock();
  private final Condition isDone = lock.newCondition();

  private final Callable<V> callable;
  // these three fields are guarded by 'lock'
  private boolean done;
  private V result;
  private Throwable error;

  public ResultTask(Callable<V> callable) {
    this.callable = Objects.requireNonNull(callable);
  }

  @Override
  public V get() throws ExecutionException, InterruptedException {
    lock.lockInterruptibly();
    try {
      while (!done) {
        // Called in a loop to handle a so-called "spurious wakeup" of the thread
        isDone.await();
      }

      if (error != null) {
        throw new ExecutionException(error);
      }
      return result;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public boolean isDone() {
    lock.lock();
    try {
      return done;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void run() {
    if (!executed.compareAndSet(false, true)) {
      throw new IllegalStateException("task already executed");
    }

    try {
      // don't hold the lock while invoking the Callable
      V result = callable.call();
      complete(result, null);
    } catch (Throwable error) {
      complete(null, error);
    }
  }

  private void complete(V result, Throwable error) {
    lock.lock();
    try {
      this.result = result;
      this.error = error;
      done = true;
      // Signal all threads blocked on 'isDone.await()' to wake them up
      isDone.signalAll();
    } finally {
      lock.unlock();
    }
  }
}

Тогда ваш ThreadPool возьмет Callable и завернет его в ResultTask. Поскольку ResultTask реализует Runnable, вы можете поставить его в очередь задач, чтобы они выполнялись как обычно. Затем вы возвращаете ResultTask как Result вызывающей стороне, чтобы результат задачи можно было запросить в какой-то момент в будущем.

public <V> Result<V> execute(Callable<V> callable) throws Exception {
  Objects.requireNonNull(callable);
  synchronized (this) {
    if (isStopped)
      throw new IllegalStateException("ThreadPool is stopped");

    ResultTask<V> task = new ResultTask<>(callable);
    taskQueue.offer(task);
    return task;
  }
}

Warning: The result of offer should probably be checked. See end of answer.

Вы даже можете изменить свой метод execute(Runnable) следующим образом:

public Result<Void> execute(Runnable runnable) throws Exception {
  Objects.requireNonNull(runnable);
  return execute(() -> {
    runnable.run();
    return null;
  });
}

Это позволит вызывающему абоненту узнать, когда Runnable завершится, даже если это не будет иметь никакого значения.


Кроме того, если BlockingQueue — это java.util.concurrent.BlockingQueue, обратите внимание на следующие вещи:

  1. Этот интерфейс является общим. Вы не параметризовали его, а это значит, что вы используете необработанный тип. Не используйте необработанные типы; используйте BlockingQueue<Runnable> и new ArrayBlockingQueue<>(...) (обратите внимание на <>). Это также устранит необходимость приведения к Runnable в вашем методе PoolThreadRunnable#run().

  2. Метод offer(E) вернет false, если элемент не удалось добавить, например, из-за того, что очередь заполнена до отказа. Вы не проверяете это, а это означает, что вызывающий execute может подумать, что задача успешно поставлена ​​в очередь, хотя это не так. Если offer возвращает false, вам, вероятно, следует создать исключение. В Java Executor Framework это будет RejectedExecutionException.

Спасибо за подробный ответ @Slaw! Это в значительной степени суммирует то, что я хотел знать. Хорошего дня!

Siddharth 12.07.2024 19:58

Другие вопросы по теме