Я новичок в параллельном программировании и пытаюсь реализовать простое ThreadPool самостоятельно. Я нашел эту реализацию на обучающем веб-сайте (jenkov.com), и, похоже, она работает нормально.
Тем не менее, я думаю, что thread и isStopped должны быть как минимум volatile в классе PoolThreadRunnable или переменных Atomic, поскольку они используются двумя потоками: тот, который запускает метод для объекта (где вызываются isStopped=true и this.thread.interrupt()), и тот, в котором фактический код запуска запускается (где мы делаем this.thread=Thread.currentThread() и while(!isStopped()).
Правильно ли я понимаю или я что-то упускаю?
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThreadRunnable> runnables = new ArrayList<>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
taskQueue = new ArrayBlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
PoolThreadRunnable poolThreadRunnable =
new PoolThreadRunnable(taskQueue);
runnables.add(poolThreadRunnable);
}
for(PoolThreadRunnable runnable : runnables){
new Thread(runnable).start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if (this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
this.taskQueue.offer(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThreadRunnable runnable : runnables){
runnable.doStop();
}
}
public synchronized void waitUntilAllTasksFinished() {
while(this.taskQueue.size() > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PoolThreadRunnable implements Runnable {
private Thread thread = null;
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThreadRunnable(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
this.thread = Thread.currentThread();
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
//break pool thread out of dequeue() call.
this.thread.interrupt();
}
public synchronized boolean isStopped(){
return isStopped;
}
}
@ElliottFrisch – Что касается производственного кода, я согласен. Но для целей обучения нет ничего лучше, чем реализовать подобные вещи самостоятельно.
Поля isStopped, похоже, должным образом охраняются методами synchronized (обратите внимание: while (!isStopped()) вызывает метод, а не читает поле напрямую). Однако поле thread имеет сомнительное назначение (поле не является изменчивым, и запись в него run не охраняется блоком synchronized (this), чтобы соответствовать синхронизированному методу doStop, в котором поле считывается).
Спасибо @ElliottFrisch за предложение! Я знаю API-интерфейс Executors в Java и определенно буду использовать его в производственном коде. Однако этот вопрос предназначен для обучения, и я фактически не использую его ни в каком производстве.
@Slaw Спасибо за ответ! Я пропустил часть синхронизированного доступа для флага isStopped. Но, как вы также сказали, назначение переменных потока по-прежнему не защищено, и не будет никакой гарантии видимости без использования летучих или атомарных переменных. Я объявлю переменную потока как изменчивую.
Учитывая то, как реализована остальная часть класса, я думаю, было бы лучше сделать synchronized (this) { if (isStopped) { return; } thread = Thread.currentThread(); }. И метод doStop, вероятно, должен проверить, имеет ли thread значение null перед вызовом interrupt.
@Slaw Согласен с проверкой нуля для потока, но не может использовать проверку if на isStopped и возврат. Таким образом, рабочий поток будет вызывать только один запуск, а затем он завершится. Итак, цикл необходим.
Код в моем комментарии будет перед циклом (в методе run).
Спасибо за ответ @Slaw! Это должно сработать. У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.




Оба поля isStopped выглядят должным образом защищенными синхронизированными методами. Обратите внимание, что цикл while в методе run() вызывает метод isStopped() вместо прямого чтения поля. Но у поля thread сомнительное назначение. Это поле не является энергозависимым, и запись не охраняется синхронизированным блоком, соответствующим синхронизированному методу doStop().
Учитывая то, как реализована остальная часть класса, хорошим решением было бы следующее:
@Override
public void run(){
synchronized (this) {
if (isStopped) {
return;
}
this.thread = Thread.currentThread();
}
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
Note: It's recommended to always use @Override when you think you are overriding a method. Adding that annotation forces the compiler to verify the method actually overrides something.
Зачем синхронизироваться на this? Потому что синхронизированный метод экземпляра неявно синхронизируется по this, и важно синхронизировать один и тот же объект, если вы хотите создать правильную связь «происходит до». Кроме того, чтобы избежать потенциального NullPointerException, вы, вероятно, захотите изменить doStop() на:
public synchronized void doStop(){
isStopped = true;
if (thread != null) {
//break pool thread out of dequeue() call.
thread.interrupt();
}
}
У меня возник еще один вопрос (хотя и косвенный) о том, какие изменения будут внесены для выполнения Callable вместо этого. Был бы признателен, если бы вы указали мне на любую ссылку, если она вам известна.
Я не знаю вводного урока, который пришел мне в голову. Но цель Callable — вернуть результат, чего Runnable сделать не может. Вам понадобится пул потоков, чтобы возвращать некоторый объект, представляющий асинхронный результат. В Java Executor Framework это Будущее . Вы можете посмотреть реализации ThreadPoolExecutor и FutureTask, чтобы увидеть, как они это делают.
Итак, используя для вдохновения Java Executor Framework, сначала вам следует создать интерфейс, аналогичный Future. Этот интерфейс может быть проще, поскольку вы пытаетесь только изучить, а не реализовать полнофункциональный API. Вы можете добавлять функции по своему желанию.
import java.util.concurrent.ExecutionException;
public interface Result<V> {
/**
* Gets the value of this {@code Result}, blocking until the background task
* is done if necessary.
*
* @return the value of the background task
* @throws ExecutionException if the background task throws an exception; the
* thrown exception will be the cause of the {@code ExecutionException}
* @throws InterruptedException if the calling thread is interrupted
*/
V get() throws ExecutionException, InterruptedException;
/**
* Tests if the background task is complete.
*
* @return {@code true} if the background task is complete, {@code false}
* otherwise
*/
boolean isDone();
}
Затем вам следует создать реализацию, которая также реализует Runnable. Эта реализация примет Callable и будет отвечать за его вызов.
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ResultTask<V> implements Result<V>, Runnable {
private final AtomicBoolean executed = new AtomicBoolean();
private final Lock lock = new ReentrantLock();
private final Condition isDone = lock.newCondition();
private final Callable<V> callable;
// these three fields are guarded by 'lock'
private boolean done;
private V result;
private Throwable error;
public ResultTask(Callable<V> callable) {
this.callable = Objects.requireNonNull(callable);
}
@Override
public V get() throws ExecutionException, InterruptedException {
lock.lockInterruptibly();
try {
while (!done) {
// Called in a loop to handle a so-called "spurious wakeup" of the thread
isDone.await();
}
if (error != null) {
throw new ExecutionException(error);
}
return result;
} finally {
lock.unlock();
}
}
@Override
public boolean isDone() {
lock.lock();
try {
return done;
} finally {
lock.unlock();
}
}
@Override
public void run() {
if (!executed.compareAndSet(false, true)) {
throw new IllegalStateException("task already executed");
}
try {
// don't hold the lock while invoking the Callable
V result = callable.call();
complete(result, null);
} catch (Throwable error) {
complete(null, error);
}
}
private void complete(V result, Throwable error) {
lock.lock();
try {
this.result = result;
this.error = error;
done = true;
// Signal all threads blocked on 'isDone.await()' to wake them up
isDone.signalAll();
} finally {
lock.unlock();
}
}
}
Тогда ваш ThreadPool возьмет Callable и завернет его в ResultTask. Поскольку ResultTask реализует Runnable, вы можете поставить его в очередь задач, чтобы они выполнялись как обычно. Затем вы возвращаете ResultTask как Result вызывающей стороне, чтобы результат задачи можно было запросить в какой-то момент в будущем.
public <V> Result<V> execute(Callable<V> callable) throws Exception {
Objects.requireNonNull(callable);
synchronized (this) {
if (isStopped)
throw new IllegalStateException("ThreadPool is stopped");
ResultTask<V> task = new ResultTask<>(callable);
taskQueue.offer(task);
return task;
}
}
Warning: The result of offer should probably be checked. See end of answer.
Вы даже можете изменить свой метод execute(Runnable) следующим образом:
public Result<Void> execute(Runnable runnable) throws Exception {
Objects.requireNonNull(runnable);
return execute(() -> {
runnable.run();
return null;
});
}
Это позволит вызывающему абоненту узнать, когда Runnable завершится, даже если это не будет иметь никакого значения.
Кроме того, если BlockingQueue — это java.util.concurrent.BlockingQueue, обратите внимание на следующие вещи:
Этот интерфейс является общим. Вы не параметризовали его, а это значит, что вы используете необработанный тип. Не используйте необработанные типы; используйте BlockingQueue<Runnable> и new ArrayBlockingQueue<>(...) (обратите внимание на <>). Это также устранит необходимость приведения к Runnable в вашем методе PoolThreadRunnable#run().
Метод offer(E) вернет false, если элемент не удалось добавить, например, из-за того, что очередь заполнена до отказа. Вы не проверяете это, а это означает, что вызывающий execute может подумать, что задача успешно поставлена в очередь, хотя это не так. Если offer возвращает false, вам, вероятно, следует создать исключение. В Java Executor Framework это будет RejectedExecutionException.
Спасибо за подробный ответ @Slaw! Это в значительной степени суммирует то, что я хотел знать. Хорошего дня!
Вместо просмотра и/или поддержки случайного кода, найденного в Интернете, я бы предложил использовать ThreadPoolExecutor, когда необходим пул потоков Java.