Я хочу разработать пакетный процесс в .NET Core для синхронизации файла CSV, содержащего 3 миллиона строк. Процесс считывает данные из файла, обновляет существующие элементы в базе данных и создает новые элементы, если они не существуют. Однако в настоящее время эта операция занимает значительное количество времени; например, обработка 100 000 строк занимает 30 минут. Я использовал параллельные задачи для повышения производительности, но все еще сталкиваюсь с проблемами задержки. Есть ли какое-либо решение, позволяющее сократить время обработки?
Ниже моего кода:
public async Task<bool> ImportDataFromCsvFile()
{
try
{
// Read CSV file asynchronously
List<CsvInvoice> data = await _azureBlobService.ReadCsvFileAsync();
List<InvoiceDb> Invoices = new List<InvoiceDb>();
// Parallel processing of data to create InvoiceAutoDb list
Parallel.ForEach(data, Invoice =>
{
var entity = new InvoiceDb
{
Id_Invoice = Invoice.IdInvoice,
DateCheck = Invoice.DateCheck
};
lock (Invoices) // Ensure thread-safety when adding to the list
{
Invoices.Add(entity);
}
});
// Get distinct Invoices
var distinctInvoice = Invoices
.GroupBy(c => c.Id_Invoice)
.Where(x => x.Count() == 1)
.Select(g => g.First())
.ToList();
var nonDistinctInvoice = Invoices
.Except(distinctInvoice)
.ToList();
if (nonDistinctInvoice.Any())
{
Log.Warning("Invoice Dupliqué(s) {duplicata}", nonDistinctInvoice.Select(c => c.Id_Invoice).Distinct());
}
// Use Parallel.ForEachAsync for parallel asynchronous processing
//this is the part where it consume time
await Parallel.ForEachAsync(distinctInvoice, async (Invoice, cancellationToken) =>
{
using (var scope = _serviceProvider.CreateScope())
{
var service = scope.ServiceProvider.GetRequiredService<IInvoiceService>();
var existingInvoice = await service.GetByNumeroInvoiceAsync(Invoice.Id_Invoice);
if (existingInvoice == null)
{
await service.ImportNewInvoiceAsync(Invoice);
}
else
{
existingInvoice.DateCheck = Invoice.DateCheck;
await service.UpdateImportedInvoiceAsync(existingInvoice);
}
await service.SaveChangesAsync();
}
});
Log.Information("{result} line imported successfully", distinctInvoice.Count);
// Publish to Kafka
await PublishInvoice(distinctInvoice);
return true;
}
catch (Exception ex)
{
Log.Error("Import Data failed {errorMessage}", ex.Message);
throw new Exception(ex.Message);
}
}
`
Это Parallel.ForEach
, наверное, бессмысленно, создание и добавление списка должно происходить очень быстро.
Parallel.ForEach
с lock
внутри, вероятно, сильно замедляет ваш код. Вы по-прежнему звоните Invoices.Add(entity)
столько же раз и последовательно, так что все просто накладно, что вам не нужно.
Я бы запустил ваш код со стандартными циклами foreach
и посмотрел, будет ли он быстрее или все равно.
Первый Parallel.ForEach у меня не отнимает много времени. Часть, занимающая большое количество времени, связана со вторым Parallel в этой строке: await Parallel.ForEachAsync(distinctInvoice, async (Invoice, cancelToken) => ...
@ImaneSab - Вероятно, потому, что он однопоточный и выдает каждое обновление по одному оператору SQL за раз.
@Enigmativity, что вы порекомендуете в этом случае, или у вас есть другой подход, которому я могу следовать?
@ImaneSab — Создайте свою собственную команду SQL и обновите данные в пакетном режиме. Это будет почти мгновенно.
@Enigmativity, я думаю, что обновление всех записей одновременно может быть проблематичным, потому что если возникнет проблема с одной записью, все выполнение может остановиться. Я предпочитаю регистрировать записи с проблемами и продолжать обработку действительных, поэтому обрабатываю записи индивидуально.
@ImaneSab — запустите его через транзакцию, чтобы он откатился в случае ошибки, а затем разделите обновления пополам, чтобы изолировать плохое. Это даст вам довольно быстрые обновления.
@Enigmativity, спасибо за это решение. Я протестировал это, и это значительно улучшило время обновления. Большое спасибо
@ImaneSab - Можете ли вы опубликовать это как ответ?
Проблема была успешно решена путем реализации подхода запуска обновлений через транзакцию, чтобы гарантировать откат в случае ошибки, и разделения обновлений пополам для изоляции и устранения проблемного элемента. Этот подход был предложен @Enigmativity, и он успешно решил проблему, как описано.
public async Task<(bool isSuccess, List<InvoiceDb> failedItems)> UpsertItemsInBatchesAsync(List<InvoiceDb> items, int batchSize = 1000)
{
var failedItems = new List<InvoiceDb>();
for (int i = 0; i < items.Count; i += batchSize)
{
var batch = items.Skip(i).Take(batchSize).ToList();
var (batchSuccess, batchFailedItems) = await TryUpsertBatchAsync(batch);
if (!batchSuccess)
{
failedItems.AddRange(batchFailedItems);
}
}
return (failedItems.Count == 0, failedItems);
}
private async Task<(bool isSuccess, List<InvoiceDb> failedItems)> TryUpsertBatchAsync(List<InvoiceDb> batch)
{
using (var transaction = await _dbContext.Database.BeginTransactionAsync())
{
try
{
await _dbContext.BulkInsertOrUpdateAsync(batch);
await transaction.CommitAsync();
return (true, new List<InvoiceDb>());
}
catch
{
await transaction.RollbackAsync();
return (false, await BisectItemsAsync(batch));
}
}
}
private async Task<List<InvoiceDb>> BisectItemsAsync(List<InvoiceDb> items)
{
var failedItems = new List<InvoiceDb>();
if (items.Count <= 1)
{
// If we are down to one item, it's the problematic one
return items;
}
var mid = items.Count / 2;
var firstHalf = items.Take(mid).ToList();
var secondHalf = items.Skip(mid).ToList();
var failedFirstHalf = await UpsertItemsInBatchesAsync(firstHalf, 1);
var failedSecondHalf = await UpsertItemsInBatchesAsync(secondHalf, 1);
failedItems.AddRange(failedFirstHalf.failedItems);
failedItems.AddRange(failedSecondHalf.failedItems);
return failedItems;
}
Если у вас есть лучшая реализация, я открыт для нее.
Можете ли вы опубликовать код?
@Enigmativity, я добавил код
Измерьте и попытайтесь увидеть, что тормозит. Также рассмотрите возможность использования загрузчиков csv для ваших баз данных.