Я новичок в параллельном программировании и пытаюсь реализовать простое ThreadPool
самостоятельно. Я нашел эту реализацию на обучающем веб-сайте (jenkov.com), и, похоже, она работает нормально.
Тем не менее, я думаю, что thread
и isStopped
должны быть как минимум volatile
в классе PoolThreadRunnable
или переменных Atomic
, поскольку они используются двумя потоками: тот, который запускает метод для объекта (где вызываются isStopped=true
и this.thread.interrupt()
), и тот, в котором фактический код запуска запускается (где мы делаем this.thread=Thread.currentThread()
и while(!isStopped())
.
Правильно ли я понимаю или я что-то упускаю?
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThreadRunnable> runnables = new ArrayList<>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
taskQueue = new ArrayBlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
PoolThreadRunnable poolThreadRunnable =
new PoolThreadRunnable(taskQueue);
runnables.add(poolThreadRunnable);
}
for(PoolThreadRunnable runnable : runnables){
new Thread(runnable).start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if (this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
this.taskQueue.offer(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThreadRunnable runnable : runnables){
runnable.doStop();
}
}
public synchronized void waitUntilAllTasksFinished() {
while(this.taskQueue.size() > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PoolThreadRunnable implements Runnable {
private Thread thread = null;
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThreadRunnable(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
this.thread = Thread.currentThread();
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
//break pool thread out of dequeue() call.
this.thread.interrupt();
}
public synchronized boolean isStopped(){
return isStopped;
}
}
@ElliottFrisch – Что касается производственного кода, я согласен. Но для целей обучения нет ничего лучше, чем реализовать подобные вещи самостоятельно.
Поля isStopped
, похоже, должным образом охраняются методами synchronized
(обратите внимание: while (!isStopped())
вызывает метод, а не читает поле напрямую). Однако поле thread
имеет сомнительное назначение (поле не является изменчивым, и запись в него run
не охраняется блоком synchronized (this)
, чтобы соответствовать синхронизированному методу doStop
, в котором поле считывается).
Спасибо @ElliottFrisch за предложение! Я знаю API-интерфейс Executors в Java и определенно буду использовать его в производственном коде. Однако этот вопрос предназначен для обучения, и я фактически не использую его ни в каком производстве.
@Slaw Спасибо за ответ! Я пропустил часть синхронизированного доступа для флага isStopped. Но, как вы также сказали, назначение переменных потока по-прежнему не защищено, и не будет никакой гарантии видимости без использования летучих или атомарных переменных. Я объявлю переменную потока как изменчивую.
Учитывая то, как реализована остальная часть класса, я думаю, было бы лучше сделать synchronized (this) { if (isStopped) { return; } thread = Thread.currentThread(); }
. И метод doStop
, вероятно, должен проверить, имеет ли thread
значение null перед вызовом interrupt
.
@Slaw Согласен с проверкой нуля для потока, но не может использовать проверку if на isStopped и возврат. Таким образом, рабочий поток будет вызывать только один запуск, а затем он завершится. Итак, цикл необходим.
Код в моем комментарии будет перед циклом (в методе run
).
Спасибо за ответ @Slaw! Это должно сработать. У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.
Оба поля isStopped
выглядят должным образом защищенными синхронизированными методами. Обратите внимание, что цикл while
в методе run()
вызывает метод isStopped()
вместо прямого чтения поля. Но у поля thread
сомнительное назначение. Это поле не является энергозависимым, и запись не охраняется синхронизированным блоком, соответствующим синхронизированному методу doStop()
.
Учитывая то, как реализована остальная часть класса, хорошим решением было бы следующее:
@Override
public void run(){
synchronized (this) {
if (isStopped) {
return;
}
this.thread = Thread.currentThread();
}
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
Note: It's recommended to always use @Override
when you think you are overriding a method. Adding that annotation forces the compiler to verify the method actually overrides something.
Зачем синхронизироваться на this
? Потому что синхронизированный метод экземпляра неявно синхронизируется по this
, и важно синхронизировать один и тот же объект, если вы хотите создать правильную связь «происходит до». Кроме того, чтобы избежать потенциального NullPointerException
, вы, вероятно, захотите изменить doStop()
на:
public synchronized void doStop(){
isStopped = true;
if (thread != null) {
//break pool thread out of dequeue() call.
thread.interrupt();
}
}
У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.
Я не знаю вводного урока, который пришел мне в голову. Но цель Callable
— вернуть результат, чего Runnable
сделать не может. Вам понадобится пул потоков, чтобы возвращать некоторый объект, представляющий асинхронный результат. В Java Executor Framework это Будущее . Вы можете посмотреть реализации ThreadPoolExecutor и FutureTask, чтобы увидеть, как они это делают.
Итак, используя для вдохновения Java Executor Framework, сначала вам следует создать интерфейс, аналогичный Future
. Этот интерфейс может быть проще, поскольку вы пытаетесь только изучить, а не реализовать полнофункциональный API. Вы можете добавлять функции по своему желанию.
import java.util.concurrent.ExecutionException;
public interface Result<V> {
/**
* Gets the value of this {@code Result}, blocking until the background task
* is done if necessary.
*
* @return the value of the background task
* @throws ExecutionException if the background task throws an exception; the
* thrown exception will be the cause of the {@code ExecutionException}
* @throws InterruptedException if the calling thread is interrupted
*/
V get() throws ExecutionException, InterruptedException;
/**
* Tests if the background task is complete.
*
* @return {@code true} if the background task is complete, {@code false}
* otherwise
*/
boolean isDone();
}
Затем вам следует создать реализацию, которая также реализует Runnable
. Эта реализация примет Callable
и будет отвечать за его вызов.
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ResultTask<V> implements Result<V>, Runnable {
private final AtomicBoolean executed = new AtomicBoolean();
private final Lock lock = new ReentrantLock();
private final Condition isDone = lock.newCondition();
private final Callable<V> callable;
// these three fields are guarded by 'lock'
private boolean done;
private V result;
private Throwable error;
public ResultTask(Callable<V> callable) {
this.callable = Objects.requireNonNull(callable);
}
@Override
public V get() throws ExecutionException, InterruptedException {
lock.lockInterruptibly();
try {
while (!done) {
// Called in a loop to handle a so-called "spurious wakeup" of the thread
isDone.await();
}
if (error != null) {
throw new ExecutionException(error);
}
return result;
} finally {
lock.unlock();
}
}
@Override
public boolean isDone() {
lock.lock();
try {
return done;
} finally {
lock.unlock();
}
}
@Override
public void run() {
if (!executed.compareAndSet(false, true)) {
throw new IllegalStateException("task already executed");
}
try {
// don't hold the lock while invoking the Callable
V result = callable.call();
complete(result, null);
} catch (Throwable error) {
complete(null, error);
}
}
private void complete(V result, Throwable error) {
lock.lock();
try {
this.result = result;
this.error = error;
done = true;
// Signal all threads blocked on 'isDone.await()' to wake them up
isDone.signalAll();
} finally {
lock.unlock();
}
}
}
Тогда ваш ThreadPool
возьмет Callable
и завернет его в ResultTask
. Поскольку ResultTask
реализует Runnable
, вы можете поставить его в очередь задач, чтобы они выполнялись как обычно. Затем вы возвращаете ResultTask
как Result
вызывающей стороне, чтобы результат задачи можно было запросить в какой-то момент в будущем.
public <V> Result<V> execute(Callable<V> callable) throws Exception {
Objects.requireNonNull(callable);
synchronized (this) {
if (isStopped)
throw new IllegalStateException("ThreadPool is stopped");
ResultTask<V> task = new ResultTask<>(callable);
taskQueue.offer(task);
return task;
}
}
Warning: The result of offer
should probably be checked. See end of answer.
Вы даже можете изменить свой метод execute(Runnable)
следующим образом:
public Result<Void> execute(Runnable runnable) throws Exception {
Objects.requireNonNull(runnable);
return execute(() -> {
runnable.run();
return null;
});
}
Это позволит вызывающему абоненту узнать, когда Runnable
завершится, даже если это не будет иметь никакого значения.
Кроме того, если BlockingQueue
— это java.util.concurrent.BlockingQueue
, обратите внимание на следующие вещи:
Этот интерфейс является общим. Вы не параметризовали его, а это значит, что вы используете необработанный тип. Не используйте необработанные типы; используйте BlockingQueue<Runnable>
и new ArrayBlockingQueue<>(...)
(обратите внимание на <>
). Это также устранит необходимость приведения к Runnable
в вашем методе PoolThreadRunnable#run()
.
Метод offer(E)
вернет false
, если элемент не удалось добавить, например, из-за того, что очередь заполнена до отказа. Вы не проверяете это, а это означает, что вызывающий execute
может подумать, что задача успешно поставлена в очередь, хотя это не так. Если offer
возвращает false
, вам, вероятно, следует создать исключение. В Java Executor Framework это будет RejectedExecutionException
.
Спасибо за подробный ответ @Slaw! Это в значительной степени суммирует то, что я хотел знать. Хорошего дня!
Вместо просмотра и/или поддержки случайного кода, найденного в Интернете, я бы предложил использовать ThreadPoolExecutor, когда необходим пул потоков Java.