Реализация многопоточности в C# (обзор кода)

Привет.

Я пытаюсь реализовать в приложении какой-то многопоточный код. Цель этого кода - проверить элементы, которые ему предоставляет база данных. Проверка может занять довольно много времени (от нескольких сотен мс до нескольких секунд), поэтому этот процесс необходимо разделить на отдельный поток для каждого элемента.

Вначале база данных может выдавать 20 или 30 элементов в секунду, но эта цифра начинает быстро сокращаться, в конечном итоге достигая примерно 65 000 элементов в течение 24 часов, после чего приложение закрывается.

Я бы хотел, чтобы кто-нибудь более знающий мог взглянуть на мой код и посмотреть, есть ли какие-нибудь очевидные проблемы. Никто из тех, с кем я работаю, не знает многопоточности, так что я действительно сам по себе, в этом.

Вот код. Это довольно долго, но должно быть довольно ясно. Дайте мне знать, если у вас есть какие-либо отзывы или советы. Спасибо!

public class ItemValidationService
{
    /// <summary>
    /// The object to lock on in this class, for multithreading purposes.
    /// </summary>
    private static object locker = new object();

    /// <summary>Items that have been validated.</summary>
    private HashSet<int> validatedItems;

    /// <summary>Items that are currently being validated.</summary>
    private HashSet<int> validatingItems;

    /// <summary>Remove an item from the index if its links are bad.</summary>
    /// <param name = "id">The ID of the item.</param>
    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if
            (
                !this.validatedItems.Contains(id) &&
                !this.validatingItems.Contains(id)
            ){
                ThreadPool.QueueUserWorkItem(sender =>
                {
                    this.Validate(id);
                });
            }
        }

    } // method

    private void Validate(int itemId)
    {
        lock (locker)
        {
            this.validatingItems.Add(itemId);
        }

        // *********************************************
        // Time-consuming routine to validate an item...
        // *********************************************

        lock (locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }

    } // method

} // class

@Chris: Почему «шкафчик» статичен, если переменные-члены, защищаемые «шкафчиком», нет? «шкафчик» должен быть переменной-членом, а не переменной класса.

user7116 29.09.2008 00:48
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
6
1
4 114
8

Ответы 8

Пул потоков - удобный выбор, если у вас есть спорадическая обработка легкий вес, не зависящая от времени. Однако я помню, как читал в MSDN, что это не подходит для крупномасштабной обработки такого рода.

Я использовал его для чего-то очень похожего и сожалею об этом. В последующих приложениях я использовал подход, основанный на рабочих потоках, и мне гораздо больше понравился уровень контроля, который у меня есть.

Мой любимый шаблон в модели рабочего потока - это создание главного потока, который содержит очередь элементов задач. Затем создайте группу воркеров, которые извлекают элементы из очереди для обработки. Я использую блокирующую очередь, чтобы, когда в процессе нет элементов, рабочие просто блокируются, пока что-то не будет помещено в очередь. В этой модели главный поток создает рабочие элементы из некоторого источника (базы данных и т. д.), А рабочие потоки потребляют их.

Я был бы обеспокоен производительностью здесь. Вы указали, что база данных может выдавать 20-30 элементов в секунду, а проверка элемента может занять несколько секунд. Это может быть довольно большое количество потоков - используя ваши метрики, в худшем случае 60-90 потоков! Думаю, здесь нужно пересмотреть дизайн. Майкл упомянул хороший образец. Использование очереди действительно помогает держать вещи под контролем и организовывать. Семафор также можно использовать для управления количеством создаваемых потоков - то есть у вас может быть максимальное количество разрешенных потоков, но при меньших нагрузках вам не обязательно создавать максимальное количество, если в конечном итоге работа выполняется меньшим - - т.е. размер вашего собственного пула может быть динамическим с ограничением.

При использовании пула потоков мне также труднее отслеживать выполнение потоков из пула при выполнении ими своей работы. Так что, если только это не выстрелил и забыл, я за более контролируемое исполнение. Я знаю, что вы упомянули, что ваше приложение закрывается после завершения всех 65K элементов. Как вы отслеживаете свои потоки, чтобы определить, завершили ли они свою работу, то есть все рабочие в очереди выполнены. Вы отслеживаете статус всех элементов в HashSets? Я думаю, если вы поставите свои элементы в очередь и ваши собственные рабочие потоки будут использовать эту очередь, вы сможете получить больший контроль. Хотя это может происходить за счет дополнительных накладных расходов с точки зрения сигнализации между потоками, чтобы указать, когда все элементы были поставлены в очередь, что позволяет им выйти.

Я добавляю каждый элемент в ThreadPool, чтобы позволить <i> ему </i> сколько потоков фактически запускаться одновременно. Я не просто создаю новую ветку для каждого элемента и запускаю его.

core 28.09.2008 06:39

Хорошая точка зрения. Я бы просто побеспокоился о максимальном использовании пула потоков, поскольку он используется CLR где-то еще; однако, если это все приложение делает ...

Peter Meyer 28.09.2008 06:53

Единственное, что я хотел бы затронуть, это то, знаете ли вы, может ли QueueUserWorkItem обрабатывать очереди с таким большим количеством элементов? Есть ли ограничения? Что произойдет, если QueueUserWorkItem вернет false?

Peter Meyer 28.09.2008 07:04

Питер: У меня такие же вопросы.

core 28.09.2008 12:29

Будьте осторожны, QueueUserWorkItem может потерпеть неудачу

Я поддерживаю идею использования блокирующей очереди и рабочих потоков. Вот реализация блокирующей очереди, которую я использовал в прошлом с хорошими результатами: https://www.codeproject.com/Articles/8018/Bounded-Blocking-Queue-One-Lock

Что входит в вашу логику проверки? Если это в основном связано с процессором, я бы создал не более 1 рабочего потока на процессор / ядро ​​в коробке. Это скажет вам количество процессоров: Environment.ProcessorCount

Если ваша проверка включает ввод-вывод, такой как доступ к файлам или доступ к базе данных, вы можете использовать на несколько потоков больше, чем количество процессоров.

Хорошая ссылка - я использовал ту же реализацию в классическом сценарии с одним производителем и несколькими потребителями. Я также изменил его один раз, превратив его в байтовую очередь (меньше накладных расходов с типами значений упаковки / распаковки) для обработки больших файлов - отлично сработало. Очень солидная реализация.

Peter Meyer 28.09.2008 07:17

Возможная логическая ошибка в коде, опубликованном с вопросом, зависит от того, откуда берется идентификатор элемента в ValidateItem(int id). Почему? Потому что, хотя вы правильно блокируете очереди validatingItems и validatedItems перед постановкой рабочего элемента в очередь, вы не добавляете элемент в очередь validatingItems до тех пор, пока не запустится новый поток. Это означает, что может быть промежуток времени, когда другой поток вызывает ValidateItem(id) с тем же идентификатором (если только он не выполняется в одном основном потоке).

Я бы добавил элемент в очередь validatingItems непосредственно перед постановкой элемента в очередь внутри блокировки.

Обновлено: также QueueUserWorkItem() возвращает логическое значение, поэтому вы должны использовать возвращаемое значение, чтобы убедиться, что элемент был поставлен в очередь, и ЗАТЕМ добавить его в очередь validatingItems.

исправление вашего редактирования, которое повторно вводит ошибку. вам нужно добавить его, затем поставить в очередь, а затем удалить, если постановка в очередь не удалась.

TheSoftwareJedi 01.10.2008 05:08

ThreadPool может быть не оптимальным для того, чтобы сразу в него так много заедать. Вы можете исследовать верхние пределы его возможностей и / или попробовать свои силы.

Кроме того, в вашем коде существует состояние гонки, если вы не ожидаете повторяющихся проверок. Призыв к

this.validatingItems.Add(itemId);

должно происходить в основном потоке (ValidateItem), а не в потоке пула потоков (метод Validate). Этот вызов должен происходить за строку до постановки рабочего элемента в очередь в пул.

Худшая ошибка обнаруживается, если не проверять возврат QueueUserWorkItem. Очередь может дать сбой, и почему она не генерирует исключение, остается загадкой для всех нас. Если он возвращает false, вам нужно удалить элемент, который был добавлен в список validatingItems, и обработать ошибку (возможно, выбросить исключение).

Вы также можете попробовать использовать CCR - Concurrency and Coordination Runtime. Он похоронен внутри Microsoft Robotics Studio, но предоставляет отличный API для такого рода вещей.

Вам просто нужно создать «Порт» (по сути, очередь), подключить приемник (метод, который вызывается, когда что-то отправлено на него), а затем разместить в нем рабочие элементы. CCR обрабатывает очередь и рабочий поток, чтобы запустить ее.

Вот видео о CCR на Channel9.

Он очень высокопроизводительный и даже используется не для робототехники (Myspace.com использует его негласно для своей сети доставки контента).

Я бы порекомендовал заглянуть в MSDN: библиотека параллельных задач - DataFlow. Вы можете найти примеры реализации Producer-Consumer, в вашем случае это будут элементы базы данных producing для проверки, а процедура проверки станет consumer.

Также рекомендуется использовать ConcurrentDictionary<TKey, TValue> в качестве «параллельного» хеш-набора, в котором вы просто заполняете ключи без значений :). Вы потенциально можете сделать свой код lock-free.

Другие вопросы по теме