Простая коллекция производителя-потребителя без BlockingCollection

Я хочу написать простую очередь производитель-потребитель без использования встроенного System.Collections.Concurrent.BlockingCollection. Вот быстрая попытка, которая, «кажется», работает. Что-то не так с потоками, условиями гонки, взаимоблокировками и т. д.?

class ProducerConsumerQueue<T>
{
    Queue<T> Queue = new Queue<T>();
    ManualResetEvent Event = new ManualResetEvent(false);
    object Lock = new object();

    public void Add(T t)
    {
        lock (Lock)
        {
            Queue.Enqueue(t);
        }
        Event.Set();
    }

    public bool TryTake(out T t, int timeout)
    {
        if (Event.WaitOne(timeout))
        {
            lock (Lock)
            {
                if (Queue.Count > 0)
                {
                    t = Queue.Dequeue();
                    if (Queue.Count == 0) Event.Reset();
                    return true;
                }
            }
        }
        t = default(T);
        return false;
    }
}

Кстати. мне нужны только два метода: Add и TryTake, мне не нужны IEnumerable и т. д.

почему бы не использовать более совершенные реализации (более высокую производительность без взаимоблокировок), например ConcurrentQueue?

ibubi 17.12.2018 09:22

@ibubi Мне нужен тайм-аут при удалении элементов из очереди, у ConcurrentQueue этого нет.

kaalus 17.12.2018 09:30

Это лучше подходит для codereview.stackexchange.com

FCin 17.12.2018 09:32

Вам нужна блокирующая очередь с методом TryTake, в котором можно передать тайм-аут?

Hasan Emrah Süngü 17.12.2018 09:34

@ EmrahSüngü Да

kaalus 17.12.2018 09:37

@kaalus, тогда как насчет того, чтобы заменить Queue<T> Queue = new Queue<T>(); на ConcurrentQueue, чтобы избавиться от Lock, а затем вместо ManualResetEvent в Semaphoreslim. В остальном похоже, что у вас все работает.

Hasan Emrah Süngü 17.12.2018 09:41

@kaalus, это всего лишь догадка, а вы как-то пытаетесь воспроизвести фичу канала golang?

Hasan Emrah Süngü 17.12.2018 09:43

@ EmrahSüngü Нет, извините, я не пытаюсь воспроизвести что-либо в голанге.

kaalus 17.12.2018 09:48
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
8
223
3

Ответы 3

Я думаю, что использование как lock, так и ManualResetEvent излишне. Я предлагаю вам узнать больше о ManualResetEvent о том, как входить и выходить из синхронизированных областей в вашем коде (вы также можете взглянуть на другие механизмы синхронизации, доступные в System.Threading).

Если это не только для упражнений, вы также можете взглянуть на NetMQ.

Надеюсь, поможет!

Хорошо, хотя, если OP не собирается использовать первоклассный BlockingCollection, он, вероятно, не будет использовать стороннее решение, такое как NetMQ.

MickyD 17.12.2018 09:36

Согласно моему комментарию в вопросе,

Вот предлагаемое мной решение.

public class BlockingQueue<T>
{
    // In order to get rid of Lock object
    // Any thread should be able to add items to the queue
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    // Only one thread is able to consume from queue
    // You can fine tune this to your interest
    private readonly SemaphoreSlim _slim = new SemaphoreSlim(1,1);

    public void Add(T item) {
        _queue.Enqueue(item);
    }

    public bool TryTake(out T item, TimeSpan timeout) {
        if (_slim.Wait(timeout)){
            return _queue.TryDequeue(out item);
        }
        item = default(T);
        return false;
    }
}

Что касается отрицательного голоса, я буду более чем счастлив исправить любые ошибки, если вы скажете мне о проблеме :) Большое спасибо

Hasan Emrah Süngü 18.12.2018 01:47

Microsoft недавно отказалась от System.Threading.Channels, который предназначен для предоставления оптимизированных API-интерфейсов производителя / потребителя, которые в данном случае могут хорошо подойти. Он охватывает неограниченные и ограниченные сценарии и включает в себя один и несколько сценариев чтения / записи. API довольно прост и интуитивно понятен в использовании; Единственное небольшое предостережение - это то, что он использует API-интерфейс, ориентированный на async (для потребителя и - в случае ограниченных каналов - для производителя).

Дело в том, что код, который вы не пишете, как правило, имеет меньше проблем, особенно если он был написан командой, обладающей опытом и интересом к конкретным задачам, на которые нацелены.


Однако: вы можете делать все в своем текущем коде без необходимости использования ManualResetEvent - lock в C# - это просто оболочка для частей простейшийMonitor, но Monitor также обеспечивает функциональность ожидания / импульса:

class ProducerConsumerQueue<T>
{
    private readonly Queue<T> Queue = new Queue<T>();

    public void Add(T t)
    {
        lock (Queue)
        {
            Queue.Enqueue(t);
            if (Queue.Count == 1)
            {
                // wake up one sleeper
                Monitor.Pulse(Queue);
            }
        }
    }

    public bool TryTake(out T t, int millisecondsTimeout)
    {
        lock (Queue)
        {
            if (Queue.Count == 0)
            {
                // try and wait for arrival
                Monitor.Wait(Queue, millisecondsTimeout);
            }
            if (Queue.Count != 0)
            {
                t = Queue.Dequeue();
                return true;
            }
        }
        t = default(T);
        return false;
    }
}

Спасибо, Марк. Я анализировал ваш код и не совсем понял одну вещь. AFAIK Monitor.Pulse не имеет внутреннего состояния, поэтому, если на мониторе никто не ждет, импульс не разбудит читателя, и первый элемент в очереди никогда не будет получен. На самом деле другое дело, если кто-то ожидает на мониторе в TryTake, он держит блокировку очереди, поэтому никто не может пульсировать монитор в Add, который находится в той же блокировке. Пожалуйста, дайте мне знать, если я ошибаюсь.

kaalus 18.12.2018 01:20

@kaalus, на Мониторе тайм-аут. если никто не пульсирует, блокировка не сработает, тогда продолжайте

Hasan Emrah Süngü 18.12.2018 02:22

@kaalus, когда вы вызываете Wait, он помещает текущий поток в очередь ожидания для этого монитора, а релизы блокирует на время ожидания; когда Wait завершается (либо по импульсу, либо по таймауту), поток повторно приобретает блокирует. Так; Wait - это временная сдача блокировки с механизмом импульса / тайм-аута.

Marc Gravell 18.12.2018 11:26

Другие вопросы по теме