Я хочу написать простую очередь производитель-потребитель без использования встроенного System.Collections.Concurrent.BlockingCollection. Вот быстрая попытка, которая, «кажется», работает. Что-то не так с потоками, условиями гонки, взаимоблокировками и т. д.?
class ProducerConsumerQueue<T>
{
Queue<T> Queue = new Queue<T>();
ManualResetEvent Event = new ManualResetEvent(false);
object Lock = new object();
public void Add(T t)
{
lock (Lock)
{
Queue.Enqueue(t);
}
Event.Set();
}
public bool TryTake(out T t, int timeout)
{
if (Event.WaitOne(timeout))
{
lock (Lock)
{
if (Queue.Count > 0)
{
t = Queue.Dequeue();
if (Queue.Count == 0) Event.Reset();
return true;
}
}
}
t = default(T);
return false;
}
}
Кстати. мне нужны только два метода: Add и TryTake, мне не нужны IEnumerable и т. д.
@ibubi Мне нужен тайм-аут при удалении элементов из очереди, у ConcurrentQueue этого нет.
Это лучше подходит для codereview.stackexchange.com
Вам нужна блокирующая очередь с методом TryTake, в котором можно передать тайм-аут?
@ EmrahSüngü Да
@kaalus, тогда как насчет того, чтобы заменить Queue<T> Queue = new Queue<T>(); на ConcurrentQueue, чтобы избавиться от Lock, а затем вместо ManualResetEvent в Semaphoreslim. В остальном похоже, что у вас все работает.
@kaalus, это всего лишь догадка, а вы как-то пытаетесь воспроизвести фичу канала golang?
@ EmrahSüngü Нет, извините, я не пытаюсь воспроизвести что-либо в голанге.





Я думаю, что использование как lock, так и ManualResetEvent излишне. Я предлагаю вам узнать больше о ManualResetEvent о том, как входить и выходить из синхронизированных областей в вашем коде (вы также можете взглянуть на другие механизмы синхронизации, доступные в System.Threading).
Если это не только для упражнений, вы также можете взглянуть на NetMQ.
Надеюсь, поможет!
Хорошо, хотя, если OP не собирается использовать первоклассный BlockingCollection, он, вероятно, не будет использовать стороннее решение, такое как NetMQ.
Согласно моему комментарию в вопросе,
Вот предлагаемое мной решение.
public class BlockingQueue<T>
{
// In order to get rid of Lock object
// Any thread should be able to add items to the queue
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
// Only one thread is able to consume from queue
// You can fine tune this to your interest
private readonly SemaphoreSlim _slim = new SemaphoreSlim(1,1);
public void Add(T item) {
_queue.Enqueue(item);
}
public bool TryTake(out T item, TimeSpan timeout) {
if (_slim.Wait(timeout)){
return _queue.TryDequeue(out item);
}
item = default(T);
return false;
}
}
Что касается отрицательного голоса, я буду более чем счастлив исправить любые ошибки, если вы скажете мне о проблеме :) Большое спасибо
Microsoft недавно отказалась от System.Threading.Channels, который предназначен для предоставления оптимизированных API-интерфейсов производителя / потребителя, которые в данном случае могут хорошо подойти. Он охватывает неограниченные и ограниченные сценарии и включает в себя один и несколько сценариев чтения / записи. API довольно прост и интуитивно понятен в использовании; Единственное небольшое предостережение - это то, что он использует API-интерфейс, ориентированный на async (для потребителя и - в случае ограниченных каналов - для производителя).
Дело в том, что код, который вы не пишете, как правило, имеет меньше проблем, особенно если он был написан командой, обладающей опытом и интересом к конкретным задачам, на которые нацелены.
Однако: вы можете делать все в своем текущем коде без необходимости использования ManualResetEvent - lock в C# - это просто оболочка для частей простейшийMonitor, но Monitor также обеспечивает функциональность ожидания / импульса:
class ProducerConsumerQueue<T>
{
private readonly Queue<T> Queue = new Queue<T>();
public void Add(T t)
{
lock (Queue)
{
Queue.Enqueue(t);
if (Queue.Count == 1)
{
// wake up one sleeper
Monitor.Pulse(Queue);
}
}
}
public bool TryTake(out T t, int millisecondsTimeout)
{
lock (Queue)
{
if (Queue.Count == 0)
{
// try and wait for arrival
Monitor.Wait(Queue, millisecondsTimeout);
}
if (Queue.Count != 0)
{
t = Queue.Dequeue();
return true;
}
}
t = default(T);
return false;
}
}
Спасибо, Марк. Я анализировал ваш код и не совсем понял одну вещь. AFAIK Monitor.Pulse не имеет внутреннего состояния, поэтому, если на мониторе никто не ждет, импульс не разбудит читателя, и первый элемент в очереди никогда не будет получен. На самом деле другое дело, если кто-то ожидает на мониторе в TryTake, он держит блокировку очереди, поэтому никто не может пульсировать монитор в Add, который находится в той же блокировке. Пожалуйста, дайте мне знать, если я ошибаюсь.
@kaalus, на Мониторе тайм-аут. если никто не пульсирует, блокировка не сработает, тогда продолжайте
@kaalus, когда вы вызываете Wait, он помещает текущий поток в очередь ожидания для этого монитора, а релизы блокирует на время ожидания; когда Wait завершается (либо по импульсу, либо по таймауту), поток повторно приобретает блокирует. Так; Wait - это временная сдача блокировки с механизмом импульса / тайм-аута.
почему бы не использовать более совершенные реализации (более высокую производительность без взаимоблокировок), например
ConcurrentQueue?