Threadpoolexecutor с corepoolsize 0 не должен выполнять задачи, пока очередь задач не заполнится

Перебирал Параллелизм Java на практике и застрял в теме 8.3.1 Создание и удаление потока. Следующая сноска предупреждает о сохранении значения corePoolSize равным нулю.

Developers are sometimes tempted to set the core size to zero so that the worker threads will eventually be torn down and therefore won’t prevent the JVM from exiting, but this can cause some strange-seeming behavior in thread pools that don’t use a SynchronousQueue for their work queue (as newCachedThreadPool does). If the pool is already at the core size, ThreadPoolExecutor creates a new thread only if the work queue is full. So tasks submitted to a thread pool with a work queue that has any capacity and a core size of zero will not execute until the queue fills up, which is usually not what is desired.

Чтобы проверить это, я написал эту программу, которая не работает, как указано выше.

    final int corePoolSize = 0;
    ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // If the pool is already at the core size
    if (tp.getPoolSize() == corePoolSize) {
        ExecutorService ex = tp;

        // So tasks submitted to a thread pool with a work queue that has any capacity
        // and a core size of zero will not execute until the queue fills up.
        // So, this should not execute until queue fills up.
        ex.execute(() -> System.out.println("Hello"));
    }

Выход: Привет

Итак, предполагает ли поведение программы, что ThreadPoolExecutor создает хотя бы один поток, если задача отправлена ​​независимо от corePoolSize=0. Если да, то о чем предупреждение в учебнике.

Обновлено: Протестировал код в jdk1.5.0_22 по предложению @ S.K. со следующим изменением:

ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));//Queue size is set to 1.

Но с этим изменением программа завершает работу, не выводя никаких результатов.

Так я неправильно истолковываю эти утверждения из книги?

ИЗМЕНИТЬ (@sjlee): Трудно добавить код в комментарий, поэтому я добавлю его здесь как правку ... Можете ли вы опробовать эту модификацию и запустить ее как с последней версией JDK, так и с JDK 1.5?

final int corePoolSize = 0;
ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// If the pool is already at the core size
if (tp.getPoolSize() == corePoolSize) {
    ExecutorService ex = tp;

    // So tasks submitted to a thread pool with a work queue that has any capacity
    // and a core size of zero will not execute until the queue fills up.
    // So, this should not execute until queue fills up.
    ex.execute(() -> System.out.println("Hello"));
}
tp.shutdown();
if (tp.awaitTermination(1, TimeUnit.SECONDS)) {
    System.out.println("thread pool shut down. exiting.");
} else {
    System.out.println("shutdown timed out. exiting.");
}

@sjlee Опубликовал результат в комментариях.

@sjlee Вывод для jdk 1.5 - thread pool shut down. exiting., а для jdk 1.8 - Hello thread pool shut down. exiting.

Steve 15.09.2018 13:08

Хорошо спасибо. Как вы узнали ниже, похоже, что JDK немного изменил поведение в этом отношении с 1,5 до 1,6.

sjlee 19.09.2018 00:10

@sjlee Но код в любой версии java не выводит текст, упомянутый в учебнике. Я неправильно интерпретирую текст? Согласно моему примеру в вопросе, даже если очередь задач заполнена с corePoolSize = 0, задача не выполняется. Я не могу понять этого.

Steve 19.09.2018 08:41

«Учебник» в данном случае - Java Concurrency in Practice? Технические книги постоянно устаревают, если они не обновляются постоянно. Я просто думаю, что это было правдой на момент публикации, но это уже не так. Что гораздо важнее, так это javadoc (документ API). И я считаю, что документ API об этом, когда размер основного пула = 0, довольно расплывчатый, поэтому это не настоящий разрыв в контракте. Что касается полной очереди задач с размером основного пула = 0, не могли бы вы опубликовать пример кода, который это иллюстрирует?

sjlee 20.09.2018 19:04

@sjlee Я уже ответил на свой вопрос. Посмотрите результат теста jdk1.5. Здесь размер основного пула = 0, размер очереди задач = 1 и количество отправленных задач = 1.

Steve 21.09.2018 06:49

Заполнение очереди задач означает, что очередь должна быть заполнена, когда вы отправляете задачу. Другими словами, queue.offer (o) вернет false, если очередь заполнена. В вашем примере у вас есть LinkedBlockingQueue размером 1. Очередь не заполнена, когда отправляется первая задача. Только если вы отправляете задачи быстро, чтобы заполнить очередь, вы воссоздаете это условие. Более простой способ смоделировать это - использовать LinkedBlockingQueue размера 0 или SynchronousQueue.

sjlee 22.09.2018 00:55

@sjlee К сожалению, LinkedBlockingQueue не позволяет создавать очередь размером 0, а с другой стороны, SynchronousQueue действительно служит цели, но я пытался проверить текст в книге.

Steve 22.09.2018 14:38

Я думаю, что другая сноска из JCP также неверна: allowCoreThreadTimeOut allows you to request that all pool threads be able to time out; enable this feature with a core size of zero if you want a bounded thread pool with a bounded work queue but still have all the threads torn down when there is no work to do. "включить эту функцию с нулевым размером ядра" не имеет смысла.

Jason 22.02.2019 12:55
9
8
3 507
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Похоже, это была ошибка в более старых версиях Java, но сейчас ее нет в Java 1.8.

Согласно документации Java 1.8 от ThreadPoolExecutor.execute():

     /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * ....
     */

Во втором пункте после добавления рабочего в очередь повторяется проверка: если вместо постановки задачи в очередь можно запустить новый поток, то откатить постановку в очередь и запустить новый поток.

Вот что происходит. Во время первой проверки задача ставится в очередь, но во время повторной проверки запускается новый поток, который выполняет вашу задачу.

Я запустил эту программу в jdk1.6.0_45, и она дает тот же результат.

Steve 10.09.2018 11:36

Вероятно, проблема была в jdk 1.5. Я мог получить только эту ссылку в Google, которая намекает на исправление этой проблемы в 1.6: cs.oswego.edu/pipermail/concurrency-interest/2006-Detersburg/…

S.K. 10.09.2018 12:13

Я добавил результат теста jdk1.5.0_22. Смотрите обновление в вопросе. ИМО, если бы это была ошибка, она бы упоминалась в самом учебнике.

Steve 11.09.2018 09:02

Я добавил правдоподобное объяснение. Посмотри на это.

Steve 15.09.2018 20:09

При запуске этой программы в jdk 1.5, 1.6, 1.7 и 1.8 я обнаружил различные реализации ThreadPoolExecutor#execute(Runnable) в 1.5, 1.6 и 1.7+. Вот что я нашел:

Реализация JDK 1.5

 //Here poolSize is the number of core threads running.

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    for (;;) {
        if (runState != RUNNING) {
            reject(command);
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        if (workQueue.offer(command))
            return;
        Runnable r = addIfUnderMaximumPoolSize(command);
        if (r == command)
            return;
        if (r == null) {
            reject(command);
            return;
        }
        // else retry
    }
}

Эта реализация не создает поток, когда corePoolSize равен 0, поэтому поставленная задача не выполняется.

Реализация JDK 1.6

//Here poolSize is the number of core threads running.

  public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

JDK 1.6 создает новый поток, даже если corePoolSize равен 0.

Реализация JDK 1.7+ (аналогично JDK 1.6, но с лучшими блокировками и проверками состояния)

    public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

JDK 1.7 также создает новый поток, даже если corePoolSize равен 0.

Итак, похоже, что corePoolSize=0 - это особый случай в каждой версии JDK 1.5 и JDK 1.6+.

Но странно, что объяснение книги не совпадает ни с одним из результатов программы.

Я не понимаю, как вы пришли к выводу, что «объяснение в книге не соответствует ни одному из результатов программы», когда вы уже признали, что результат делает Java 5 соответствует описанию.

Holger 14.04.2020 14:16

@Holger Я запустил следующий код в Java 5, и ожидалось, что он напечатает Привет, но не final int corePoolSize = 0; ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1));//Queue size is set to 1. if (tp.getPoolSize() == corePoolSize) { ExecutorService ex = tp; ex.execute(() -> System.out.println("Hello")); }

Steve 29.05.2020 11:41

Точно так, как написано в книге. Так почему же вы утверждаете, что «объяснение книги не соответствует ни одному из результатов программы»?

Holger 29.05.2020 13:08
Ответ принят как подходящий

Такое странное поведение ThreadPoolExecutor в Java 5, когда размер основного пула равен нулю, по-видимому, было признано ошибкой и незаметно исправлено в Java 6.

Действительно, проблема снова появилась в Java 7 в результате некоторой переделки кода между 6 и 7. Затем о ней было сообщено как об ошибке, которая была признана ошибкой и исправлена.

В любом случае вы не должны использовать версию Java, подверженную этой ошибке. Срок службы Java 5 закончился в 2015 году, и последние доступные версии Java 6 и более поздних версий не затронуты. Этот раздел «Java Concurrency In Practice» больше не подходит.

Использованная литература:

Другие вопросы по теме