Привет.
Я пытаюсь реализовать в приложении какой-то многопоточный код. Цель этого кода - проверить элементы, которые ему предоставляет база данных. Проверка может занять довольно много времени (от нескольких сотен мс до нескольких секунд), поэтому этот процесс необходимо разделить на отдельный поток для каждого элемента.
Вначале база данных может выдавать 20 или 30 элементов в секунду, но эта цифра начинает быстро сокращаться, в конечном итоге достигая примерно 65 000 элементов в течение 24 часов, после чего приложение закрывается.
Я бы хотел, чтобы кто-нибудь более знающий мог взглянуть на мой код и посмотреть, есть ли какие-нибудь очевидные проблемы. Никто из тех, с кем я работаю, не знает многопоточности, так что я действительно сам по себе, в этом.
Вот код. Это довольно долго, но должно быть довольно ясно. Дайте мне знать, если у вас есть какие-либо отзывы или советы. Спасибо!
public class ItemValidationService
{
/// <summary>
/// The object to lock on in this class, for multithreading purposes.
/// </summary>
private static object locker = new object();
/// <summary>Items that have been validated.</summary>
private HashSet<int> validatedItems;
/// <summary>Items that are currently being validated.</summary>
private HashSet<int> validatingItems;
/// <summary>Remove an item from the index if its links are bad.</summary>
/// <param name = "id">The ID of the item.</param>
public void ValidateItem(int id)
{
lock (locker)
{
if
(
!this.validatedItems.Contains(id) &&
!this.validatingItems.Contains(id)
){
ThreadPool.QueueUserWorkItem(sender =>
{
this.Validate(id);
});
}
}
} // method
private void Validate(int itemId)
{
lock (locker)
{
this.validatingItems.Add(itemId);
}
// *********************************************
// Time-consuming routine to validate an item...
// *********************************************
lock (locker)
{
this.validatingItems.Remove(itemId);
this.validatedItems.Add(itemId);
}
} // method
} // class





Пул потоков - удобный выбор, если у вас есть спорадическая обработка легкий вес, не зависящая от времени. Однако я помню, как читал в MSDN, что это не подходит для крупномасштабной обработки такого рода.
Я использовал его для чего-то очень похожего и сожалею об этом. В последующих приложениях я использовал подход, основанный на рабочих потоках, и мне гораздо больше понравился уровень контроля, который у меня есть.
Мой любимый шаблон в модели рабочего потока - это создание главного потока, который содержит очередь элементов задач. Затем создайте группу воркеров, которые извлекают элементы из очереди для обработки. Я использую блокирующую очередь, чтобы, когда в процессе нет элементов, рабочие просто блокируются, пока что-то не будет помещено в очередь. В этой модели главный поток создает рабочие элементы из некоторого источника (базы данных и т. д.), А рабочие потоки потребляют их.
Я был бы обеспокоен производительностью здесь. Вы указали, что база данных может выдавать 20-30 элементов в секунду, а проверка элемента может занять несколько секунд. Это может быть довольно большое количество потоков - используя ваши метрики, в худшем случае 60-90 потоков! Думаю, здесь нужно пересмотреть дизайн. Майкл упомянул хороший образец. Использование очереди действительно помогает держать вещи под контролем и организовывать. Семафор также можно использовать для управления количеством создаваемых потоков - то есть у вас может быть максимальное количество разрешенных потоков, но при меньших нагрузках вам не обязательно создавать максимальное количество, если в конечном итоге работа выполняется меньшим - - т.е. размер вашего собственного пула может быть динамическим с ограничением.
При использовании пула потоков мне также труднее отслеживать выполнение потоков из пула при выполнении ими своей работы. Так что, если только это не выстрелил и забыл, я за более контролируемое исполнение. Я знаю, что вы упомянули, что ваше приложение закрывается после завершения всех 65K элементов. Как вы отслеживаете свои потоки, чтобы определить, завершили ли они свою работу, то есть все рабочие в очереди выполнены. Вы отслеживаете статус всех элементов в HashSets? Я думаю, если вы поставите свои элементы в очередь и ваши собственные рабочие потоки будут использовать эту очередь, вы сможете получить больший контроль. Хотя это может происходить за счет дополнительных накладных расходов с точки зрения сигнализации между потоками, чтобы указать, когда все элементы были поставлены в очередь, что позволяет им выйти.
Я добавляю каждый элемент в ThreadPool, чтобы позволить <i> ему </i> сколько потоков фактически запускаться одновременно. Я не просто создаю новую ветку для каждого элемента и запускаю его.
Хорошая точка зрения. Я бы просто побеспокоился о максимальном использовании пула потоков, поскольку он используется CLR где-то еще; однако, если это все приложение делает ...
Единственное, что я хотел бы затронуть, это то, знаете ли вы, может ли QueueUserWorkItem обрабатывать очереди с таким большим количеством элементов? Есть ли ограничения? Что произойдет, если QueueUserWorkItem вернет false?
Питер: У меня такие же вопросы.
Будьте осторожны, QueueUserWorkItem может потерпеть неудачу
Я поддерживаю идею использования блокирующей очереди и рабочих потоков. Вот реализация блокирующей очереди, которую я использовал в прошлом с хорошими результатами: https://www.codeproject.com/Articles/8018/Bounded-Blocking-Queue-One-Lock
Что входит в вашу логику проверки? Если это в основном связано с процессором, я бы создал не более 1 рабочего потока на процессор / ядро в коробке. Это скажет вам количество процессоров:
Environment.ProcessorCount
Если ваша проверка включает ввод-вывод, такой как доступ к файлам или доступ к базе данных, вы можете использовать на несколько потоков больше, чем количество процессоров.
Хорошая ссылка - я использовал ту же реализацию в классическом сценарии с одним производителем и несколькими потребителями. Я также изменил его один раз, превратив его в байтовую очередь (меньше накладных расходов с типами значений упаковки / распаковки) для обработки больших файлов - отлично сработало. Очень солидная реализация.
Возможная логическая ошибка в коде, опубликованном с вопросом, зависит от того, откуда берется идентификатор элемента в ValidateItem(int id). Почему? Потому что, хотя вы правильно блокируете очереди validatingItems и validatedItems перед постановкой рабочего элемента в очередь, вы не добавляете элемент в очередь validatingItems до тех пор, пока не запустится новый поток. Это означает, что может быть промежуток времени, когда другой поток вызывает ValidateItem(id) с тем же идентификатором (если только он не выполняется в одном основном потоке).
Я бы добавил элемент в очередь validatingItems непосредственно перед постановкой элемента в очередь внутри блокировки.
Обновлено: также QueueUserWorkItem() возвращает логическое значение, поэтому вы должны использовать возвращаемое значение, чтобы убедиться, что элемент был поставлен в очередь, и ЗАТЕМ добавить его в очередь validatingItems.
исправление вашего редактирования, которое повторно вводит ошибку. вам нужно добавить его, затем поставить в очередь, а затем удалить, если постановка в очередь не удалась.
ThreadPool может быть не оптимальным для того, чтобы сразу в него так много заедать. Вы можете исследовать верхние пределы его возможностей и / или попробовать свои силы.
Кроме того, в вашем коде существует состояние гонки, если вы не ожидаете повторяющихся проверок. Призыв к
this.validatingItems.Add(itemId);
должно происходить в основном потоке (ValidateItem), а не в потоке пула потоков (метод Validate). Этот вызов должен происходить за строку до постановки рабочего элемента в очередь в пул.
Худшая ошибка обнаруживается, если не проверять возврат QueueUserWorkItem. Очередь может дать сбой, и почему она не генерирует исключение, остается загадкой для всех нас. Если он возвращает false, вам нужно удалить элемент, который был добавлен в список validatingItems, и обработать ошибку (возможно, выбросить исключение).
Вы также можете попробовать использовать CCR - Concurrency and Coordination Runtime. Он похоронен внутри Microsoft Robotics Studio, но предоставляет отличный API для такого рода вещей.
Вам просто нужно создать «Порт» (по сути, очередь), подключить приемник (метод, который вызывается, когда что-то отправлено на него), а затем разместить в нем рабочие элементы. CCR обрабатывает очередь и рабочий поток, чтобы запустить ее.
Он очень высокопроизводительный и даже используется не для робототехники (Myspace.com использует его негласно для своей сети доставки контента).
Я бы порекомендовал заглянуть в MSDN: библиотека параллельных задач - DataFlow. Вы можете найти примеры реализации Producer-Consumer, в вашем случае это будут элементы базы данных producing для проверки, а процедура проверки станет consumer.
Также рекомендуется использовать ConcurrentDictionary<TKey, TValue> в качестве «параллельного» хеш-набора, в котором вы просто заполняете ключи без значений :). Вы потенциально можете сделать свой код lock-free.
@Chris: Почему «шкафчик» статичен, если переменные-члены, защищаемые «шкафчиком», нет? «шкафчик» должен быть переменной-членом, а не переменной класса.