Синхронизировать большой CSV-файл с БД с помощью ядра dotnet

Я хочу разработать пакетный процесс в .NET Core для синхронизации файла CSV, содержащего 3 миллиона строк. Процесс считывает данные из файла, обновляет существующие элементы в базе данных и создает новые элементы, если они не существуют. Однако в настоящее время эта операция занимает значительное количество времени; например, обработка 100 000 строк занимает 30 минут. Я использовал параллельные задачи для повышения производительности, но все еще сталкиваюсь с проблемами задержки. Есть ли какое-либо решение, позволяющее сократить время обработки?

Ниже моего кода:

 public async Task<bool> ImportDataFromCsvFile()
        {
            try
            {
                // Read CSV file asynchronously
                List<CsvInvoice> data = await _azureBlobService.ReadCsvFileAsync();
                List<InvoiceDb> Invoices = new List<InvoiceDb>();

                // Parallel processing of data to create InvoiceAutoDb list
                Parallel.ForEach(data, Invoice =>
                {
                    var entity = new InvoiceDb
                    {
                        Id_Invoice = Invoice.IdInvoice,
                        DateCheck = Invoice.DateCheck
                    };
                    lock (Invoices) // Ensure thread-safety when adding to the list
                    {
                        Invoices.Add(entity);
                    }
                });

                // Get distinct Invoices
                var distinctInvoice = Invoices
                    .GroupBy(c => c.Id_Invoice)
                    .Where(x => x.Count() == 1)
                    .Select(g => g.First())
                    .ToList();

                var nonDistinctInvoice = Invoices
                    .Except(distinctInvoice)
                    .ToList();

                if (nonDistinctInvoice.Any())
                {
                    Log.Warning("Invoice Dupliqué(s) {duplicata}", nonDistinctInvoice.Select(c => c.Id_Invoice).Distinct());
                }

                // Use Parallel.ForEachAsync for parallel asynchronous processing
//this is the part where it consume time
                await Parallel.ForEachAsync(distinctInvoice, async (Invoice, cancellationToken) =>
                {
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        var service = scope.ServiceProvider.GetRequiredService<IInvoiceService>();
                        var existingInvoice = await service.GetByNumeroInvoiceAsync(Invoice.Id_Invoice);
                        if (existingInvoice == null)
                        {
                            await service.ImportNewInvoiceAsync(Invoice);
                        }
                        else
                        {
                            existingInvoice.DateCheck = Invoice.DateCheck;
                            await service.UpdateImportedInvoiceAsync(existingInvoice);
                        }

                        await service.SaveChangesAsync();
                    }
                });

                Log.Information("{result} line imported successfully", distinctInvoice.Count);

            // Publish to Kafka
            await PublishInvoice(distinctInvoice);

            return true;
        }
    catch (Exception ex)
    {
        Log.Error("Import Data failed {errorMessage}", ex.Message);
        throw new Exception(ex.Message);
    }
}
`

Измерьте и попытайтесь увидеть, что тормозит. Также рассмотрите возможность использования загрузчиков csv для ваших баз данных.

Daniel A. White 08.07.2024 14:05

Это Parallel.ForEach, наверное, бессмысленно, создание и добавление списка должно происходить очень быстро.

Charlieface 08.07.2024 15:25

Parallel.ForEach с lock внутри, вероятно, сильно замедляет ваш код. Вы по-прежнему звоните Invoices.Add(entity) столько же раз и последовательно, так что все просто накладно, что вам не нужно.

Enigmativity 10.07.2024 01:58

Я бы запустил ваш код со стандартными циклами foreach и посмотрел, будет ли он быстрее или все равно.

Enigmativity 10.07.2024 02:00

Первый Parallel.ForEach у меня не отнимает много времени. Часть, занимающая большое количество времени, связана со вторым Parallel в этой строке: await Parallel.ForEachAsync(distinctInvoice, async (Invoice, cancelToken) => ...

Imane Sab 10.07.2024 11:00

@ImaneSab - Вероятно, потому, что он однопоточный и выдает каждое обновление по одному оператору SQL за раз.

Enigmativity 10.07.2024 12:39

@Enigmativity, что вы порекомендуете в этом случае, или у вас есть другой подход, которому я могу следовать?

Imane Sab 10.07.2024 16:07

@ImaneSab — Создайте свою собственную команду SQL и обновите данные в пакетном режиме. Это будет почти мгновенно.

Enigmativity 11.07.2024 00:30

@Enigmativity, я думаю, что обновление всех записей одновременно может быть проблематичным, потому что если возникнет проблема с одной записью, все выполнение может остановиться. Я предпочитаю регистрировать записи с проблемами и продолжать обработку действительных, поэтому обрабатываю записи индивидуально.

Imane Sab 11.07.2024 17:01

@ImaneSab — запустите его через транзакцию, чтобы он откатился в случае ошибки, а затем разделите обновления пополам, чтобы изолировать плохое. Это даст вам довольно быстрые обновления.

Enigmativity 12.07.2024 01:35

@Enigmativity, спасибо за это решение. Я протестировал это, и это значительно улучшило время обновления. Большое спасибо

Imane Sab 16.07.2024 13:14

@ImaneSab - Можете ли вы опубликовать это как ответ?

Enigmativity 16.07.2024 13:19
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
12
84
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема была успешно решена путем реализации подхода запуска обновлений через транзакцию, чтобы гарантировать откат в случае ошибки, и разделения обновлений пополам для изоляции и устранения проблемного элемента. Этот подход был предложен @Enigmativity, и он успешно решил проблему, как описано.


 public async Task<(bool isSuccess, List<InvoiceDb> failedItems)> UpsertItemsInBatchesAsync(List<InvoiceDb> items, int batchSize = 1000)
 {
     var failedItems = new List<InvoiceDb>();
     for (int i = 0; i < items.Count; i += batchSize)
     {
         var batch = items.Skip(i).Take(batchSize).ToList();
          
         var (batchSuccess, batchFailedItems) = await TryUpsertBatchAsync(batch);

         if (!batchSuccess)
         {
             failedItems.AddRange(batchFailedItems);
         }
     }

     return (failedItems.Count == 0, failedItems);
 }

 private async Task<(bool isSuccess, List<InvoiceDb> failedItems)> TryUpsertBatchAsync(List<InvoiceDb> batch)
 {
     using (var transaction = await _dbContext.Database.BeginTransactionAsync())
     {
         try
         {
             await _dbContext.BulkInsertOrUpdateAsync(batch);
             await transaction.CommitAsync();
             return (true, new List<InvoiceDb>());
         }
         catch
         {
             await transaction.RollbackAsync();
             return (false, await BisectItemsAsync(batch));
         }
     }
 }
 private async Task<List<InvoiceDb>> BisectItemsAsync(List<InvoiceDb> items)
 {
     var failedItems = new List<InvoiceDb>();

     if (items.Count <= 1)
     {
         // If we are down to one item, it's the problematic one
         return items;
     }

     var mid = items.Count / 2;
     var firstHalf = items.Take(mid).ToList();
     var secondHalf = items.Skip(mid).ToList();

     var failedFirstHalf = await UpsertItemsInBatchesAsync(firstHalf, 1);
     var failedSecondHalf = await UpsertItemsInBatchesAsync(secondHalf, 1);

     failedItems.AddRange(failedFirstHalf.failedItems);
     failedItems.AddRange(failedSecondHalf.failedItems);

     return failedItems;
 }

Если у вас есть лучшая реализация, я открыт для нее.

Можете ли вы опубликовать код?

Enigmativity 17.07.2024 13:35

@Enigmativity, я добавил код

Imane Sab 19.07.2024 15:41

Другие вопросы по теме