Здесь я столкнулся с проблемой при создании собственного исполнителя пула потоков. Я пытаюсь реализовать то же самое, не используя какую-либо библиотеку служб исполнителя, чтобы подготовиться к собеседованию по Java. Я могу кодировать ниже службы.
import java.util.LinkedList;
import java.util.Queue;
class Worker extends Thread {
private final Queue<Runnable> taskQueue;
private volatile boolean isStopped = false;
public Worker(Queue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
while (!isStopped) {
Runnable task = null;
synchronized (taskQueue) {
while (taskQueue.isEmpty() && !isStopped) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!taskQueue.isEmpty()) {
task = taskQueue.poll();
}
}
if (task != null) {
task.run();
}
if (isStopped)
return;
}
}
public void stopThread() {
isStopped = true;
interrupt(); // Interrupt the thread if it's waiting
}
}
public class CustomThreadPool {
private final int poolSize;
private final Queue<Runnable> taskQueue;
private final Worker[] workers;
public CustomThreadPool(int poolSize) {
this.poolSize = poolSize;
taskQueue = new LinkedList<>();
workers = new Worker[poolSize];
for (int i = 0; i < poolSize; i++) {
workers[i] = new Worker(taskQueue);
workers[i].start();
}
}
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.offer(task);
taskQueue.notifyAll(); // Notify all waiting threads to start executing the task
}
}
public void shutdown() {
for (Worker worker : workers) {
worker.stopThread();
}
}
public static void main(String[] args) {
CustomThreadPool threadPool = new CustomThreadPool(3);
// Submit tasks to the thread pool
for (int i = 1; i <= 5; i++) {
int num = i;
threadPool.execute(() -> {
int result = calculateFactorial(num);
System.out.println("Factorial of " + num + " is " + result + " - Thread: " + Thread.currentThread().getName());
});
}
// Shutdown the thread pool
threadPool.shutdown();
}
private static int calculateFactorial(int n) {
int factorial = 1;
for (int i = 1; i <= n; i++) {
factorial *= i;
}
return factorial;
}
}
Ниже приведен вывод приведенного выше кода.
Factorial of 1 is 1 - Thread: Thread-0
Factorial of 2 is 2 - Thread: Thread-1
Factorial of 3 is 6 - Thread: Thread-2
Ну, проблема здесь в том, что я отправляю 5 задач исполнителю, я вижу то же самое и в TaskQueue, но выполняется только 3/5 задач, поскольку размер пула потоков сохраняется 3. Я хочу, чтобы эти 3 потока выполнялись эти 5 задач в каком-то смысле. Мне нужна помощь, чтобы понять, что я делаю неправильно и как заставить одни и те же 3 потока выполнять все поставленные задачи в очереди.
Примечание: ваш метод if (isStopped) return; Within, состоящий исключительно из цикла while(!isStopped) …, устарел.
Прерывание означает, что кто-то хочет, чтобы ваш поток остановил свою работу и завершил работу как можно скорее. Игнорирование прерывания — худший возможный ответ. Самое простое решение — поместить весь цикл while (!isStopped) внутри try/catch.




Вы останавливаете рабочие процессы, когда isStopped == true, даже если еще есть задания для обработки. Минимальное изменение в вашем коде будет заключаться в изменении метода запуска, подобного этому.
public void run() {
while (!isStopped || !taskQueue.isEmpty()) {
Runnable task = null;
synchronized (taskQueue) {
while (taskQueue.isEmpty() && !isStopped) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!taskQueue.isEmpty()) {
task = taskQueue.poll();
}
}
if (task != null) {
task.run();
}
}
}
Не лучшая идея вызывать taskQueue.isEmpty() за пределами синхронизированного блока, но вероятность возникновения проблем должна быть минимальной.
Спасибо друг! Это сработало нормально. Я должен был увидеть это раньше. Флаг isStopped рабочего потока уже установлен в значение true, поэтому он не может выполнить другую задачу. Этот TaskQueue.isEmpty() пригодился для завершения задач в очереди.
Основываясь на этом ответе, я бы изменил метод run() следующим образом:
public void run()
{
while( true )
{
Runnable task = null;
synchronized( taskQueue )
{
if ( isStopped && taskQueue.isEmpty() ) break;
if ( taskQueue.isEmpty() )
{
try
{
taskQueue.wait();
}
catch( final InterruptedException e )
{
e.printStackTrace();
}
}
if ( !taskQueue.isEmpty() ) task = taskQueue.poll();
}
if ( task != null ) task.run();
}
}
спасибо и за это, приятель, твой кажется более потокобезопасным, но это while(1) заставило меня задуматься.
@AdityaTiwari – В Java while(1) не компилируется 😉 . Но, конечно, вы можете сохранить результат проверки в строке с break в переменной и использовать ее в качестве условия while. Но переместить проверку с taskQueue на while не получится, если эту проверку необходимо синхронизировать.
Я думаю, проблема в том, что вы отключаете пул потоков до того, как рабочие смогут выполнить все задачи. Таким образом, вы, скорее всего, выходите из всех рабочих с
isStoppedбудучиtrue, а не когда очередь пуста.