Я пытаюсь написать метод, который читает каждую страницу PDF-файла, но поскольку чтение каждой страницы через API занимает значительное количество времени, а я просматриваю PDF-файлы длиной в сотни страниц, я хочу поставить чтение в очередь каждой страницы асинхронно, а затем возвращать результаты, когда они будут готовы, поэтому несколько страниц считываются одновременно.
Я использую Task.Run для постановки задачи в очередь и ожидаю, что журнал отладки распечатает страницы не по порядку, но они выполняются только по порядку, поэтому я думаю, что они запускаются синхронно. Есть идеи?
var tasks = new List<Task>();
foreach (Page page in _pdfDoc.GetPages()) {
var task = Task.Run(() => {
//tried adding await Task.Yield() here, doesn't work
Debug.WriteLine("searching page " + page.Number);
if (page.Text.Contains(query)) {
pagesWithQuery.Add(page.Number);
}
howManySearched += 1;
Dispatcher.UIThread.InvokeAsync(() => {
searchProgress.Value = howManySearched;
});
return Task.CompletedTask;
});
tasks.Add(task);
// await task; <== does nothing??
}
// await Task.WhenAll(tasks); <== also nothing
см. редактирование, добавление строки не имеет эффекта
Просто чтобы подтвердить Task.WhenAll(tasks); раскомментировано, но ожидание в цикле остается закомментированным, не работает?
Кевин: нет, это не так. КД: Я знаю, что поведение библиотеки PDF является синхронным, поэтому я не знаю, имеет ли значение конкретный материал PDF, ради вопроса можно предположить, что его можно заменить синхронным вызовом чего угодно; Я также переписываю ранее написанное веб-приложение, и именно так я реализовал его раньше.
Вы делали какое-нибудь профилирование? Вероятно, .Contains работает очень быстро, поэтому проблема с производительностью, вероятно, связана с кодом анализа PDF-файлов библиотеки. Но это в GetPages() или в page.Text? В первом случае вы, вероятно, не получите многого, а просто сделаете свой код более сложным и подверженным ошибкам.
@gumydev1 Task.Run не работает синхронно. Это факт, как солнце, восходящее с востока. Это не обсуждается. Код вопроса, однако, слишком запутан и страдает от условий гонки (howManySearched), захвата лямбда-переменных (searchProgress.Value = HowManySearched;), внеочередных обновлений, .UIThread.InvokeAsync и перегрузки ЦП большим количеством потоков, чем имеется ядер для их обработки. Вы можете удалить большую часть этого кода, используя await Parallel.ForEachAsync(pages,(page,cancellationToken)=>....
Отчет о прогрессе следует осуществлять с помощью Progress<T>. Действие прогресса — это действие, которое должно увеличивать счетчики и печатать прогресс. Если вы хотите отслеживать страницы, вы можете добавить их в ConcurrentQueue или ConcurrentDictionary. Если вы хотите, чтобы страницы каким-либо образом обрабатывались другой задачей или потоком пользовательского интерфейса, вы можете использовать Channel. Вы также можете использовать Channel как упорядоченную и буферизованную альтернативу Progress<>.
Привет, спасибо за ваш ответ; просто чтобы уточнить: я использую библиотеку Avalonia для прогресса, и в ней есть элемент ProgressBar, который, как я предполагал, не имеет другого способа обновления, индикатор выполнения в настоящее время работает так, как ожидалось, но следует ли мне по-прежнему использовать класс Progress?
@gumydev1 использование Progress<T> делает ваш код красивее и осмысленнее. Производительность такая же, как и у Dispatcher.UIThread.InvokeAsync. Если вы сообщаете слишком часто, они оба будут вести себя одинаково плохо (пользовательский интерфейс зависает), поскольку цикл сообщений пользовательского интерфейса переполнен сообщениями.





Я использую
Task.Run, чтобы поставить задачу в очередь, и ожидаю, что журнал отладки распечатает страницы не по порядку, но они выполняются только по порядку, поэтому я думаю, что они запускаются синхронно.
У вас недостаточно данных, чтобы поддержать это предположение. Вы регистрируете только начало каждого Task:
Task task = Task.Run(() =>
{
Debug.WriteLine("searching page " + page.Number);
...но вы понятия не имеете, когда Task завершится. Вы можете получить лучшее представление об уровне параллелизма, достигнутом вашим кодом, выполнив что-то вроде этого:
object locker = new();
int concurrencyCounter = 0;
int maxConcurrency = 0;
Task task = Task.Run(() =>
{
int concurrency = Interlocked.Increment(ref concurrencyCounter);
lock (locker) maxConcurrency = Math.Max(maxConcurrency, concurrency);
try
{
Debug.WriteLine("searching page " + page.Number);
// Do work with the PDF page...
} finally { Interlocked.Decrement(ref concurrencyCounter); }
});
//...
await Task.WhenAll(tasks);
Debug.WriteLine($"Maximum concurrency: {maxConcurrency}");
Кстати, в вашем коде масса идиоматичности. Вы не используете преимущества ни класса Parallel , ни оператора AsParallel PLINQ, ни класса Progress<T>, и я подозреваю, что существуют также условия гонки вокруг использования неопределенных переменных howManySearched и pagesWithQuery.
неидиоматичность, конечно?
@Charlieface Я имею в виду, что они делают все по-своему, вместо того, чтобы использовать устоявшиеся инструменты и шаблоны.
@Theodor Zoulias, эй, спасибо за это, однако я попробовал это, и максимальный параллелизм оказался равен 1, как я и подозревал. Я также попробовал использовать Parallel.ForEachAsync, результат тот же. Я не знаю, будет ли это проблемой, но этот код запускается внутри собственного асинхронного режима Task.Run, чтобы не засорять пользовательский интерфейс (пользовательский интерфейс работает и работает плавно). есть ли что-нибудь, что заставило бы каждый Task.Run в цикле ждать завершения предыдущего? Я никогда раньше не программировал на C#, поэтому не знаю соглашений.
@gumydev1 это неожиданно. Единственное, что может привести к тому, что максимальный параллелизм останется равным 1, — это сильная насыщенность ThreadPool. Мало того, что ThreadPool должен отсутствовать среди доступных рабочих процессов, но должна быть какая-то другая параллельная операция, которая крадет новые потоки, которые ThreadPool внедряет, когда он насыщен (это один новый поток в секунду). Вы можете попробовать заранее увеличить количество потоков, которые ThreadPool создает немедленно по требованию, например, с помощью ThreadPool.SetMinThreads(100, 100);, и посмотреть, будет ли это иметь какое-то значение.
@gumydev1 также существует вероятность того, что работа внутри Task.Run настолько незначительна, что action завершается почти мгновенно. Вы можете попробовать добавить Thread.Sleep(100); где-нибудь внутри action и посмотреть, повлияет ли это на максимальный параллелизм.
@gumydev1, вы также можете попробовать запустить приложение без подключенного отладчика. Иногда отладчик вмешивается нежелательным образом, и вам приходится отлаживать отладчик вместо отладки приложения.
@TheodorZoulias нашел проблему. библиотека, которую я использую (pdfpig), имеет блокировку доступа к файлу, которая блокирует его чтение другими потоками до завершения текущего потока. так что это не моя проблема с кодом. спасибо за помощь, хотя
@gumydev1 да, что-то подобное было возможно. Но это по-прежнему не объясняет, почему внутри Task.Run нет параллелизма.
В Task.Run сомнений нет. Однако в этом коде есть много проблем, которые создают неправильное впечатление, некоторые из них вызваны ненужной сложностью:
howManySearched означают, что оно неправильно увеличено.() => {searchProgress.Value = howManySearched;}, что означает, что отображается активное значение переменной на момент выполнения, а не значение при вызове InvokeAsync.Все это может создать впечатление, что howManySearched прыгает от 0 до максимума просто потому, что это значение, когда пользовательский интерфейс наконец-то доходит до его отображения.
.NET предлагает множество способов одновременной обработки данных, причем гораздо проще.
ConcurrentDictionary<T>.Channel<T>.Код вопроса можно заменить на:
record SearchProgress(int Number,bool Finished);
int _searched;
ConcurrentDictionary<int,Page> _foundPages=new ConcurrentDictionary<int,Page>();
...
async Task FindInDocument(string query)
{
var pages=_pdfDoc.GetPages();
//Set up the UI
ResetSearchProgress(pages.Count);
var progress=new Progress<string>(ReportProgress);
//Perform the search
await Parallel.ForEachAsync(pages,(page,ct)=>
{
progress.Report(new SearchProgress(page.Number,false);
if (page.Text.Contains(query))
{
// Assume there are no duplicate page numbers
_foundPages.TryAdd(page.Number,page);
}
progress.Report(new SearchProgress(page.Number,true);
});
//We're back on the UI thread
FinishSearchProgress();
}
Методы, сбрасывающие счетчики, индикатор выполнения и словарь, были выделены в отдельные методы, чтобы немного навести порядок и удалить любые зависимости от элементов пользовательского интерфейса из кода поиска.
void ResetSearchProgress(int pageCount)
{
_searched=0;
_foundPages.Clear();
searchProgress.Value=0;
searchProgress.Maximun=pageCount;
}
void FinishSearchProgress()
{
searchProgress.Value=0;
txtStatus.Text=$"Found {_foundPages.Count} pages";
}
Метод ReportProgress обновляет найденное количество и отображает текст состояния. Предполагая, что searchProgress является searchProgress, searchProgress.Increment() используется вместо прямой установки значения. _searched на самом деле не нужен, по крайней мере, для индикатора выполнения.
void ReportProgress(SearchProgress p)
{
_searched++;
searchProgress.Increment(1);
txtStatus.Text=p.Finished?$"Searched Page {p.Number}"
:$"Searching Page {p.Number}";
}
Тип SearchProgress может быть настолько сложным, насколько это необходимо, например, включая полное сообщение, время, Enum для статуса вместо bool Finished и т. д.
«Возможно, наводнение процессора большим количеством потоков, чем ядер». -- Это правда, но это не должно иметь большого значения. Известно, что все популярные операционные системы хорошо справляются с этой ситуацией, справедливо и эффективно назначая кванты времени ЦП каждому потоку. Я не говорю, что это идеально. Только это не имеет большого значения.
Использовать 70 серверов вместо 30 для одной и той же нагрузки — это большое дело. Или использовать более крупную облачную виртуальную машину вместо той, которая вам действительно нужна. Сбой Thinkpad из-за перегрева из-за того, что ядра слишком долго работали на 100%, также является большой проблемой (хотя это была многопроцессорность Python). Очевидно, мы имеем в виду совершенно разные нагрузки.
Неважно, что происходит, когда IIS начинает перезапускать веб-серверы из-за высокой загрузки ЦП в часы пик. Это отличный способ сломать все 70 серверов.
Вам понадобится
await Task.WhenAll(tasks)снаружи цикла, а не внутри.