Почему этот пул потоков блокируется при уничтожении?

Я пытаюсь понять новые функции параллелизма С++ 20. Я использую std::barrier в реализации пула потоков, но он блокируется. В чем может быть проблема со следующей реализацией пула потоков?

Основной поток блокируется при вызове деструктора пула потоков. У меня есть предположение, почему это могло быть так, но я не уверен и задаюсь вопросом, могу ли я немного переписать программу и при этом полагаться на std::barrier

Воспроизводимый пример выглядит следующим образом:

#include <stop_token>
#include <atomic>
#include <barrier>
#include <functional>
#include <stop_token>
#include <thread>
#include <vector>


class ThreadPool {

   public:
    friend void ThreadMain(std::stop_token, ThreadPool*);
    using function = void(void* context, size_t idx);
    explicit ThreadPool(size_t num_threads);
    ~ThreadPool();
    void QueueTask(function*, void* context, int range);

   private:
    std::barrier<std::function<void()>> sync_point;
    std::atomic_flag start{false};
    std::atomic<int> idx{0};
    function* task{nullptr};
    void* ctx{nullptr};
    std::vector<std::jthread> threads;
};

void ThreadMain(std::stop_token stoken, ThreadPool* pool) {
    while (!stoken.stop_requested()) {
        pool->start.wait(false);
        if (stoken.stop_requested()) {
            return;
        }
        int i{0};
        while ((i = pool->idx.fetch_sub(1) - 1) > -1) {
            (*(pool->task))(pool->ctx, i);
        }
        pool->sync_point.arrive_and_wait();
    }
}

ThreadPool::ThreadPool(size_t num_threads)
    : sync_point(num_threads, [&]() noexcept { this->start.clear(); }),
      threads(num_threads - 1) {
    for (uint32_t ii = 1; ii < num_threads; ++ii) {
        threads[ii - 1] = std::jthread(ThreadMain, this);
    }
}

void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
                           int r) {
    task = function;
    ctx = context;
    idx.store(r);
    start.test_and_set();
    start.notify_all();
    int i{0};
    while ((i = idx.fetch_sub(1) - 1) > -1) {
        (*(function))(context, i);
    }
    sync_point.arrive_and_wait();
}

ThreadPool::~ThreadPool() {
    start.test_and_set();
    start.notify_all();
    for (auto& t : threads) {
        t.request_stop();
        t.join();
    }
}

void function(void* context, size_t idx) {
    return;
}
int main() {
    ThreadPool pool(2);
    for (size_t i = 0; i < 10; ++i) {
        pool.QueueTask(function, nullptr, 100);
    }
}

Пример можно скомпилировать с помощью ggc 13.2.0 с флагом c++20 следующим образом: g++ main.cpp -O0 -g --std=c++20.

Задачи в пул потоков добавляются с помощью функции QueueTask. Основной поток устанавливает информацию о работе и контексте для всех потоков и уведомляет их о новой работе. Все потоки (включая основной поток) разделяют работу, выбирая элементы аргументов задачи. Когда все аргументы разделены, они используют std::barrier для координации.

Когда я отлаживаю пул потоков с помощью GDB, я вижу, что программа зависает на деструкторе пула потоков. При использовании двух потоков основной поток ожидает присоединения к другому потоку, в то время как другой поток ожидает у барьера:

pool->sync_point.arrive_and_wait(); 

Ссылка на c++ говорит о функции arrive_and_wait:

атомарно уменьшает ожидаемый счетчик на 1, затем блокируется в точке синхронизации для текущей фазы до тех пор, пока не будет выполнен шаг завершения фазы текущей фазы. [курсив мой].

Примечательно, что cppreference не говорит, что потоки ждут, пока не будет запущена функция завершения. Я использую функцию завершения, чтобы снова установить флаг запуска в значение false. Без этого некоторые потоки продолжат выполнять работу, а затем снова будут ждать барьера.

Могу ли я как-то спасти приведенный выше пример, продолжая использовать std::barrier и внося лишь минимальные модификации?

Здесь отсутствует минимальный воспроизводимый пример, демонстрирующий тупик. Вы проверяли свою застрявшую программу с помощью отладчика? Где был заблокирован каждый поток? Анализировали ли вы, как потоки попали в эти строки, не соблюдая ваши предположения о синхронизации?

Stephen Newell 12.08.2024 21:19

Возможно, вашему деструктору потребуется вызвать std::barrier::arrive(threads.size())

AndyG 12.08.2024 21:30

«Тупик пула потоков», разве это не фильм?

john 12.08.2024 21:52

спасибо @StephenNewell. Я добавил MWG.

fabian 13.08.2024 12:51

@ЭндиГ. Я думаю, что это могло бы решить проблему деструктора, но тогда все равно останутся другие проблемы с общей реализацией.

fabian 13.08.2024 12:51

Вы инициализируете sync_point с помощью num_threads. Разве это не должно быть num_threads+1, ведь основной поток ждет на барьере так же, как и рабочие?

Nate Eldredge 13.08.2024 18:28

Текст cppreference, возможно, мог бы быть более понятным, но стандарт ясно дает понять, что функция завершения выполняется после того, как все ожидаемое количество потоков войдет arrive_and_wait, и до того, как какой-либо из них вернется. Сюда входит синхронизация в смысле «сильно случается раньше». Поэтому использование функции завершения для очистки флага start должно быть совершенно правильным.

Nate Eldredge 13.08.2024 18:34

Спасибо за уверенность, @NateEldredge. Я посмотрел это, и вы ссылаетесь на «Конец этапа завершения строго происходит до возврата всех вызовов, которые были разблокированы на этапе завершения». (Пункт 3, eel.is/c++draft/thread.barrier#class-3) ? Ваш комментарий по поводу количества потоков: я создаю 1 новый поток, поэтому я думаю, что ожидаемое значение барьера должно быть num_threads.

fabian 13.08.2024 20:50

Думаю, теперь я понимаю суть проблемы. Вероятно, мне следует вызвать request_stop в каждой теме, прежде чем разблокировать их, установив для start значение true. Завтра проверю на машине разработки. Спасибо за все комментарии.

fabian 13.08.2024 20:52
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
9
83
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В деструкторе произошла ошибка с неправильным порядком событий. Деструктор следует записать так:

ThreadPool::~ThreadPool() {
    for (auto& t : threads) {
        t.request_stop();
    }
    start.test_and_set();
    start.notify_all();
}

В целом, я очень доволен как лаконичностью, так и производительностью std::barrier.

Другие вопросы по теме