Проблема потокобезопасности реализации пула потоков в листинге 9.8 книги C++ Concurrency in Action

Я читаю C++ Concurrency в действии, 2-е изд. И автор показал реализацию пула потоков, который использует кражу работы следующим образом:

// Listing 9.7 Lock-based queue for work stealing

class work_stealing_queue
{
private:
    typedef function_wrapper data_type;
    std::deque<data_type> the_queue; 
    mutable std::mutex the_mutex;
public:
    work_stealing_queue()
    {}
    work_stealing_queue(const work_stealing_queue& other)=delete;
    work_stealing_queue& operator=(const work_stealing_queue& other)=delete;
    void push(data_type data) 
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push_front(std::move(data));
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        return the_queue.empty();
    }
    bool try_pop(data_type& res) 
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty())
        {
            return false;
        }
        res=std::move(the_queue.front());
        the_queue.pop_front();
        return true;
    }
    bool try_steal(data_type& res) 
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        if (the_queue.empty())
        {
            return false;
        }
        res=std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};
// Listing 9.8 A thread pool that uses work stealing

class thread_pool
{
    typedef function_wrapper task_type;
    std::atomic_bool done;
    threadsafe_queue<task_type> pool_work_queue;
    std::vector<std::unique_ptr<work_stealing_queue>> queues; 
    std::vector<std::thread> threads;
    join_threads joiner;
    static thread_local work_stealing_queue* local_work_queue; 
    static thread_local unsigned my_index;
    void worker_thread(unsigned my_index_)
    {
        my_index=my_index_;
        local_work_queue=queues[my_index].get(); 
        while(!done)
        {
            run_pending_task();
        }
    }
    bool pop_task_from_local_queue(task_type& task)
    {
        return local_work_queue && local_work_queue->try_pop(task);
    }
    bool pop_task_from_pool_queue(task_type& task)
    {
        return pool_work_queue.try_pop(task);
    }
    bool pop_task_from_other_thread_queue(task_type& task) 
    {
        for(unsigned i=0;i<queues.size();++i)
        {
            unsigned const index=(my_index+i+1)%queues.size(); 
            if (queues[index]->try_steal(task))
            {
                return true;
            }
        }
        return false;
    }
public:
    thread_pool():
        done(false),joiner(threads)
    {
        unsigned const thread_count=std::thread::hardware_concurrency();
        try
        {
            for(unsigned i=0;i<thread_count;++i)
            {
                queues.push_back(std::unique_ptr<work_stealing_queue>( 
                new work_stealing_queue));
            }
            for(unsigned i=0;i<thread_count;++i)
            {
                threads.push_back(std::thread(&thread_pool::worker_thread,this,i));
            }
        }
        catch(...)
        {
            done=true;
            throw;
        }
    }
    ~thread_pool()
    {
        done=true;
    }
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type> submit(
    FunctionType f)
    {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()> task(f);
        std::future<result_type> res(task.get_future());
        if (local_work_queue)
        {
            local_work_queue->push(std::move(task));
        }
        else
        {
            pool_work_queue.push(std::move(task));
        }
        return res;
    }
    void run_pending_task()
    {
        task_type task;
        if (pop_task_from_local_queue(task) || 
            pop_task_from_pool_queue(task) || 
            pop_task_from_other_thread_queue(task)) 
        {
            task();
        }
        else
        {
            std::this_thread::yield();
        }
    }
};

Вот и мой вопрос: в конструкторе thread_pool он сначала построил все work_stealing_queue, а затем все рабочие потоки. И когда выполнение рабочего потока достигает run_pending_task, он попытается получить доступ к переменной-члену thread_pool::queues. Возможно ли, что из-за переупорядочения построение элементов в thread_pool::queues все еще не завершено до доступа из рабочего потока? Если нет, то как обеспечивается порядок? Я не смог найти никакой синхронизации между этими событиями.

Не могли бы вы объяснить проблему безопасности потоков, описанную выше?

Спасибо за ваше чтение!

Почему может возникнуть проблема? Я только бегло рассмотрел конструктор, но он определенно создает все очереди до создания потоков...

Christian Stieber 01.06.2024 10:46

Я думаю, что технически вы правы, перед созданием потоков должен был быть барьер памяти. На самом деле создание потоков — очень запутанная вещь, поэтому любые сохраненные записи будут сброшены в память к тому времени, когда новый поток начнет свое выполнение.

Andrey Turkin 01.06.2024 11:27

@ChristianStieber Я думаю, это похоже на проблему в undefined_behaviour_with_double_checked_locking() в главе 3. Компилятор может изменить порядок инструкций. он может заранее вставить новый unique_ptr в queues, пока строительство еще не закончено.

pan64271 01.06.2024 11:54

@AndreyTurkin Это гарантируется std::thread?

pan64271 01.06.2024 11:55

Любой вызов функций синхронизации на объектах синхронизации создает барьер инструкций. Создание/инициализация мьютексов и потоков, соединение, блокировка и разблокировка предотвращают смешивание инструкций до и инструкций после них.

Red.Wave 01.06.2024 14:25
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
5
62
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Завершение вызова конструктора std::thread синхронизируется с началом вызова функции потока. (см. [thread.thread.constr]/6)

Поэтому гарантировано, что worker_thread увидит состояние queues после цикла, в котором были созданы его элементы, и нет гонки данных.

Ой, я не знала этой детали std::thread. Спасибо, что поделились!

pan64271 01.06.2024 12:21

Ты живешь и учишься. Оглядываясь назад, это имеет смысл, иначе функция потока вообще не могла бы безопасно работать, потому что все, к чему она обращается, было бы гонкой данных.

Andrey Turkin 01.06.2024 12:42

Другие вопросы по теме