Std :: atomic vs static переменная для синхронизации потоков

У меня есть несколько потоков, которые работают с несколькими элементами данных. Потоки должны выдавать результаты в том же порядке, в котором я передаю данные потокам. Это:

Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing

Результаты должны быть получены в том же порядке, в котором данные были переданы потокам, независимо от того, какой поток завершил обработку первым. А именно:

Thread #1: put data 
Thread #2: put data
...

Чтобы различать потоки и управлять ими, я присвоил каждому из них идентификатор (0,1,2,...,n). Я использую идентификаторы для присвоения данных каждому потоку, чтобы он мог их обработать.

for(int i=0; i<thread_count; i++)
    give_data(i); // i is id and the function knows where to get data from

Я хочу, чтобы потоки разделяли токен, который определяет, какой поток должен дать результат. Все тела ниток идентичны, тело выглядит так:

while(true){
    auto data = get_data();
    result = process_data(data);
    while(token != this_id) spin;
    put_data(result); // this is a synchronized call 
    update_token(token);
}

Моя проблема связана с token. Сначала я попробовал обычный эталон (int & token), и он, очевидно, не может работать (и я этого не ожидал). В любом случае, я использовал статическую переменную, и потоки не всегда получают самую последнюю. Я был удивлен, увидев, что одна тема доминирует над всем. Каждый раз, когда поток обновляет токен, он теряет свою очередь, позволяя другому потоку поместить свой результат и так далее. Однако у меня был один поток, который доминирует, как если бы токен всегда был установлен на свой собственный идентификатор и не обновлялся.

Если бы мне пришлось угадывать, я бы сказал, что это проблема кеширования. Однако я не уверен.

В любом случае, я подумываю использовать std::atomic<int> в качестве своего токена. Это сработает? Если нет, что еще мне следует сделать? Как лучше синхронизировать эти потоки?

Дополнительно: это похоже на плохой дизайн, и я не знаю, как это сделать лучше. Любые предложения будут очень признательны.

Примечание: если вы разделяете переменную между потоками и хотя бы один из них изменяет ее, вам нужна синхронизация. В противном случае у вас будет гонка за данными и неопределенное поведение.

NathanOliver 24.04.2018 18:10

@NathanOliver Это мой вопрос ...

Everyone 24.04.2018 18:11
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
2
506
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Anyway, I used a static variable and the threads do not always get the latest one. I was surprised to see one thread dominating everything

Да, несколько потоков, обращающихся к одному и тому же несинхронизированному значению, и хотя бы один из них записывает в него, является гонка данных, что является неопределенным поведением в соответствии со стандартом C++. Может произойти все, что угодно.

I am thinking of using std::atomic as my token. Would it work?

Да. Это предотвратит любую гонку данных по токену. Я не вижу другой прямой проблемы в вашем псевдокоде, так что с этой точки зрения это выглядит хорошо.

this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.

Весь дизайн выглядит несколько странно, но это зависит от вашей библиотеки потоков, если есть более простой способ выразить это. Например, с OpenMP вы можете сделать это за один проход (логика give_data и get_data слишком неясна, чтобы сделать это полным):

#pragma omp parallel
{
    int threadCount = omp_get_num_threads();
#pragma omp single
    for (int i = 0; i < threadCount; ++i)
        give_data(i);

#pragma omp ordered for ordered schedule(static)
    for (int i = 0; i < threadCount; ++i)
    {
        auto data = get_data();
        result = process_data(data);
#pragma omp ordered
        put_data(result); // this is a synchronized call 
    }
}

Директива ordered заставляет вызовы put_data выполняться точно в том же порядке (один за другим), как если бы цикл был последовательным, в то время как потоки все еще могут выполнять предшествующую обработку данных параллельно.

На самом деле с OpenMP все могло бы быть еще проще, если бы все, что вы хотели сделать В самом деле, - это сделать один большой цикл обработки данных параллельно с упорядоченной записью:

#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
    auto data = get_data(i); // whatever this would entail
    auto result = process_data(data);
#pragma omp ordered
    put_data(result); // this is a synchronized call    
}

Не похоже, что вам нужно, чтобы распределение элементов данных было в порядке, но если вы действительно это сделаете, этот подход не будет работать так легко, потому что у вас может быть только один упорядоченный раздел на упорядоченный цикл.

Спасибо. OpenMP отлично звучит. Я впервые слышу об этом. Так или иначе, я решил сделать что-то подобное, используя std::promise и std::future.

Everyone 24.04.2018 18:38
Ответ принят как подходящий

Ответ Макса отличный. Если бы у меня была возможность использовать OpenMP, учитывая время, я бы это сделал. Однако это не так, поэтому я отправляю этот ответ на свой вопрос.

В моем предыдущем дизайне это зависело от синхронизации потоков друг с другом, и это не кажется лучшей идеей, так как многое может пойти не так. Вместо этого я решил позволить менеджеру синхронизировать их результаты (я почерпнул эту идею из последнего фрагмента кода Макса).

void give_threads_data(){
    vector<pair<data, promise<result>*> promises(threads.size());
    vector<future<result>> futures(threads.size());
    for(int i=0; i<threads.size(); i++){
        data d = get_data();
        threads[i].put_data(d, promises[i]);
        futures[i] = promises[i].get_future();
    }

    for(int i=0; i<futures.size(); i++){
        result = futures[i].get();
        // handle result
    }
}

Таким образом, я смог получить результаты так же, как отправляю их в потоки. Тело резьбы стало намного чище:

void thread_body(){
    while(true){
        pair<data, promise<result>*> item = queue.get(); // blocking call
        data d = item.first;
        promise<result>* promise = item.second;

        result r = process_data(d);
        promise->set_value(r);
    }
}

Нет люфта, и результаты отличные. В следующий раз, когда я буду использовать потоки, я рассмотрю OpenMP.

Другие вопросы по теме