У меня есть вопрос по дизайну пула потоков. Рассмотрим следующий код:
int main() {
auto po = std::execution::par_unseq;
// If Parallel STL uses TBB as the backend, I think the thread pool
// is created during the first time for_each() being called.
std::for_each(po, begin, end, unaryFun);
// Worker threads are put to sleep.
// ... Other things are done by the main thread.
std::for_each(po, begin, end, unaryFun); // Threads wake up and run again.
// ... Other things are done by the main thread.
} // Right before going out of scope, how does the thread pool know to destruct
// itself ?
У TBB однажды возникла проблема с утечкой памяти C++: утечка памяти в tbb. Всякий раз, когда я компилировал свою программу с дезинфицирующими средствами, мне приходилось ставить export ASAN_OPTIONS=alloc_dealloc_mismatch=0, чтобы избежать сбоя. Я всегда думал, что проблема с утечкой связана именно с тем, что пул потоков не удаляется и выходит за пределы области действия.
Однако в новой версии oneTBB этой проблемы больше нет. Как они это решили? Я не думаю, что ответ настолько глуп, как то, что пул потоков создается и разрушается внутри каждого вызова for_each(). Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия? Я хочу применить такой дизайн к некоторым другим структурам.
«Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия?» - Надеюсь, что нет. Он должен создаваться при запуске программы (или при первом использовании) и уничтожаться при завершении работы программы.
Re: «Параллельный STL использует TBB в качестве бэкэнда» — возможно, но это не обязательно, а в некоторых реализациях это не так.
«Я всегда думал, что проблема с утечкой связана именно с тем, что пул потоков не удаляется и выходит за пределы области действия». Это не следует из вопроса, на который вы ссылаетесь. Согласно этому вопросу (без ответа), «все потоки tbb завершились» и «утечка памяти на самом деле является строкой». То есть это просто имена потоков, которые, как сообщается, были утекли, а не весь пул потоков.
@TedLyngmo Спасибо, я тоже надеюсь, но как TBB этого достигает? Это благодаря какой-то магии компилятора? Я хочу узнать о дизайне
Никакого волшебства, это можно сделать вручную, а иногда это называется система задач .
@JakkapongRattananen, систему задач, о которой вы упомянули, все еще необходимо создать, а затем автоматически уничтожить при выходе за пределы области действия.
«Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия?» Это звучит очень знакомо: оно называется RAII и должно быть реализовано посредством применения правила 350. Вам также необходимо узнать о классах хранения в C и C++. Затем изучите различные реализации одноэлементного шаблона Скотта Мейерса.





Вам не нужно уничтожать пул потоков в конце области видимости, в любой операционной системе вы можете выполнять код при выгрузке библиотеки, а в C++ компилятор должен вызывать деструкторы всех статических объектов при выгрузке библиотеки. , вам просто нужно сделать пул потоков одноэлементным.
Вот наивная реализация такого дизайна, работающая демо godbolt
#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <optional>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>
class TaskQueue
{
public:
void push(std::optional<std::packaged_task<void()>> task)
{
{
std::unique_lock lk{m_mutex};
m_queue.push(std::move(task));
}
m_cv.notify_one();
}
std::optional<std::packaged_task<void()>> pop()
{
std::optional<std::packaged_task<void()>> task;
{
std::unique_lock lk(m_mutex);
m_cv.wait(lk, [this](){ return !this->m_queue.empty();});
task = std::move(m_queue.front());
m_queue.pop();
}
return task;
}
private:
std::queue<std::optional<std::packaged_task<void()>>> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
};
class ThreadPool
{
public:
static ThreadPool& Instance()
{
static ThreadPool pool(std::thread::hardware_concurrency());
return pool;
}
template<typename Func>
std::future<void> push_task(Func&& f)
{
std::packaged_task<void()> task{
[func = std::move(f)] { func(); }
};
auto fut = task.get_future();
m_queue.push(std::move(task));
return fut;
}
private:
ThreadPool(int thread_count)
: m_thread_count{thread_count}
{
Initialize();
}
void worker_task()
{
while (m_running)
{
auto task = m_queue.pop();
if (task)
{
(*task)();
}
else
{
break;
}
}
}
void Initialize()
{
m_running = true;
for (int i = 0; i < m_thread_count; i++)
{
m_workers.push_back(std::thread{[this]{this->worker_task();}});
}
}
~ThreadPool()
{
m_running = false;
for (int i = 0; i < m_thread_count; i++)
{
m_queue.push(std::nullopt);
}
for (auto&& worker: m_workers)
{
if (worker.joinable())
{
worker.join();
}
}
// maybe set an exception on every item left in queue
}
TaskQueue m_queue;
std::vector<std::thread> m_workers;
std::atomic<bool> m_running = false;
int m_thread_count;
};
template<typename RndIter, typename Func>
void foreach_par(RndIter begin, RndIter end, Func&& func)
{
std::vector<std::future<void>> futures;
futures.reserve(std::distance(begin,end));
auto&& threadpool = ThreadPool::Instance();
while (begin != end)
{
futures.push_back(threadpool.push_task([begin = begin, &func]{ func(*begin);}));
begin++;
}
for (auto&& future: futures)
{
future.get();
}
}
int main()
{
std::vector<int> vals{1,2,3,4,5};
foreach_par(vals.begin(), vals.end(), [](int& value) { value *= 2; });
for (auto&& value: vals)
{
std::cout << value << ' ';
}
}
Обычно библиотеки выполняют дополнительную оптимизацию, например
Эта реализация не оптимизирована, но она потокобезопасна и не имеет утечек, но вы все равно можете столкнуться с такими проблемами, как один поток, использующий пул потоков после его уничтожения, а также потоки, которые завершаются без известной причины, необходимо возобновить или добавить таймауты.
Если вы разрабатываете свой собственный пул потоков, я бы рекомендовал вам сделать его не одноэлементным, а вместо этого передавать ссылки на него или использовать для него указатель ресурсов и просто создать его в своем main, чтобы обойти неконтролируемый порядок построения/уничтожения статических объектов. , приведенный выше код может сломаться, если кто-то попытается использовать пул потоков во время создания или уничтожения другого статического объекта, у синглтонов есть масса недостатков.
Спасибо за такой подробный ответ. Не могу поверить, что после всех этих лет я забыл, что локальные статические переменные уничтожаются при завершении программы.
Распределитель памяти и пул потоков — это совершенно разные вещи. Что вам действительно нужно создать? Тот или другой, или оба? Если вы хотите сделать и то, и другое, то спрашивайте только о проблемах с одним из них. Возможно, вам также понадобится время, чтобы обновить справочный центр , пройти экскурсию по SO и прочитать Как задать вопрос . Также, пожалуйста, прочитайте о как написать «идеальный» вопрос , особенно о его контрольном списке.