У меня есть несколько потоков, которые работают с несколькими элементами данных. Потоки должны выдавать результаты в том же порядке, в котором я передаю данные потокам. Это:
Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing
Результаты должны быть получены в том же порядке, в котором данные были переданы потокам, независимо от того, какой поток завершил обработку первым. А именно:
Thread #1: put data
Thread #2: put data
...
Чтобы различать потоки и управлять ими, я присвоил каждому из них идентификатор (0,1,2,...,n). Я использую идентификаторы для присвоения данных каждому потоку, чтобы он мог их обработать.
for(int i=0; i<thread_count; i++)
give_data(i); // i is id and the function knows where to get data from
Я хочу, чтобы потоки разделяли токен, который определяет, какой поток должен дать результат. Все тела ниток идентичны, тело выглядит так:
while(true){
auto data = get_data();
result = process_data(data);
while(token != this_id) spin;
put_data(result); // this is a synchronized call
update_token(token);
}
Моя проблема связана с token. Сначала я попробовал обычный эталон (int & token), и он, очевидно, не может работать (и я этого не ожидал). В любом случае, я использовал статическую переменную, и потоки не всегда получают самую последнюю. Я был удивлен, увидев, что одна тема доминирует над всем. Каждый раз, когда поток обновляет токен, он теряет свою очередь, позволяя другому потоку поместить свой результат и так далее. Однако у меня был один поток, который доминирует, как если бы токен всегда был установлен на свой собственный идентификатор и не обновлялся.
Если бы мне пришлось угадывать, я бы сказал, что это проблема кеширования. Однако я не уверен.
В любом случае, я подумываю использовать std::atomic<int> в качестве своего токена. Это сработает? Если нет, что еще мне следует сделать? Как лучше синхронизировать эти потоки?
Дополнительно: это похоже на плохой дизайн, и я не знаю, как это сделать лучше. Любые предложения будут очень признательны.
@NathanOliver Это мой вопрос ...





Anyway, I used a static variable and the threads do not always get the latest one. I was surprised to see one thread dominating everything
Да, несколько потоков, обращающихся к одному и тому же несинхронизированному значению, и хотя бы один из них записывает в него, является гонка данных, что является неопределенным поведением в соответствии со стандартом C++. Может произойти все, что угодно.
I am thinking of using std::atomic as my token. Would it work?
Да. Это предотвратит любую гонку данных по токену. Я не вижу другой прямой проблемы в вашем псевдокоде, так что с этой точки зрения это выглядит хорошо.
this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.
Весь дизайн выглядит несколько странно, но это зависит от вашей библиотеки потоков, если есть более простой способ выразить это. Например, с OpenMP вы можете сделать это за один проход (логика give_data и get_data слишком неясна, чтобы сделать это полным):
#pragma omp parallel
{
int threadCount = omp_get_num_threads();
#pragma omp single
for (int i = 0; i < threadCount; ++i)
give_data(i);
#pragma omp ordered for ordered schedule(static)
for (int i = 0; i < threadCount; ++i)
{
auto data = get_data();
result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
}
Директива ordered заставляет вызовы put_data выполняться точно в том же порядке (один за другим), как если бы цикл был последовательным, в то время как потоки все еще могут выполнять предшествующую обработку данных параллельно.
На самом деле с OpenMP все могло бы быть еще проще, если бы все, что вы хотели сделать В самом деле, - это сделать один большой цикл обработки данных параллельно с упорядоченной записью:
#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
auto data = get_data(i); // whatever this would entail
auto result = process_data(data);
#pragma omp ordered
put_data(result); // this is a synchronized call
}
Не похоже, что вам нужно, чтобы распределение элементов данных было в порядке, но если вы действительно это сделаете, этот подход не будет работать так легко, потому что у вас может быть только один упорядоченный раздел на упорядоченный цикл.
Спасибо. OpenMP отлично звучит. Я впервые слышу об этом. Так или иначе, я решил сделать что-то подобное, используя std::promise и std::future.
Ответ Макса отличный. Если бы у меня была возможность использовать OpenMP, учитывая время, я бы это сделал. Однако это не так, поэтому я отправляю этот ответ на свой вопрос.
В моем предыдущем дизайне это зависело от синхронизации потоков друг с другом, и это не кажется лучшей идеей, так как многое может пойти не так. Вместо этого я решил позволить менеджеру синхронизировать их результаты (я почерпнул эту идею из последнего фрагмента кода Макса).
void give_threads_data(){
vector<pair<data, promise<result>*> promises(threads.size());
vector<future<result>> futures(threads.size());
for(int i=0; i<threads.size(); i++){
data d = get_data();
threads[i].put_data(d, promises[i]);
futures[i] = promises[i].get_future();
}
for(int i=0; i<futures.size(); i++){
result = futures[i].get();
// handle result
}
}
Таким образом, я смог получить результаты так же, как отправляю их в потоки. Тело резьбы стало намного чище:
void thread_body(){
while(true){
pair<data, promise<result>*> item = queue.get(); // blocking call
data d = item.first;
promise<result>* promise = item.second;
result r = process_data(d);
promise->set_value(r);
}
}
Нет люфта, и результаты отличные. В следующий раз, когда я буду использовать потоки, я рассмотрю OpenMP.
Примечание: если вы разделяете переменную между потоками и хотя бы один из них изменяет ее, вам нужна синхронизация. В противном случае у вас будет гонка за данными и неопределенное поведение.