Создание собственного исполнителя пула потоков

Здесь я столкнулся с проблемой при создании собственного исполнителя пула потоков. Я пытаюсь реализовать то же самое, не используя какую-либо библиотеку служб исполнителя, чтобы подготовиться к собеседованию по Java. Я могу кодировать ниже службы.

import java.util.LinkedList;
import java.util.Queue;

class Worker extends Thread {
    private final Queue<Runnable> taskQueue;
    private volatile boolean isStopped = false;

    public Worker(Queue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run() {
        while (!isStopped) {
            Runnable task = null;
            synchronized (taskQueue) {
                while (taskQueue.isEmpty() && !isStopped) {
                    try {
                        taskQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (!taskQueue.isEmpty()) {
                    task = taskQueue.poll();
                }
            }
            if (task != null) {
                task.run();
            }
            if (isStopped)
            return;
        }
    }

    public void stopThread() {
        isStopped = true;
        interrupt(); // Interrupt the thread if it's waiting
    }
}

public class CustomThreadPool {
    private final int poolSize;
    private final Queue<Runnable> taskQueue;
    private final Worker[] workers;

    public CustomThreadPool(int poolSize) {
        this.poolSize = poolSize;
        taskQueue = new LinkedList<>();
        workers = new Worker[poolSize];
        for (int i = 0; i < poolSize; i++) {
            workers[i] = new Worker(taskQueue);
            workers[i].start();
        }
    }

    public void execute(Runnable task) {
        synchronized (taskQueue) {
            taskQueue.offer(task);
            taskQueue.notifyAll(); // Notify all waiting threads to start executing the task
        }
    }

    public void shutdown() {
        for (Worker worker : workers) {
            worker.stopThread();
        }
    }

    public static void main(String[] args) {
        CustomThreadPool threadPool = new CustomThreadPool(3);

        // Submit tasks to the thread pool
        for (int i = 1; i <= 5; i++) {
            int num = i;
            threadPool.execute(() -> {
                int result = calculateFactorial(num);
                System.out.println("Factorial of " + num + " is " + result + " - Thread: " + Thread.currentThread().getName());
            });
        }

        // Shutdown the thread pool
        threadPool.shutdown();
    }

    private static int calculateFactorial(int n) {
        int factorial = 1;
        for (int i = 1; i <= n; i++) {
            factorial *= i;
        }
        return factorial;
    }
}

Ниже приведен вывод приведенного выше кода.

Factorial of 1 is 1 - Thread: Thread-0
Factorial of 2 is 2 - Thread: Thread-1
Factorial of 3 is 6 - Thread: Thread-2

Ну, проблема здесь в том, что я отправляю 5 задач исполнителю, я вижу то же самое и в TaskQueue, но выполняется только 3/5 задач, поскольку размер пула потоков сохраняется 3. Я хочу, чтобы эти 3 потока выполнялись эти 5 задач в каком-то смысле. Мне нужна помощь, чтобы понять, что я делаю неправильно и как заставить одни и те же 3 потока выполнять все поставленные задачи в очереди.

Я думаю, проблема в том, что вы отключаете пул потоков до того, как рабочие смогут выполнить все задачи. Таким образом, вы, скорее всего, выходите из всех рабочих с isStopped будучи true, а не когда очередь пуста.

gthanop 02.04.2024 14:42

Примечание: ваш метод if (isStopped) return; Within, состоящий исключительно из цикла while(!isStopped) …, устарел.

Holger 02.04.2024 16:12

Прерывание означает, что кто-то хочет, чтобы ваш поток остановил свою работу и завершил работу как можно скорее. Игнорирование прерывания — худший возможный ответ. Самое простое решение — поместить весь цикл while (!isStopped) внутри try/catch.

VGR 02.04.2024 19:15
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
3
367
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы останавливаете рабочие процессы, когда isStopped == true, даже если еще есть задания для обработки. Минимальное изменение в вашем коде будет заключаться в изменении метода запуска, подобного этому.

    public void run() {
        while (!isStopped || !taskQueue.isEmpty()) {
            Runnable task = null;
            synchronized (taskQueue) {
                while (taskQueue.isEmpty() && !isStopped) {
                    try {
                        taskQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (!taskQueue.isEmpty()) {
                    task = taskQueue.poll();
                }
            }
            if (task != null) {
                task.run();
            }
        }
    }

Не лучшая идея вызывать taskQueue.isEmpty() за пределами синхронизированного блока, но вероятность возникновения проблем должна быть минимальной.

tquadrat 02.04.2024 16:35

Спасибо друг! Это сработало нормально. Я должен был увидеть это раньше. Флаг isStopped рабочего потока уже установлен в значение true, поэтому он не может выполнить другую задачу. Этот TaskQueue.isEmpty() пригодился для завершения задач в очереди.

Aditya Tiwari 02.04.2024 17:52
Ответ принят как подходящий

Основываясь на этом ответе, я бы изменил метод run() следующим образом:

public void run() 
{
  while( true )
  {
    Runnable task = null;
    synchronized( taskQueue )
    {
      if ( isStopped && taskQueue.isEmpty() ) break;
      if ( taskQueue.isEmpty() )
      {
        try 
        {
          taskQueue.wait();
        } 
        catch( final InterruptedException e ) 
        {
          e.printStackTrace();
        }
      }
      if ( !taskQueue.isEmpty() ) task = taskQueue.poll();
    }
    if ( task != null ) task.run();
  }
}

спасибо и за это, приятель, твой кажется более потокобезопасным, но это while(1) заставило меня задуматься.

Aditya Tiwari 02.04.2024 18:05

@AdityaTiwari – В Java while(1) не компилируется 😉 . Но, конечно, вы можете сохранить результат проверки в строке с break в переменной и использовать ее в качестве условия while. Но переместить проверку с taskQueue на while не получится, если эту проверку необходимо синхронизировать.

tquadrat 03.04.2024 11:00

Другие вопросы по теме