Я изучаю параллелизм на С++ и пытаюсь реализовать многопоточную систему регистрации обратных вызовов. Я придумал следующий код, который должен принимать запросы на регистрацию, пока не произойдет событие. После этого он должен выполнить все зарегистрированные обратные вызовы в том порядке, в котором они были зарегистрированы. Порядок регистрации не обязательно должен быть детерминированным. Код не работает должным образом. Во-первых, он редко печатает сообщение «Отправка обратного вызова с идентификатором». Во-вторых, он иногда зависает (я полагаю, тупик, вызванный состоянием гонки). Буду признателен за помощь в выяснении того, что здесь происходит. Если вы видите, что я слишком усложняю некоторые части кода или неправильно использую некоторые части, также укажите на это.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
class CallbackRegistrar{
public:
void registerCallbackAndExecute(std::function<void()> callback) {
if (!eventTriggered) {
std::unique_lock<std::mutex> lock(callbackMutex);
auto saved_id = callback_id;
std::cout << "Pushing callback with id " << saved_id << std::endl;
registeredCallbacks.push(std::make_pair(callback_id, callback));
++callback_id;
callbackCond.wait(lock, [this, saved_id]{return releasedCallback.first == saved_id;});
releasedCallback.second();
callbackExecuted = true;
eventCond.notify_one();
}
else {
callback();
}
}
void registerEvent() {
eventTriggered = true;
while (!registeredCallbacks.empty()) {
releasedCallback = registeredCallbacks.front();
callbackCond.notify_all();
std::unique_lock<std::mutex> lock(eventMutex);
eventCond.wait(lock, [this]{return callbackExecuted;});
callbackExecuted = false;
registeredCallbacks.pop();
}
}
private:
std::queue<std::pair<unsigned, std::function<void()>>> registeredCallbacks;
bool eventTriggered{false};
bool callbackExecuted{false};
std::mutex callbackMutex;
std::mutex eventMutex;
std::condition_variable callbackCond;
std::condition_variable eventCond;
unsigned callback_id{1};
std::pair<unsigned, std::function<void()>> releasedCallback;
};
int main()
{
CallbackRegistrar registrar;
std::thread t1(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "First!\n";});
std::thread t2(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "Second!\n";});
registrar.registerEvent();
t1.join();
t2.join();
return 0;
}
@TedLyngmo, спасибо, это, вероятно, одна из проблем, но когда я перемещаю блокировку перед оператором if, я все равно получаю тот же результат.
Я особо не копался в этом, это было просто первое наблюдение, которое я сделал. :)
Задокументируйте, какие данные совместно используются потоками и какой мьютекс используется для управления доступом к ним. Кроме того, для условной переменной укажите, с каким мьютексом она будет использоваться. По моему опыту, понимание этих вещей является фундаментальным.
Я изо всех сил пытаюсь понять многоцелевую функцию registerCallbackAndExecute
. Почему он выполняет обратный вызов? Также while (!registeredCallbacks.empty())
в registerEvent()
тоже не синхронизированы, так что там другая гонка.
И очередь, к которой обращаются несколько потоков выполнения, также доступна без блокировки мьютекса. Все это нужно переписывать с нуля.
@TedLyngmo Из того, что я вижу, цель функции состоит в том, чтобы выполнить ее сразу при определенных условиях, а при других условиях поставить в очередь обратный вызов, который будет вызван позже. Части дизайна выглядят так, как будто целью является своего рода асинхронная рабочая очередь, которая сразу же выполняет задания, если ничего не ожидает, но, похоже, она связана с каким-то внешним событием... Я действительно не уверен
Fwiw, вы используете g++ или clang++? Если да, то вы можете скомпилировать с помощью -g -fsanitize=thread
. Он должен найти некоторые проблемы в программе и, надеюсь, указать прямо на них.
Спасибо за все комментарии. Намерение состояло в том, чтобы написать механизм регистрации обратного вызова. Прежде чем произойдет определенное одноразовое событие, обратные вызовы помещаются в очередь. Затем, как только событие произойдет, все обратные вызовы должны быть выполнены в том порядке, в котором они были помещены в очередь, и тем же потоком, который их зарегистрировал. Все дальнейшие вызовы registerCallbackAndExecute (после того, как событие произошло) должны выполняться сразу же без дополнительной регистрации
Я использовал onlinegdb.com, я не уверен, какой компилятор он использует. Это действительно только для целей обучения. Я не подумал о дезинфицирующих средствах, это хороший совет, я посмотрю, принесет ли это больше понимания.
Я изучаю параллелизм на C++ и пытаюсь реализовать многопоточную систему регистрации обратных вызовов. Многопоточное программирование на C++ — непростая задача. Вот почему у программистов МП часто зарплата намного выше, чем у программиста, работающего над однопоточными приложениями. Просто на уровне «Я изучаю» означает, что будет гораздо больше вещей, которые удивят вас, когда дело дойдет до программирования на МП, и вы действительно не узнаете об этих вещах, пока не наберетесь большого опыта.
@user6646922 user6646922 Я не понимал, что вы хотите, чтобы обратные вызовы вызывались в потоке, в котором они были зарегистрированы. Код, который я предоставил ранее, этого не сделает, я обновил свой ответ чем-то более близким к тому, что вы хотите.
Этот ответ был отредактирован в ответ на дополнительную информацию, предоставленную OP в комментарии, редактирование находится внизу ответа.
Наряду с отличными предложениями в комментариях, основная проблема, которую я обнаружил в вашем коде, связана с настроенным вами условием ожидания переменной условия callbackCond
. Что произойдет, если releasedCallback.first
не равно savedId
?
Когда я запустил ваш код (с потокобезопасной очередью и eventTriggered
как атомарный), я обнаружил, что проблема была в этой функции ожидания, если вы поместите оператор печати в эту функцию, вы обнаружите, что вы получите что-то вроде этого:
releasedCallback.first: 0, savedId: 1
Затем это ждет вечно.
Фактически, я обнаружил, что условные переменные, используемые в вашем коде, на самом деле не нужны. Вам нужен только один, и он может жить внутри потокобезопасной очереди, которую вы собираетесь построить после некоторого поиска;)
После того, как у вас есть потокобезопасная очередь, приведенный выше код можно сократить до:
class CallbackRegistrar{
public:
using NumberedCallback = std::pair<unsigned int, std::function<void()>>;
void postCallback(std::function<void()> callback) {
if (!eventTriggered)
{
std::unique_lock<std::mutex> lock(mutex);
auto saved_id = callback_id;
std::cout << "Pushing callback with id " << saved_id << std::endl;
registeredCallbacks.push(std::make_pair(callback_id, callback));
++callback_id;
}
else
{
while (!registeredCallbacks.empty())
{
NumberedCallback releasedCallback;
registeredCallbacks.waitAndPop(releasedCallback);
releasedCallback.second();
}
callback();
}
}
void registerEvent() {
eventTriggered = true;
}
private:
ThreadSafeQueue<NumberedCallback> registeredCallbacks;
std::atomic<bool> eventTriggered{false};
std::mutex mutex;
unsigned int callback_id{1};
};
int main()
{
CallbackRegistrar registrar;
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++)
{
threads.push_back(std::thread(&CallbackRegistrar::postCallback,
std::ref(registrar),
[i]{std::cout << std::to_string(i) <<"\n";}
));
}
registrar.registerEvent();
for (auto& thread : threads)
{
thread.join();
}
return 0;
}
Я не уверен, что это именно то, что вы хотите, но это не тупик. В любом случае это хорошая отправная точка, но вам нужно использовать собственную реализацию ThreadSafeQueue.
Это редактирование является ответом на комментарий OP, в котором говорится, что «после того, как событие произойдет, все обратные вызовы должны выполняться в [том] порядке, в котором они были помещены в очередь, и тем же потоком, который их зарегистрировал».
Это не было упомянуто в исходном сообщении с вопросом. Однако, если это требуемое поведение, нам нужно, чтобы условная переменная ждала в методе postCallback
. Я думаю, что это также причина, по которой у OP была условная переменная в методе postCallback
в первую очередь.
В приведенном ниже коде я внес несколько изменений в обратные вызовы, теперь они принимают входные параметры. Я сделал это, чтобы вывести некоторую полезную информацию во время выполнения кода, чтобы было легче увидеть, как он работает, и, что важно, как работает условная переменная ожидания.
Основная идея аналогична тому, что вы сделали, я просто вырезал то, что вам не нужно.
class CallbackRegistrar{
public:
using NumberedCallback = std::pair<unsigned int, std::function<void(int, int)>>;
void postCallback(std::function<void(int, int)> callback, int threadId) {
if (!m_eventTriggered)
{
// Lock the m_mutex
std::unique_lock<std::mutex> lock(m_mutex);
// Save the current callback ID and push the callback to the queue
auto savedId = m_currentCallbackId++;
std::cout << "Pushing callback with ID " << savedId << "\n";
m_registeredCallbacks.push(std::make_pair(savedId, callback));
// Wait until our thread's callback is next in the queue,
// this will occur when the ID of the last called callback is one less than our saved callback.
m_conditionVariable.wait(lock, [this, savedId, threadId] () -> bool
{
std::cout << "Waiting on thread " << threadId << " last: " << m_lastCalledCallbackId << ", saved - 1: " << (savedId - 1) << "\n";
return (m_lastCalledCallbackId == (savedId - 1));
});
// Once we are finished waiting, get the callback out of the queue
NumberedCallback retrievedCallback;
m_registeredCallbacks.waitAndPop(retrievedCallback);
// Update last callback ID and call the callback
m_lastCalledCallbackId = retrievedCallback.first;
retrievedCallback.second(m_lastCalledCallbackId, threadId);
// Notify one waiting thread
m_conditionVariable.notify_one();
}
else
{
// If the event is already triggered, call the callback straight away
callback(-1, threadId);
}
}
void registerEvent() {
// This is all we have to do here.
m_eventTriggered = true;
}
private:
ThreadSafeQueue<NumberedCallback> m_registeredCallbacks;
std::atomic<bool> m_eventTriggered{ false};
std::mutex m_mutex;
std::condition_variable m_conditionVariable;
unsigned int m_currentCallbackId{ 1};
std::atomic<unsigned int> m_lastCalledCallbackId{ 0};
};
Основная функция такая же, как указано выше, за исключением того, что я создаю 100 потоков вместо 10, и я заставил обратный вызов распечатать информацию о том, как он был вызван.
for (int createdThreadId = 0; createdThreadId < 100; createdThreadId++)
{
threads.push_back(std::thread(&CallbackRegistrar::postCallback,
std::ref(registrar),
[createdThreadId](int registeredCallbackId, int callingThreadId)
{
if (registeredCallbackId < 0)
{
std::cout << "Callback " << createdThreadId;
std::cout << " called immediately, from thread: " << callingThreadId << "\n";
}
else
{
std::cout << "Callback " << createdThreadId;
std::cout << " called from thread " << callingThreadId;
std::cout << " after being registered as " << registeredCallbackId << "\n";
}
},
createdThreadId));
}
Я не совсем уверен, почему вы хотите это сделать, так как это, кажется, лишает смысла иметь несколько потоков, хотя я могу что-то упустить. Но, тем не менее, я надеюсь, что это поможет вам лучше понять проблему, которую вы пытаетесь решить.
Большое спасибо за ответ :) Перед написанием этого кода я искал потокобезопасность контейнеров stl и подумал, что std::queue будет достаточно для моего случая использования. Я тогда еще поищу. Я все еще довольно смущен, почему код не надежно печатает часть «Отправка обратного вызова с идентификатором», когда в конкретном прогоне нет взаимоблокировки.
Не уверен, есть некоторые накладные расходы при создании потоков. Итак, если вы делаете только два, возможно, registerEvent
вызывается до того, как потоки начнут выполняться. Я должен был упомянуть, что я увеличил количество потоков до 10, чтобы проверить это.
@ user6646922 -- std::queue
не является потокобезопасным, как и большинство, если не все контейнеры STL. Посмотрите на это с другой стороны: почему программист на C++ должен платить за безопасность потоков, если он пишет однопоточные приложения? Это противоречит философии C++.
Поэкспериментировав еще немного с этим кодом, я понял, почему часть "Pushing callback with id" печаталась редко. Это потому, что вызов registrar.registerEvent
из основного потока обычно был быстрее, чем вызов registerCallbackAndExecute
из отдельных потоков. Из-за этого условие if (!eventTriggered)
почти никогда не выполнялось (eventTriggered
было установлено в true в методе registerEvent
) и, следовательно, все вызовы registerCallbackAndExecute
попадали в ветку else и выполнялись сразу.
Затем программа иногда также не завершалась из-за состояния гонки между registerEvent
и registerCallbackAndExecute
. Иногда registerEvent
вызывался после проверки if (!eventTriggered)
, но перед помещением обратного вызова в очередь. Затем registerEvent
завершился мгновенно (поскольку очередь была пуста), пока поток, вызывающий registerCallbackAndExecute
, отправлял обратный вызов в очередь. Последний поток продолжал вечно ждать, пока не произойдет событие (которое уже произошло).
if (!eventTriggered)
не синхронизируется. Если вы не хотите блокировать мьютекс перед проверкойeventTriggered
, сделайтеeventTriggered
атомарным.