У меня есть реализация пула с одним элементом:
public class OneItemPool<T>
{
private T? _item;
private object _lockForChangeValue = new();
private object _lockForPostItem = new();
private object _lockForGetItem = new();
public void PostItem(T item)
{
lock (_lockForPostItem)
{
// waiting for _item to be taken
while (WeHaveItem())
{
}
lock (_lockForChangeValue)
{
_item = item;
}
}
}
public T GetItem()
{
lock (_lockForGetItem)
{
// waiting for _item to appear
while (!WeHaveItem())
{
}
lock (_lockForChangeValue)
{
var item = _item;
_item = default(T);
return item;
}
}
}
bool WeHaveItem()
{
return !object.ReferenceEquals(_item, default(T));
}
}
Как видите, у меня нет блокировки внутри bool WeHaveItem()
, я знаю, что object.ReferenceEquals
не является атомарной операцией.
Я знаю, что это может случиться:
WeHaveItem()
и видит, что _item
имеет значение null_item
._item
имеет значение null, хотя это больше не соответствует действительности.Предположим, что _item
не равно нулю и поток A вызывает PostItem
, затем проверьте WeHaveItem
и подождите, пока он станет нулевым
в то время как поток B вызывает GetItem
и меняет _value
на ноль
Поток А получит старый результат _item != null
но это вообще не должно влиять на поток A, потому что следующей операцией потока A будет перепроверка WeHaveItem
Я думаю, проблем не будет. Но реальные испытания показывают, что это не так.
Я настроил свой тест следующим образом
var pool = new OneItemPool<string>();
while (true)
{
var threads = new List<Thread>();
for (int i = 0; i < 3; i++)
{
var k = i;
var thr1 = new Thread(() =>
{
pool.PostItem(k.ToString());
pool.GetItem();
});
threads.Add(thr1);
}
foreach (var thread in threads)
{
thread.Start();
}
foreach (var thread in threads)
{
thread.Join();
}
}
Один PostItem
за один GetItem
, но тест заходит в тупик внутри PostItem
, а значение _item
не равно нулю.
Это показывает, что два GetItem
успешно выполняются для одного PostItem
.
Если я добавлю блокировку внутри WeHaveItem
, взаимоблокировка исчезнет.
Я думаю об этом уже два дня и не представляю, как такое возможно.
Вы уверены, что видите тупик, а не просто бесконечную задержку?
он бесконечно вращается «пока (WeHaveItem())» внутри PostItem
Я не понимаю, как можно получить тупик в приведенном выше коде, но получить бесконечную задержку, безусловно, возможно.
Причина в том, что блокировки обеспечивают две вещи: взаимное исключение и видимость памяти. Взаимное исключение означает, что только один поток может заблокировать каждый объект одновременно (например, _lockForGetItem
). Видимость памяти означает, что когда поток A записывает значение, удерживая блокировку объекта, изменение будет видно потоку B, когда поток B получит блокировку.
Проблема в том, что вам нужно заблокировать один и тот же объект в обоих потоках, чтобы получить гарантию синхронизации. Когда поток A обновляет _item
, удерживая _lockForPostItem
и _lockForChangeValue
, нет никакой гарантии, что поток B когда-либо увидит обновление, поскольку WeHaveItem()
не блокирует ни один из этих объектов.
Если вы столкнулись с этой проблемой, это также объясняет, почему это помогает заблокировать _lockForPostItem
или _lockForChangeValue
внутри WeHaveItem()
.
Альтернативное решение — объявить _item
изменчивым, что гарантирует, что изменения _item
будут видны во всех потоках.
Чтобы иметь возможность объявить _item
volatile
, T
должно иметь общее ограничение class
.
Это не прямой ответ на ваш вопрос. Это всего лишь демонстрация того, что вы можете довольно легко и без особых усилий реализовать компонент OneItemPool<T>
, используя инструменты .NET более высокого уровня, такие как класс BlockingCollection<T>:
public class OneItemPool<T>
{
private readonly BlockingCollection<T> _queue = new(1);
public void PostItem(T item) => _queue.Add(item);
public T GetItem() => _queue.Take();
}
Он не только предельно прост, но и имеет лучшее поведение ожидания. Он не сжигает ядро ЦП во время ожидания добавления или удаления элемента путем ожидания вращения. Вместо этого он блокирует текущий поток, не потребляя ресурсы ЦП во время ожидания.