Неожиданный вывод многопоточной программы C++

Я изучаю параллелизм на С++ и пытаюсь реализовать многопоточную систему регистрации обратных вызовов. Я придумал следующий код, который должен принимать запросы на регистрацию, пока не произойдет событие. После этого он должен выполнить все зарегистрированные обратные вызовы в том порядке, в котором они были зарегистрированы. Порядок регистрации не обязательно должен быть детерминированным. Код не работает должным образом. Во-первых, он редко печатает сообщение «Отправка обратного вызова с идентификатором». Во-вторых, он иногда зависает (я полагаю, тупик, вызванный состоянием гонки). Буду признателен за помощь в выяснении того, что здесь происходит. Если вы видите, что я слишком усложняю некоторые части кода или неправильно использую некоторые части, также укажите на это.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class CallbackRegistrar{
public:
    void registerCallbackAndExecute(std::function<void()> callback) {
        if (!eventTriggered) {
            std::unique_lock<std::mutex> lock(callbackMutex);
            auto saved_id = callback_id;
            std::cout << "Pushing callback with id " << saved_id << std::endl;
            registeredCallbacks.push(std::make_pair(callback_id, callback));
            ++callback_id;
            callbackCond.wait(lock, [this, saved_id]{return releasedCallback.first == saved_id;});
            releasedCallback.second();
            callbackExecuted = true;
            eventCond.notify_one();
        }
        else {
            callback();
        }
    }
    void registerEvent() {
        eventTriggered = true;
        while (!registeredCallbacks.empty()) {
            releasedCallback = registeredCallbacks.front();
            callbackCond.notify_all();
            std::unique_lock<std::mutex> lock(eventMutex);
            eventCond.wait(lock, [this]{return callbackExecuted;});
            callbackExecuted = false;
            registeredCallbacks.pop();
        }
    }
private:
    std::queue<std::pair<unsigned, std::function<void()>>> registeredCallbacks;
    bool eventTriggered{false};
    bool callbackExecuted{false};
    std::mutex callbackMutex;
    std::mutex eventMutex;
    std::condition_variable callbackCond;
    std::condition_variable eventCond;
    unsigned callback_id{1};
    std::pair<unsigned, std::function<void()>> releasedCallback;
};

int main()
{
    CallbackRegistrar registrar;
    std::thread t1(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "First!\n";});
    std::thread t2(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "Second!\n";});
    
    registrar.registerEvent();
    
    t1.join();
    t2.join();

    return 0;
}
if (!eventTriggered) не синхронизируется. Если вы не хотите блокировать мьютекс перед проверкой eventTriggered, сделайте eventTriggered атомарным.
Ted Lyngmo 13.02.2023 18:48

@TedLyngmo, спасибо, это, вероятно, одна из проблем, но когда я перемещаю блокировку перед оператором if, я все равно получаю тот же результат.

user6646922 13.02.2023 18:49

Я особо не копался в этом, это было просто первое наблюдение, которое я сделал. :)

Ted Lyngmo 13.02.2023 18:51

Задокументируйте, какие данные совместно используются потоками и какой мьютекс используется для управления доступом к ним. Кроме того, для условной переменной укажите, с каким мьютексом она будет использоваться. По моему опыту, понимание этих вещей является фундаментальным.

Ulrich Eckhardt 13.02.2023 19:06

Я изо всех сил пытаюсь понять многоцелевую функцию registerCallbackAndExecute. Почему он выполняет обратный вызов? Также while (!registeredCallbacks.empty()) в registerEvent() тоже не синхронизированы, так что там другая гонка.

Ted Lyngmo 13.02.2023 19:07

И очередь, к которой обращаются несколько потоков выполнения, также доступна без блокировки мьютекса. Все это нужно переписывать с нуля.

Sam Varshavchik 13.02.2023 19:11

@TedLyngmo Из того, что я вижу, цель функции состоит в том, чтобы выполнить ее сразу при определенных условиях, а при других условиях поставить в очередь обратный вызов, который будет вызван позже. Части дизайна выглядят так, как будто целью является своего рода асинхронная рабочая очередь, которая сразу же выполняет задания, если ничего не ожидает, но, похоже, она связана с каким-то внешним событием... Я действительно не уверен

44stonelions 13.02.2023 19:21

Fwiw, вы используете g++ или clang++? Если да, то вы можете скомпилировать с помощью -g -fsanitize=thread. Он должен найти некоторые проблемы в программе и, надеюсь, указать прямо на них.

Ted Lyngmo 13.02.2023 19:23

Спасибо за все комментарии. Намерение состояло в том, чтобы написать механизм регистрации обратного вызова. Прежде чем произойдет определенное одноразовое событие, обратные вызовы помещаются в очередь. Затем, как только событие произойдет, все обратные вызовы должны быть выполнены в том порядке, в котором они были помещены в очередь, и тем же потоком, который их зарегистрировал. Все дальнейшие вызовы registerCallbackAndExecute (после того, как событие произошло) должны выполняться сразу же без дополнительной регистрации

user6646922 13.02.2023 21:23

Я использовал onlinegdb.com, я не уверен, какой компилятор он использует. Это действительно только для целей обучения. Я не подумал о дезинфицирующих средствах, это хороший совет, я посмотрю, принесет ли это больше понимания.

user6646922 13.02.2023 21:25

Я изучаю параллелизм на C++ и пытаюсь реализовать многопоточную систему регистрации обратных вызовов. Многопоточное программирование на C++ — непростая задача. Вот почему у программистов МП часто зарплата намного выше, чем у программиста, работающего над однопоточными приложениями. Просто на уровне «Я изучаю» означает, что будет гораздо больше вещей, которые удивят вас, когда дело дойдет до программирования на МП, и вы действительно не узнаете об этих вещах, пока не наберетесь большого опыта.

PaulMcKenzie 13.02.2023 21:39

@user6646922 user6646922 Я не понимал, что вы хотите, чтобы обратные вызовы вызывались в потоке, в котором они были зарегистрированы. Код, который я предоставил ранее, этого не сделает, я обновил свой ответ чем-то более близким к тому, что вы хотите.

44stonelions 14.02.2023 20:34
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
12
118
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Этот ответ был отредактирован в ответ на дополнительную информацию, предоставленную OP в комментарии, редактирование находится внизу ответа.

Наряду с отличными предложениями в комментариях, основная проблема, которую я обнаружил в вашем коде, связана с настроенным вами условием ожидания переменной условия callbackCond. Что произойдет, если releasedCallback.first не равно savedId?

Когда я запустил ваш код (с потокобезопасной очередью и eventTriggered как атомарный), я обнаружил, что проблема была в этой функции ожидания, если вы поместите оператор печати в эту функцию, вы обнаружите, что вы получите что-то вроде этого:

releasedCallback.first: 0, savedId: 1

Затем это ждет вечно.

Фактически, я обнаружил, что условные переменные, используемые в вашем коде, на самом деле не нужны. Вам нужен только один, и он может жить внутри потокобезопасной очереди, которую вы собираетесь построить после некоторого поиска;)

После того, как у вас есть потокобезопасная очередь, приведенный выше код можно сократить до:

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair<unsigned int, std::function<void()>>;

  void postCallback(std::function<void()> callback) {

    if (!eventTriggered)
    {
      std::unique_lock<std::mutex> lock(mutex);
      auto saved_id = callback_id;
      std::cout << "Pushing callback with id " << saved_id << std::endl;
      registeredCallbacks.push(std::make_pair(callback_id, callback));
      ++callback_id;
    }
    else
    {
      while (!registeredCallbacks.empty())
      {
        NumberedCallback releasedCallback;
        registeredCallbacks.waitAndPop(releasedCallback);
        releasedCallback.second();
      }
      callback();
    }
  }
  void registerEvent() {
    eventTriggered = true;
  }
private:
  ThreadSafeQueue<NumberedCallback> registeredCallbacks;
  std::atomic<bool> eventTriggered{false};
  std::mutex mutex;
  unsigned int callback_id{1};
};

int main()
{
  CallbackRegistrar registrar;
  std::vector<std::thread> threads;

  for (int i = 0; i < 10; i++)
  {
    threads.push_back(std::thread(&CallbackRegistrar::postCallback, 
                                  std::ref(registrar), 
                                  [i]{std::cout << std::to_string(i) <<"\n";}
                                  ));
  }

  registrar.registerEvent();

  for (auto& thread : threads)
  {
    thread.join();
  }

  return 0;
}

Я не уверен, что это именно то, что вы хотите, но это не тупик. В любом случае это хорошая отправная точка, но вам нужно использовать собственную реализацию ThreadSafeQueue.

Редактировать

Это редактирование является ответом на комментарий OP, в котором говорится, что «после того, как событие произойдет, все обратные вызовы должны выполняться в [том] порядке, в котором они были помещены в очередь, и тем же потоком, который их зарегистрировал».

Это не было упомянуто в исходном сообщении с вопросом. Однако, если это требуемое поведение, нам нужно, чтобы условная переменная ждала в методе postCallback. Я думаю, что это также причина, по которой у OP была условная переменная в методе postCallback в первую очередь.

В приведенном ниже коде я внес несколько изменений в обратные вызовы, теперь они принимают входные параметры. Я сделал это, чтобы вывести некоторую полезную информацию во время выполнения кода, чтобы было легче увидеть, как он работает, и, что важно, как работает условная переменная ожидания.

Основная идея аналогична тому, что вы сделали, я просто вырезал то, что вам не нужно.

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair<unsigned int, std::function<void(int, int)>>;

  void postCallback(std::function<void(int, int)> callback, int threadId) {

    if (!m_eventTriggered)
    {
      // Lock the m_mutex
      std::unique_lock<std::mutex> lock(m_mutex);

      // Save the current callback ID and push the callback to the queue
      auto savedId = m_currentCallbackId++;
      std::cout << "Pushing callback with ID " << savedId << "\n";
      m_registeredCallbacks.push(std::make_pair(savedId, callback));

      // Wait until our thread's callback is next in the queue,
      // this will occur when the ID of the last called callback is one less than our saved callback.
      m_conditionVariable.wait(lock, [this, savedId, threadId] () -> bool
      {
        std::cout << "Waiting on thread " << threadId << " last: " << m_lastCalledCallbackId << ", saved - 1: " << (savedId - 1) << "\n";
        return (m_lastCalledCallbackId == (savedId - 1));
      });

      // Once we are finished waiting, get the callback out of the queue
      NumberedCallback retrievedCallback;
      m_registeredCallbacks.waitAndPop(retrievedCallback);

      // Update last callback ID and call the callback
      m_lastCalledCallbackId = retrievedCallback.first;
      retrievedCallback.second(m_lastCalledCallbackId, threadId);

      // Notify one waiting thread
      m_conditionVariable.notify_one();
    }
    else
    {
      // If the event is already triggered, call the callback straight away
      callback(-1, threadId);
    }
  }

  void registerEvent() {
    // This is all we have to do here.
    m_eventTriggered = true;
  }

private:
  ThreadSafeQueue<NumberedCallback> m_registeredCallbacks;
  std::atomic<bool> m_eventTriggered{ false};
  std::mutex m_mutex;
  std::condition_variable m_conditionVariable;
  unsigned int m_currentCallbackId{ 1};
  std::atomic<unsigned int> m_lastCalledCallbackId{ 0};
};

Основная функция такая же, как указано выше, за исключением того, что я создаю 100 потоков вместо 10, и я заставил обратный вызов распечатать информацию о том, как он был вызван.

for (int createdThreadId = 0; createdThreadId < 100; createdThreadId++)
{
  threads.push_back(std::thread(&CallbackRegistrar::postCallback,
                                std::ref(registrar),
                                [createdThreadId](int registeredCallbackId, int callingThreadId)
                                {
                                  if (registeredCallbackId < 0)
                                  {
                                    std::cout << "Callback " << createdThreadId;
                                    std::cout << " called immediately, from thread: " << callingThreadId << "\n";
                                  }
                                  else
                                  {
                                    std::cout << "Callback " << createdThreadId;
                                    std::cout << " called from thread " << callingThreadId;
                                    std::cout << " after being registered as " << registeredCallbackId << "\n";
                                  }
                                },
                                createdThreadId));
}

Я не совсем уверен, почему вы хотите это сделать, так как это, кажется, лишает смысла иметь несколько потоков, хотя я могу что-то упустить. Но, тем не менее, я надеюсь, что это поможет вам лучше понять проблему, которую вы пытаетесь решить.

Большое спасибо за ответ :) Перед написанием этого кода я искал потокобезопасность контейнеров stl и подумал, что std::queue будет достаточно для моего случая использования. Я тогда еще поищу. Я все еще довольно смущен, почему код не надежно печатает часть «Отправка обратного вызова с идентификатором», когда в конкретном прогоне нет взаимоблокировки.

user6646922 13.02.2023 21:20

Не уверен, есть некоторые накладные расходы при создании потоков. Итак, если вы делаете только два, возможно, registerEvent вызывается до того, как потоки начнут выполняться. Я должен был упомянуть, что я увеличил количество потоков до 10, чтобы проверить это.

44stonelions 13.02.2023 21:24

@ user6646922 -- std::queue не является потокобезопасным, как и большинство, если не все контейнеры STL. Посмотрите на это с другой стороны: почему программист на C++ должен платить за безопасность потоков, если он пишет однопоточные приложения? Это противоречит философии C++.

PaulMcKenzie 13.02.2023 21:37

Поэкспериментировав еще немного с этим кодом, я понял, почему часть "Pushing callback with id" печаталась редко. Это потому, что вызов registrar.registerEvent из основного потока обычно был быстрее, чем вызов registerCallbackAndExecute из отдельных потоков. Из-за этого условие if (!eventTriggered) почти никогда не выполнялось (eventTriggered было установлено в true в методе registerEvent) и, следовательно, все вызовы registerCallbackAndExecute попадали в ветку else и выполнялись сразу. Затем программа иногда также не завершалась из-за состояния гонки между registerEvent и registerCallbackAndExecute. Иногда registerEvent вызывался после проверки if (!eventTriggered), но перед помещением обратного вызова в очередь. Затем registerEvent завершился мгновенно (поскольку очередь была пуста), пока поток, вызывающий registerCallbackAndExecute, отправлял обратный вызов в очередь. Последний поток продолжал вечно ждать, пока не произойдет событие (которое уже произошло).

Другие вопросы по теме