Я читаю C++ Concurrency в действии, 2-е изд. И автор показал реализацию пула потоков, который использует кражу работы следующим образом:
// Listing 9.7 Lock-based queue for work stealing
class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type> the_queue;
mutable std::mutex the_mutex;
public:
work_stealing_queue()
{}
work_stealing_queue(const work_stealing_queue& other)=delete;
work_stealing_queue& operator=(const work_stealing_queue& other)=delete;
void push(data_type data)
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const
{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty())
{
return false;
}
res=std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty())
{
return false;
}
res=std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
// Listing 9.8 A thread pool that uses work stealing
class thread_pool
{
typedef function_wrapper task_type;
std::atomic_bool done;
threadsafe_queue<task_type> pool_work_queue;
std::vector<std::unique_ptr<work_stealing_queue>> queues;
std::vector<std::thread> threads;
join_threads joiner;
static thread_local work_stealing_queue* local_work_queue;
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_)
{
my_index=my_index_;
local_work_queue=queues[my_index].get();
while(!done)
{
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task)
{
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task)
{
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type& task)
{
for(unsigned i=0;i<queues.size();++i)
{
unsigned const index=(my_index+i+1)%queues.size();
if (queues[index]->try_steal(task))
{
return true;
}
}
return false;
}
public:
thread_pool():
done(false),joiner(threads)
{
unsigned const thread_count=std::thread::hardware_concurrency();
try
{
for(unsigned i=0;i<thread_count;++i)
{
queues.push_back(std::unique_ptr<work_stealing_queue>(
new work_stealing_queue));
}
for(unsigned i=0;i<thread_count;++i)
{
threads.push_back(std::thread(&thread_pool::worker_thread,this,i));
}
}
catch(...)
{
done=true;
throw;
}
}
~thread_pool()
{
done=true;
}
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(
FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if (local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}
void run_pending_task()
{
task_type task;
if (pop_task_from_local_queue(task) ||
pop_task_from_pool_queue(task) ||
pop_task_from_other_thread_queue(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
};
Вот и мой вопрос: в конструкторе thread_pool он сначала построил все work_stealing_queue, а затем все рабочие потоки. И когда выполнение рабочего потока достигает run_pending_task, он попытается получить доступ к переменной-члену thread_pool::queues. Возможно ли, что из-за переупорядочения построение элементов в thread_pool::queues все еще не завершено до доступа из рабочего потока? Если нет, то как обеспечивается порядок? Я не смог найти никакой синхронизации между этими событиями.
Не могли бы вы объяснить проблему безопасности потоков, описанную выше?
Спасибо за ваше чтение!
Я думаю, что технически вы правы, перед созданием потоков должен был быть барьер памяти. На самом деле создание потоков — очень запутанная вещь, поэтому любые сохраненные записи будут сброшены в память к тому времени, когда новый поток начнет свое выполнение.
@ChristianStieber Я думаю, это похоже на проблему в undefined_behaviour_with_double_checked_locking() в главе 3. Компилятор может изменить порядок инструкций. он может заранее вставить новый unique_ptr в queues, пока строительство еще не закончено.
@AndreyTurkin Это гарантируется std::thread?
Любой вызов функций синхронизации на объектах синхронизации создает барьер инструкций. Создание/инициализация мьютексов и потоков, соединение, блокировка и разблокировка предотвращают смешивание инструкций до и инструкций после них.





Завершение вызова конструктора std::thread синхронизируется с началом вызова функции потока. (см. [thread.thread.constr]/6)
Поэтому гарантировано, что worker_thread увидит состояние queues после цикла, в котором были созданы его элементы, и нет гонки данных.
Ой, я не знала этой детали std::thread. Спасибо, что поделились!
Ты живешь и учишься. Оглядываясь назад, это имеет смысл, иначе функция потока вообще не могла бы безопасно работать, потому что все, к чему она обращается, было бы гонкой данных.
Почему может возникнуть проблема? Я только бегло рассмотрел конструктор, но он определенно создает все очереди до создания потоков...