Ниже у меня есть структура WriterLock. На самом деле это Reader-Writer, но в этом сценарии я вызываю только блокировку и разблокировку записи.
Мой модульный тест создает несколько потоков, они ждут, а затем запрашивают доступ для увеличения глобальной переменной.
ThreadSanitizer не сообщает об отсутствии условий гонки, если я использую compare_exchange_weak()
с последовательно согласованными барьерами памяти по умолчанию.
Однако, учитывая, что я использую x86, если я попытаюсь использовать приобретение-релиз для compare_exchange_weak()
, будет обнаружено состояние гонки.
Можно ли здесь использовать compare_exchange_weak()
с менее ограничительными барьерами?
#include <cstdint>
#include <atomic>
#include <limits>
#include <thread>
#include <vector>
struct WriterLock
{
void writer_lock()
{
while (true)
{
uint32_t prev_readers = _reader_count.load(std::memory_order_acquire);
if (prev_readers == 0)
{
// race condition
if (_reader_count.compare_exchange_weak(prev_readers, HAS_WRITER, std::memory_order_acquire, std::memory_order_release))
// works
//if (_reader_count.compare_exchange_weak(prev_readers, HAS_WRITER))
{
return;
}
}
}
}
void writer_unlock()
{
while (true)
{
uint32_t prev_readers = _reader_count.load(std::memory_order_acquire);
if (prev_readers == HAS_WRITER)
{
// race condition
if (_reader_count.compare_exchange_weak(prev_readers, 0, std::memory_order_acquire, std::memory_order_release))
// Works
//if (_reader_count.compare_exchange_weak(prev_readers, 0))
{
return;
}
}
}
}
private:
const uint32_t HAS_WRITER = std::numeric_limits<uint32_t>::max();
std::atomic<uint32_t> _reader_count{0};
};
uint64_t counter{0};
WriterLock lock;
static const uint64_t num_times = uint64_t(10'000'000);
std::atomic<bool> go{false};
void increment()
{
while(go == false)
{
}
for(uint64_t j = 0; j < num_times; ++j)
{
lock.writer_lock();
++counter;
lock.writer_unlock();
}
}
int main()
{
std::vector<std::thread> threads;
const size_t num_threads = 10;
for(uint64_t t = 0; t < num_threads; ++t)
{
threads.push_back(std::thread(increment));
}
for(uint64_t t = 0; t < num_threads; ++t)
{
threads[t].detach();
}
go = true;
}
@BrianBi Я имею в виду, что ThreadSanitizer предположительно сообщает, что глобальная переменная модульного теста записывается двумя потоками одновременно.
@DaveS, используя std::memory_order_acq_rel, std::memory_order_acquire
, похоже, работает. ThreadSanitizer не сообщает о каких-либо проблемах.
@NathanOliver Я не до конца понимаю «более сильную» часть. Я использую x86, поэтому знаю, что в целом могу использовать семантику приобретения-выпуска.
@intrigued_66: «Сильнее» связано с тем, что, например, seq_cst
включает в себя все гарантии acquire
или release
(для грузов и магазинов соответственно), так что в этом смысле он «сильнее». Таким образом, по силе мы имеем relaxed < (acquire, release) < acq_rel < seq_cst
. Любую операцию можно заменить на более сильную, ничего не нарушив, хотя, возможно, и за счет снижения эффективности.
Также имейте в виду: приобретение применимо только к загрузкам, а выпуск применим только к магазинам. Таким образом, acq_rel
применим только к операции, выполняющей загрузку и сохранение, то есть операции чтения-изменения-записи.
_reader_count.compare_exchange_weak(prev_readers, HAS_WRITER, std::memory_order_acquire, std::memory_order_release)
Это неопределенное поведение, поскольку, как указано для atomic_compare_exchange_weak(ожидаемый, желаемый, успех, неудача) :
Предварительные условия:
failure
— этоmemory_order::relaxed
,memory_order::consume
,memory_order::acquire
илиmemory_order::seq_cst
.
Указание release
обычно бессмысленно, поскольку в случае неудачи сравнение-обмен загружает текущее значение в _reader_count
, а release
не имеет смысла для загрузки.
Как вы отметили в комментариях, указание memory_order_acquire
в качестве последнего аргумента не приводит к гонке данных.
Это правда, но в других отношениях порядок ошибочен. Достаточно использовать relaxed
для упорядочивания ошибок в этом коде, но более серьезная ошибка и причина фактической гонки данных заключается в том, что успешный порядок в функции разблокировки равен acquire
, тогда как должен быть release
. Я говорю об этом в своем ответе.
Более слабые порядки работают нормально, если они выбраны правильно, но ваш выбор кажется довольно запутанным, и неясно, как вы к ним пришли.
Во-первых, обратите внимание на то, что на самом деле означают два аргумента порядка compare_exchange_weak
. Первый аргумент — это порядок «успеха», который применяется, если значение атомарной переменной равно ожидаемому значению, и в этом случае новое значение сохраняется. В данном случае мы выполнили настоящую операцию чтения-изменения-записи, поэтому в аргументе необходимо указать барьеры, применимые как к загрузке, так и к сохранению. Второй аргумент — это «ошибочный» порядок в случае, если атомарная переменная не равна ожидаемому значению и новое значение не сохраняется. В этом случае выполняется только загрузка, поэтому параметру необходимо указать только порядок загрузки.
Это показывает, почему release
не имеет смысла для неудачного заказа, поскольку «выпуск» применим только к магазину. Стандарт C++ позволяет библиотеке предполагать, что это бессмысленное значение никогда не будет передано, поэтому он объявляет, что оно вызывает неопределенное поведение.
Но давайте подумаем о том, чего мы на самом деле хотим.
Начальная загрузка, предшествующая compare_exchange_weak
, может быть relaxed
. Это просто оптимизация, позволяющая проверить, стоит ли вообще пытаться взломать блокировку; он не играет никакой роли в фактическом получении блокировки, и поэтому его не нужно упорядочивать каким-либо особым образом относительно критической секции.
Успешный заказ на compare_exchange_weak
должен быть не ниже acquire
. Загрузка значения 0 является доказательством того, что блокировка была доступна, и поэтому порядок получения гарантирует, что критическая секция не будет продолжена, пока у нас не будет этого доказательства. Тот факт, что compare_exchange
удалось, доказывает, что замок не только был доступен, но, более того, мы его забрали.
Использование memory_order_acquire
в качестве параметра успеха фактически означает, что сторона магазина при сравнении-обмене равна relaxed
; он не несет никаких особых ограничений по порядку. Это нормально; опять же, именно грузовая сторона предоставила доказательство того, что замок наш. Независимо от порядка памяти, compare_exchange_weak
по-прежнему является атомарным чтением-изменением-записью, поэтому даже если сохранение нового значения HAS_WRITER
несколько задерживается, мы все равно гарантируем, что никакой другой поток в это время не сможет сохранить его. Таким образом, memory_order_acq_rel
или memory_order_seq_cst
здесь также будут действительны, поскольку они строго сильнее, но в них нет необходимости.
Значение заказа неудачи может быть relaxed
. Если сравнение-обмен завершается неудачей, это просто говорит нам о том, что нам не удалось взять блокировку. В критический участок мы вообще не собираемся заходить, пока его не возьмем, поэтому никаких барьеров в данном случае не нужно. Все, чего мы на самом деле достигли, — это загрузить обновленное значение, и это может произойти relaxed
по тем же причинам, которые были указаны выше для начальной загрузки. (В коде в его нынешнем виде мы даже не используем обновленное значение, а вместо этого запускаем цикл заново и повторяем первоначальную загрузку; возможно, это можно было бы оптимизировать лучше, но я не буду вдаваться в подробности.)
Поэтому я бы написал:
void writer_lock()
{
while (true)
{
uint32_t prev_readers = _reader_count.load(std::memory_order_relaxed);
if (prev_readers == 0)
{
if (_reader_count.compare_exchange_weak(prev_readers, HAS_WRITER, std::memory_order_acquire, std::memory_order_relaxed))
{
return;
}
}
}
}
Обратите внимание, что в большинстве случаев, если вам не удалось взять замок, вам следует сделать что-то более вежливое, чем просто крутить. Поспите немного или отключите процессор каким-либо другим способом. (std::shared_mutex
, который вы здесь заново изобретаете, способен использовать специфичные для системы примитивы, чтобы засыпать и просыпаться, как только блокировка становится доступной.)
В функции разблокировки та же логика, что и раньше, говорит о том, что первоначальная загрузка может быть безопасно relaxed
. Однако здесь непонятно, зачем мы это вообще делаем. Мы можем вызвать writer_unlock()
только из потока, который уже имеет блокировку записи, поэтому значение _reader_count
уже должно быть HAS_WRITER
. Я полагаю, что вы могли бы проверить значение в основном как проверку утверждения или ошибки, но если это так, то если вы обнаружите, что оно не равно HAS_WRITER
, вам следует сделать что-нибудь полезное, чтобы сообщить об ошибке. В нынешнем виде, если это произойдет, код автоматически войдет в бесконечный цикл, что бесполезно.
Что касается сравнения-обмена, то здесь необходим порядок успеха release
. Мы должны убедиться, что критический раздел завершен и доступен глобально, прежде чем сохранять значение, указывающее на доступность блокировки. Нам не нужен какой-либо особый порядок на стороне загрузки операций чтения-изменения-записи, поскольку это будет служить лишь для того, чтобы некритичный код, следующий за разблокировкой, не запускался слишком рано. Но поскольку это не критично и не конфликтует с другими потоками, нас это не волнует.
В вашем коде здесь был acquire
, и это основная причина гонки данных, обнаруженной TSAN. По сути, нет ничего плохого в упорядочении получения на стороне загрузки (в этом просто нет необходимости), но проблема в том, что memory_order_acquire
при чтении-изменении-записи неявно задает (путем пропуска) расслабленное упорядочение на стороне хранилища. Это плохо, поскольку позволяет переупорядочить критическую секцию после сохранения до того момента, когда какой-то другой поток может получить блокировку и работать с общими данными. Изменение его на acq_rel
, как вы обсуждали в комментариях, исправляет эту ошибку и устраняет гонку данных, но все равно немного неэффективно, поскольку накладывает порядок получения при загрузке, который нам не нужен.
Кроме того, по той же логике, что и раньше, relaxed
достаточно для упорядочения ошибок.
На самом деле, поскольку, как уже упоминалось, мы уже знаем, что значение _reader_count
должно быть HAS_WRITERS
в этот момент, сравнение-обмен также не требуется; мы могли бы просто сделать безусловное сохранение. Таким образом, вся функция разблокировки может быть просто
void writer_unlock()
{
_reader_count.store(0, std::memory_order_release);
}
Однако, когда вы приступите к реализации функций блокировки и разблокировки считывателя, вам понадобится compare_exchange
в reader_unlock
, поскольку нам нужно знать старое значение _reader_count
, чтобы определить правильное новое значение (т. е. на одно меньше). Здесь снова правильный порядок будет release
в случае успеха и relaxed
в случае неудачи. (В качестве альтернативы, поскольку мы знаем, что если мы удерживаем блокировку чтения, значение _reader_count
должно быть положительным целым числом, а не HAS_WRITERS
, мы могли бы просто сделать _reader_count.fetch_sub(1, std::memory_order_release)
.)
Большое спасибо за этот ответ. На самом деле я понял, что часть разблокировки можно упростить, но внутренние тонкости функции блокировки были очень интересны.
Вы бы добавили флаг, чтобы остановить новое чтение, если писатель уже ждал/вращался?
@intrigued_66: Я думаю, это будет зависеть от вашего варианта использования. Есть ситуации, когда это было бы полезно, и есть ситуации, когда это не так.
C++20 .wait()
и .notify_one()
существуют для портативного предоставления тех же функций, которые std::mutex
использует для сна, если блокировка недоступна. Но нетривиально избежать вызова notify_one
(который обычно стоит системного вызова), если официантов нет; вам может потребоваться использовать больше битов переменной блокировки, чтобы отслеживать больше вещей и, возможно, разблокировать их с помощью атомарного обмена. Мне пришлось бы посмотреть, что делает glibc pthread_mutex, чтобы напомнить себе, как они это делают. (@intrigued_66)
Не уверен во всей вашей проблеме, но до C++17 использование
std::memory_order_release
в качестве аргумента неудачи было UB. Я думаю, вам нуженstd::memory_order_acqrel
для пути успеха иstd::memory_order_acquire
для пути неудач. Возможно, вам сойдет с рукrelaxed
за неудачу, но меня это не совсем устраивает, поэтому я не публикую полный ответ.