Я написал класс для совместного использования ограниченного количества ресурсов (например, сетевых интерфейсов) между большим количеством потоков. Ресурсы объединяются в пул, и, если они не используются, они одалживаются запрашивающему потоку, который в противном случае ожидает condition_variable
.
Ничего особо экзотичного: если не считать причудливого scoped_lock
, который требует С++ 17, это должен быть старый добрый С++ 11.
И gcc10.2, и clang11 отлично компилируют основной тест, но в то время как последний создает исполняемый файл, который делает почти то, что ожидалось, первый зависает, не потребляя ресурсы ЦП (тупиковая ситуация?).
С помощью https://godbolt.org/ Пробовал старые версии gcc, а также icc (варианты прохождения -O3 -std=c++17 -pthread
), все воспроизводят плохой результат, хотя даже там clang подтверждает правильное поведение.
Интересно, сделал ли я ошибку или код вызывает какое-то неправильное поведение компилятора и, в случае, как это обойти.
#include <iostream>
#include <vector>
#include <stdexcept>
#include <mutex>
#include <condition_variable>
template <typename T>
class Pool {
///////////////////////////
class Borrowed {
friend class Pool<T>;
Pool<T>& pool;
const size_t id;
T * val;
public:
Borrowed(Pool & p, size_t i, T& v): pool(p), id(i), val(&v) {}
~Borrowed() { release(); }
T& get() const {
if (!val) throw std::runtime_error("Borrowed::get() this resource was collected back by the pool");
return *val;
}
void release() { pool.collect(*this); }
};
///////////////////////////
struct Resource {
T val;
bool available = true;
Resource(T v): val(std::move(v)) {}
};
///////////////////////////
std::vector<Resource> vres;
size_t hint = 0;
std::condition_variable cv;
std::mutex mtx;
size_t available_cnt;
public:
Pool(std::initializer_list<T> l): available_cnt(l.size()) {
vres.reserve(l.size());
for (T t: l) {
vres.emplace_back(std::move(t));
}
std::cout << "Pool has size " << vres.size() << std::endl;
}
~Pool() {
for ( auto & res: vres ) {
if ( ! res.available ) {
std::cerr << "WARNING Pool::~Pool resources are still in use\n";
}
}
}
Borrowed borrow() {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&](){return available_cnt > 0;});
if ( vres[hint].available ) {
// quick path, if hint points to an available resource
std::cout << "hint good" << std::endl;
vres[hint].available = false;
--available_cnt;
Borrowed b(*this, hint, vres[hint].val);
if ( hint + 1 < vres.size() ) ++hint;
return b; // <--- gcc seems to hang here
} else {
// full scan to find the available resource
std::cout << "hint bad" << std::endl;
for ( hint = 0; hint < vres.size(); ++hint ) {
if ( vres[hint].available ) {
vres[hint].available = false;
--available_cnt;
return Borrowed(*this, hint, vres[hint].val);
}
}
}
throw std::runtime_error("Pool::borrow() no resource is available - internal logic error");
}
void collect(Borrowed & b) {
if ( &(b.pool) != this )
throw std::runtime_error("Pool::collect() trying to collect resource owned by another pool!");
if ( b.val ) {
b.val = nullptr;
{
std::scoped_lock<std::mutex> lk(mtx);
hint = b.id;
vres[hint].available = true;
++available_cnt;
}
cv.notify_one();
}
}
};
///////////////////////////////////////////////////////////////////
#include <thread>
#include <chrono>
int main() {
Pool<std::string> pool{"hello","world"};
std::vector<std::thread> vt;
for (int i = 10; i > 0; --i) {
vt.emplace_back( [&pool, i]()
{
auto res = pool.borrow();
std::this_thread::sleep_for(std::chrono::milliseconds(i*300));
std::cout << res.get() << std::endl;
}
);
}
for (auto & t: vt) t.join();
return 0;
}
@Secundi Да, это то, чем я занимался последние несколько часов. Я даже запускал рабочий исполняемый файл (скомпилированный с помощью clang) через valgrind Helgrind, который не обнаружил проблем с многопоточностью. Кажется, что gcc не возвращается из функции Pool::borrow
и не снимает блокировку мьютекса, но меня реально смущает, почему так.
Хорошо, я все еще не нашел основной проблемы, но заметил несколько вещей: вы блокируете деструктор. По разным причинам я бы рекомендовал изменить дизайн, если это возможно. поскольку это может привести к различным, возможно, трудно найти дальнейшие проблемы (обработка исключений,...). Также постарайтесь сделать свою условную блокировку устойчивой к ложным пробуждениям! С MSVC я наблюдаю прямое системное исключение в методе collect () (блокировка) — сильный намек на состояние гонки здесь.
@Secundi избежать блокировки в деструкторе так же просто, как вызвать res.release();
в конце лямбда-функции потока, но это не имеет значения. Почему вы говорите, что замок не устойчив к ложным следам? Это должно быть, как я проверяю количество доступных ресурсов на вызов ожидания.
Смотрите мой ответ, я думаю, что нашел вашу проблему.
Вы сталкиваетесь с неопределенным поведением, поскольку фактически повторно блокируете уже полученную блокировку. С MSVC я получил полезный стек вызовов, чтобы отличить это. Вот рабочий фиксированный пример (я полагаю, теперь работает для меня, см. изменения в методе заимствования(), которые могут быть дополнительно переработаны, поскольку блокировка внутри деструктора может быть поставлена под сомнение):
#include <iostream>
#include <vector>
#include <stdexcept>
#include <mutex>
#include <condition_variable>
template <typename T>
class Pool {
///////////////////////////
class Borrowed {
friend class Pool<T>;
Pool<T>& pool;
const size_t id;
T * val;
public:
Borrowed(Pool & p, size_t i, T& v) : pool(p), id(i), val(&v) {}
~Borrowed() { release(); }
T& get() const {
if (!val) throw std::runtime_error("Borrowed::get() this resource was collected back by the pool");
return *val;
}
void release() { pool.collect(*this); }
};
///////////////////////////
struct Resource {
T val;
bool available = true;
Resource(T v) : val(std::move(v)) {}
};
///////////////////////////
std::vector<Resource> vres;
size_t hint = 0;
std::condition_variable cv;
std::mutex mtx;
size_t available_cnt;
public:
Pool(std::initializer_list<T> l) : available_cnt(l.size()) {
vres.reserve(l.size());
for (T t : l) {
vres.emplace_back(std::move(t));
}
std::cout << "Pool has size " << vres.size() << std::endl;
}
~Pool() {
for (auto & res : vres) {
if (!res.available) {
std::cerr << "WARNING Pool::~Pool resources are still in use\n";
}
}
}
Borrowed borrow() {
std::unique_lock<std::mutex> lk(mtx);
while (available_cnt == 0) cv.wait(lk);
if (vres[hint].available) {
// quick path, if hint points to an available resource
std::cout << "hint good" << std::endl;
vres[hint].available = false;
--available_cnt;
Borrowed b(*this, hint, vres[hint].val);
if (hint + 1 < vres.size()) ++hint;
lk.unlock();
return b; // <--- gcc seems to hang here
}
else {
// full scan to find the available resource
std::cout << "hint bad" << std::endl;
for (hint = 0; hint < vres.size(); ++hint) {
if (vres[hint].available) {
vres[hint].available = false;
--available_cnt;
lk.unlock();
return Borrowed(*this, hint, vres[hint].val);
}
}
}
throw std::runtime_error("Pool::borrow() no resource is available - internal logic error");
}
void collect(Borrowed & b) {
if (&(b.pool) != this)
throw std::runtime_error("Pool::collect() trying to collect resource owned by another pool!");
if (b.val) {
b.val = nullptr;
{
std::scoped_lock<std::mutex> lk(mtx);
hint = b.id;
vres[hint].available = true;
++available_cnt;
cv.notify_one();
}
}
}
};
///////////////////////////////////////////////////////////////////
#include <thread>
#include <chrono>
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int main()
{
try
{
Pool<std::string> pool{ "hello","world" };
std::vector<std::thread> vt;
for (int i = 10; i > 0; --i) {
vt.emplace_back([&pool, i]()
{
auto res = pool.borrow();
std::this_thread::sleep_for(std::chrono::milliseconds(i * 300));
std::cout << res.get() << std::endl;
}
);
}
for (auto & t : vt) t.join();
return 0;
}
catch(const std::exception& e)
{
std::cout << "exception occurred: " << e.what();
}
return 0;
}
Где я могу эффективно повторно заблокировать уже приобретенный замок?
Зачем ты добавил lk.unlock();
? Деструктор unique_lock в любом случае разблокирует мьютекс, не так ли?
По крайней мере, у вас есть возможная проблема с NRVO, см. ответ b. Зависит от компилятора, что происходит сейчас
@DarioP из-за пропущенного NRVO деструктор Borrowed может произойти до того, как unique_lock будет уничтожен
PS: По возможности следует избегать разблокировки, я использовал ее здесь исключительно для того, чтобы подчеркнуть основную проблему.
PS2: теперь работает и с gcc
Деструктор блокировки в сочетании с пропущенным NRVO вызвал проблему (спасибо Secundi за указание на это в комментариях).
Если компилятор пропускает NRVO, несколько строк ниже if вызовут деструктор b
. Деструктор пытается захватить мьютекс до того, как он будет освобожден unique_lock
, что приводит к тупиковой ситуации.
Borrowed b(*this, hint, vres[hint].val);
if ( hint + 1 < vres.size() ) ++hint;
return b; // <--- gcc seems to hang here
Здесь крайне важно не разрушить b
. На самом деле, даже если ручное освобождение unique_lock
перед возвратом позволит избежать взаимоблокировки, деструктор b
пометит объединенный ресурс как доступный, в то время как он просто заимствован, что делает код неправильным.
Возможное исправление заключается в замене приведенных выше строк на:
const auto tmp = hint;
if ( hint + 1 < vres.size() ) ++hint;
return Borrowed(*this, tmp, vres[tmp].val);
Другая возможность (которая не исключает первую) состоит в том, чтобы удалить (злой) копирующий элемент Borrowed
и предоставить только движущийся элемент:
Borrowed(const Borrowed &) = delete;
Borrowed(Borrowed && b): pool(b.pool), id(b.id), val(b.val) { b.val = nullptr; }
Да, я полагал, что эта разблокировка семантически неверна. Семантика перемещения должна быть надежным способом сделать это, поскольку она подчеркивает то, чего вы действительно хотите достичь.
Я думаю, здесь дело не в компиляторе, а в эффективности. Вы, вероятно, наблюдаете состояние гонки. Попробуйте улучшить отладку здесь с помощью выделенных журналов перед каждой блокировкой (функция идентификатора потока/блокировки f.i.), а также вывести состояние условной переменной. Ложные пробуждения также могут быть проблемой.