Проектирование пула потоков

У меня есть вопрос по дизайну пула потоков. Рассмотрим следующий код:

int main() {
  auto po = std::execution::par_unseq;
  
  // If Parallel STL uses TBB as the backend, I think the thread pool
  //   is created during the first time for_each() being called.
  std::for_each(po, begin, end, unaryFun);  
  // Worker threads are put to sleep.
  
  // ... Other things are done by the main thread.
  
  std::for_each(po, begin, end, unaryFun); // Threads wake up and run again.
  
  // ... Other things are done by the main thread.
  
} // Right before going out of scope, how does the thread pool know to destruct
//     itself ? 

У TBB однажды возникла проблема с утечкой памяти C++: утечка памяти в tbb. Всякий раз, когда я компилировал свою программу с дезинфицирующими средствами, мне приходилось ставить export ASAN_OPTIONS=alloc_dealloc_mismatch=0, чтобы избежать сбоя. Я всегда думал, что проблема с утечкой связана именно с тем, что пул потоков не удаляется и выходит за пределы области действия.

Однако в новой версии oneTBB этой проблемы больше нет. Как они это решили? Я не думаю, что ответ настолько глуп, как то, что пул потоков создается и разрушается внутри каждого вызова for_each(). Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия? Я хочу применить такой дизайн к некоторым другим структурам.

Распределитель памяти и пул потоков — это совершенно разные вещи. Что вам действительно нужно создать? Тот или другой, или оба? Если вы хотите сделать и то, и другое, то спрашивайте только о проблемах с одним из них. Возможно, вам также понадобится время, чтобы обновить справочный центр , пройти экскурсию по SO и прочитать Как задать вопрос . Также, пожалуйста, прочитайте о как написать «идеальный» вопрос , особенно о его контрольном списке.

Some programmer dude 08.07.2024 23:19

«Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия?» - Надеюсь, что нет. Он должен создаваться при запуске программы (или при первом использовании) и уничтожаться при завершении работы программы.

Ted Lyngmo 08.07.2024 23:46

Re: «Параллельный STL использует TBB в качестве бэкэнда» — возможно, но это не обязательно, а в некоторых реализациях это не так.

Pete Becker 08.07.2024 23:48

«Я всегда думал, что проблема с утечкой связана именно с тем, что пул потоков не удаляется и выходит за пределы области действия». Это не следует из вопроса, на который вы ссылаетесь. Согласно этому вопросу (без ответа), «все потоки tbb завершились» и «утечка памяти на самом деле является строкой». То есть это просто имена потоков, которые, как сообщается, были утекли, а не весь пул потоков.

JaMiT 08.07.2024 23:53

@TedLyngmo Спасибо, я тоже надеюсь, но как TBB этого достигает? Это благодаря какой-то магии компилятора? Я хочу узнать о дизайне

user2961927 09.07.2024 06:07

Никакого волшебства, это можно сделать вручную, а иногда это называется система задач .

Jakkapong Rattananen 09.07.2024 06:20

@JakkapongRattananen, систему задач, о которой вы упомянули, все еще необходимо создать, а затем автоматически уничтожить при выходе за пределы области действия.

user2961927 09.07.2024 06:49

«Как пул потоков узнает, что нужно уничтожить себя, выйдя за пределы области действия?» Это звучит очень знакомо: оно называется RAII и должно быть реализовано посредством применения правила 350. Вам также необходимо узнать о классах хранения в C и C++. Затем изучите различные реализации одноэлементного шаблона Скотта Мейерса.

Red.Wave 09.07.2024 14:20
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
8
156
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам не нужно уничтожать пул потоков в конце области видимости, в любой операционной системе вы можете выполнять код при выгрузке библиотеки, а в C++ компилятор должен вызывать деструкторы всех статических объектов при выгрузке библиотеки. , вам просто нужно сделать пул потоков одноэлементным.

Вот наивная реализация такого дизайна, работающая демо godbolt

#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <optional>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>

class TaskQueue
{
    public:
    void push(std::optional<std::packaged_task<void()>> task)
    {
        {
            std::unique_lock lk{m_mutex};
            m_queue.push(std::move(task));
        }
        m_cv.notify_one();
    }
    std::optional<std::packaged_task<void()>> pop()
    {
        std::optional<std::packaged_task<void()>> task;
        {
            std::unique_lock lk(m_mutex);
            m_cv.wait(lk, [this](){ return !this->m_queue.empty();});
            task = std::move(m_queue.front());
            m_queue.pop();
        }
        return task;
    }
    private:
    std::queue<std::optional<std::packaged_task<void()>>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;

};

class ThreadPool
{
    public:
    static ThreadPool& Instance()
    {
        static ThreadPool pool(std::thread::hardware_concurrency());
        return pool;
    }
    
    template<typename Func>
    std::future<void> push_task(Func&& f)
    {
        std::packaged_task<void()> task{
            [func = std::move(f)] { func(); }
        };
        auto fut = task.get_future();
        m_queue.push(std::move(task));
        return fut;
    }

    private:
    ThreadPool(int thread_count)
    : m_thread_count{thread_count}
    {
        Initialize();
    }
    void worker_task()
    {
        while (m_running)
        {
            auto task = m_queue.pop();
            if (task)
            {
                (*task)();
            }
            else
            {
                break;
            }
        }
    }
    void Initialize()
    {
        m_running = true;
        for (int i = 0; i < m_thread_count; i++)
        {
            m_workers.push_back(std::thread{[this]{this->worker_task();}});
        }
    }

    ~ThreadPool()
    {
        m_running = false;
        for (int i = 0; i < m_thread_count; i++)
        {
            m_queue.push(std::nullopt);
        }
        for (auto&& worker: m_workers)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    // maybe set an exception on every item left in queue
    }
    TaskQueue m_queue;
    std::vector<std::thread> m_workers;
    std::atomic<bool> m_running = false;
    int m_thread_count;
};

template<typename RndIter, typename Func>
void foreach_par(RndIter begin, RndIter end, Func&& func)
{
    std::vector<std::future<void>> futures;
    futures.reserve(std::distance(begin,end));
    auto&& threadpool = ThreadPool::Instance();
    while (begin != end)
    {
        futures.push_back(threadpool.push_task([begin = begin, &func]{ func(*begin);}));
        begin++;
    }
    for (auto&& future: futures)
    {
        future.get();
    }
}

int main()
{
    std::vector<int> vals{1,2,3,4,5};
    foreach_par(vals.begin(), vals.end(), [](int& value) { value *= 2; });

    for (auto&& value: vals)
    {
        std::cout << value << ' ';
    }
}

Обычно библиотеки выполняют дополнительную оптимизацию, например

  • уменьшение выделения кучи
  • уменьшение блокировок за счет использования безблокировочных структур
  • разбивка задач
  • очередь на каждого работника и кража работы

Эта реализация не оптимизирована, но она потокобезопасна и не имеет утечек, но вы все равно можете столкнуться с такими проблемами, как один поток, использующий пул потоков после его уничтожения, а также потоки, которые завершаются без известной причины, необходимо возобновить или добавить таймауты.

Если вы разрабатываете свой собственный пул потоков, я бы рекомендовал вам сделать его не одноэлементным, а вместо этого передавать ссылки на него или использовать для него указатель ресурсов и просто создать его в своем main, чтобы обойти неконтролируемый порядок построения/уничтожения статических объектов. , приведенный выше код может сломаться, если кто-то попытается использовать пул потоков во время создания или уничтожения другого статического объекта, у синглтонов есть масса недостатков.

Спасибо за такой подробный ответ. Не могу поверить, что после всех этих лет я забыл, что локальные статические переменные уничтожаются при завершении программы.

user2961927 09.07.2024 16:51

Другие вопросы по теме