Я пытаюсь понять новые функции параллелизма С++ 20. Я использую std::barrier
в реализации пула потоков, но он блокируется. В чем может быть проблема со следующей реализацией пула потоков?
Основной поток блокируется при вызове деструктора пула потоков. У меня есть предположение, почему это могло быть так, но я не уверен и задаюсь вопросом, могу ли я немного переписать программу и при этом полагаться на std::barrier
Воспроизводимый пример выглядит следующим образом:
#include <stop_token>
#include <atomic>
#include <barrier>
#include <functional>
#include <stop_token>
#include <thread>
#include <vector>
class ThreadPool {
public:
friend void ThreadMain(std::stop_token, ThreadPool*);
using function = void(void* context, size_t idx);
explicit ThreadPool(size_t num_threads);
~ThreadPool();
void QueueTask(function*, void* context, int range);
private:
std::barrier<std::function<void()>> sync_point;
std::atomic_flag start{false};
std::atomic<int> idx{0};
function* task{nullptr};
void* ctx{nullptr};
std::vector<std::jthread> threads;
};
void ThreadMain(std::stop_token stoken, ThreadPool* pool) {
while (!stoken.stop_requested()) {
pool->start.wait(false);
if (stoken.stop_requested()) {
return;
}
int i{0};
while ((i = pool->idx.fetch_sub(1) - 1) > -1) {
(*(pool->task))(pool->ctx, i);
}
pool->sync_point.arrive_and_wait();
}
}
ThreadPool::ThreadPool(size_t num_threads)
: sync_point(num_threads, [&]() noexcept { this->start.clear(); }),
threads(num_threads - 1) {
for (uint32_t ii = 1; ii < num_threads; ++ii) {
threads[ii - 1] = std::jthread(ThreadMain, this);
}
}
void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
int r) {
task = function;
ctx = context;
idx.store(r);
start.test_and_set();
start.notify_all();
int i{0};
while ((i = idx.fetch_sub(1) - 1) > -1) {
(*(function))(context, i);
}
sync_point.arrive_and_wait();
}
ThreadPool::~ThreadPool() {
start.test_and_set();
start.notify_all();
for (auto& t : threads) {
t.request_stop();
t.join();
}
}
void function(void* context, size_t idx) {
return;
}
int main() {
ThreadPool pool(2);
for (size_t i = 0; i < 10; ++i) {
pool.QueueTask(function, nullptr, 100);
}
}
Пример можно скомпилировать с помощью ggc 13.2.0 с флагом c++20 следующим образом: g++ main.cpp -O0 -g --std=c++20
.
Задачи в пул потоков добавляются с помощью функции QueueTask
. Основной поток устанавливает информацию о работе и контексте для всех потоков и уведомляет их о новой работе. Все потоки (включая основной поток) разделяют работу, выбирая элементы аргументов задачи. Когда все аргументы разделены, они используют std::barrier
для координации.
Когда я отлаживаю пул потоков с помощью GDB, я вижу, что программа зависает на деструкторе пула потоков. При использовании двух потоков основной поток ожидает присоединения к другому потоку, в то время как другой поток ожидает у барьера:
pool->sync_point.arrive_and_wait();
Ссылка на c++ говорит о функции arrive_and_wait
:
атомарно уменьшает ожидаемый счетчик на 1, затем блокируется в точке синхронизации для текущей фазы до тех пор, пока не будет выполнен шаг завершения фазы текущей фазы. [курсив мой].
Примечательно, что cppreference не говорит, что потоки ждут, пока не будет запущена функция завершения. Я использую функцию завершения, чтобы снова установить флаг запуска в значение false. Без этого некоторые потоки продолжат выполнять работу, а затем снова будут ждать барьера.
Могу ли я как-то спасти приведенный выше пример, продолжая использовать std::barrier
и внося лишь минимальные модификации?
Возможно, вашему деструктору потребуется вызвать std::barrier::arrive(threads.size())
«Тупик пула потоков», разве это не фильм?
спасибо @StephenNewell. Я добавил MWG.
@ЭндиГ. Я думаю, что это могло бы решить проблему деструктора, но тогда все равно останутся другие проблемы с общей реализацией.
Вы инициализируете sync_point
с помощью num_threads
. Разве это не должно быть num_threads+1
, ведь основной поток ждет на барьере так же, как и рабочие?
Текст cppreference, возможно, мог бы быть более понятным, но стандарт ясно дает понять, что функция завершения выполняется после того, как все ожидаемое количество потоков войдет arrive_and_wait
, и до того, как какой-либо из них вернется. Сюда входит синхронизация в смысле «сильно случается раньше». Поэтому использование функции завершения для очистки флага start
должно быть совершенно правильным.
Спасибо за уверенность, @NateEldredge. Я посмотрел это, и вы ссылаетесь на «Конец этапа завершения строго происходит до возврата всех вызовов, которые были разблокированы на этапе завершения». (Пункт 3, eel.is/c++draft/thread.barrier#class-3) ? Ваш комментарий по поводу количества потоков: я создаю 1 новый поток, поэтому я думаю, что ожидаемое значение барьера должно быть num_threads.
Думаю, теперь я понимаю суть проблемы. Вероятно, мне следует вызвать request_stop
в каждой теме, прежде чем разблокировать их, установив для start
значение true. Завтра проверю на машине разработки. Спасибо за все комментарии.
В деструкторе произошла ошибка с неправильным порядком событий. Деструктор следует записать так:
ThreadPool::~ThreadPool() {
for (auto& t : threads) {
t.request_stop();
}
start.test_and_set();
start.notify_all();
}
В целом, я очень доволен как лаконичностью, так и производительностью std::barrier
.
Здесь отсутствует минимальный воспроизводимый пример, демонстрирующий тупик. Вы проверяли свою застрявшую программу с помощью отладчика? Где был заблокирован каждый поток? Анализировали ли вы, как потоки попали в эти строки, не соблюдая ваши предположения о синхронизации?