Upsert огромный объем данных с помощью EFCore.BulkExtensions

Я использую функцию расширения ниже для Upsert данных с помощью EF Core с помощью EFCore.BulkExtensions, но проблема в том, что выполнение этой функции, когда я пытался вставить 2 миллиона записей, занимает около 17 минут, и в последнее время выдается это исключение.

Не удалось выделить место для объекта «временное хранилище запуска dbo.SORT: 140737501921280» в базе данных «tempdb», поскольку файловая группа «PRIMARY» заполнена. Освободите дисковое пространство, удалив ненужные файлы, удалив объекты в файловой группе, добавив дополнительные файлы в файловую группу или включив автоматическое увеличение для существующих файлов в файловой группе.\r\nЖурнал транзакций для базы данных tempdb заполнен из-за "ACTIVE_TRANSACTION" и задержка LSN равна (41:136:347)

Я вижу, что объем памяти для раздела «C» уменьшается, когда я выполнил эту функцию:

Когда я перезапускаю SQL Server, свободное пространство становится около 30 ГБ. Я пытался использовать многопоточность (параллельную) со вставкой без заметного изменения времени, так что вы порекомендуете или есть ли какие-либо проблемы в коде, показанном здесь?

Примечание: цикл for не занимает слишком много времени, даже если он содержит 2 миллиона записей.

public static async Task<OperationResultDto> AddOrUpdateBulkByTransactionAsync<TEntity>(this DbContext _myDatabaseContext, List<TEntity> data) where TEntity : class
{
    using (var transaction = await _myDatabaseContext.Database.BeginTransactionAsync())
    {
        try
        {
            _myDatabaseContext.Database.SetCommandTimeout(0);

            var currentTime = DateTime.Now;

            // Disable change tracking
            _myDatabaseContext.ChangeTracker.AutoDetectChangesEnabled = false;

            // Set CreatedDate and UpdatedDate for each entity
            foreach (var entity in data)
            {
                var createdDateProperty = entity.GetType().GetProperty("CreatedDate");

                if (createdDateProperty != null && (createdDateProperty.GetValue(entity) == null || createdDateProperty.GetValue(entity).Equals(DateTime.MinValue)))
                {
                    // Set CreatedDate only if it's not already set
                    createdDateProperty.SetValue(entity, currentTime);
                }

                var updatedDateProperty = entity.GetType().GetProperty("UpdatedDate");

                if (updatedDateProperty != null)
                {
                    updatedDateProperty.SetValue(entity, currentTime);
                }
            }

            // Bulk insert or update
            var updateByProperties = GetUpdateByProperties<TEntity>();

            var bulkConfig = new BulkConfig()
            {
                UpdateByProperties = updateByProperties,
                CalculateStats = true,
                SetOutputIdentity = false
            };

            // Batch size for processing
            int batchSize = 50000;

            for (int i = 0; i < data.Count; i += batchSize)
            {
                var batch = data.Skip(i).Take(batchSize).ToList();
                await _myDatabaseContext.BulkInsertOrUpdateAsync(batch, bulkConfig);
            }

            // Commit the transaction if everything succeeds
            await transaction.CommitAsync();

            return new OperationResultDto
            {
                OperationResult = bulkConfig.StatsInfo
            };
        }
        catch (Exception ex)
        {
            // Handle exceptions and roll back the transaction if something goes wrong
            transaction.Rollback();
            return new OperationResultDto
            {
                Error = new ErrorDto
                {
                    Details = ex.Message + ex.InnerException?.Message
                }
            };
        }
        finally
        {
            // Re-enable change tracking
            _myDatabaseContext.ChangeTracker.AutoDetectChangesEnabled = true;
        }
    }
}

Это из-за длительной транзакции. Файл журнала транзакций стал очень большим. Попробуйте сделать это без транзакции. Оно должно работать быстрее.

Svyatoslav Danyliv 31.03.2024 15:31

@SvyatlavDanyliv, и как я могу выполнить откат, если в процессе установки возникнут какие-либо проблемы?

Abdulaziz Burghal 31.03.2024 16:48

Я не знаю природу ваших данных. Если вы хотите простой возврат — увеличьте размер диска. Или придумать что-то идемпотентное, какие-то флаги в записях, которые еще не подтверждены. В любом случае для 2 миллионов записей любая операция в транзакции требует огромного пространства для хранения (DELETE, UPDATE, INSERT). SQL Server создает копию записей для отката.

Svyatoslav Danyliv 31.03.2024 16:57

@SvyatlavDanyliv Я удалил транзакцию, но раздел C все еще сильно заполняется.

Abdulaziz Burghal 31.03.2024 17:15

Где-то может быть внешняя транзакция. Используйте CalculateStats = false и не пакетируйте его вручную, вместо этого установите размер пакета в конфигурации.

Charlieface 31.03.2024 17:50

Это анализ потребностей: если у вас есть партия из 50 000 записей, эти записи копируются в tempdb, а затем с помощью MERGE они применяются к реальной таблице.

Svyatoslav Danyliv 31.03.2024 18:20

Диск емкостью 1 ТБ стоит около 200 долларов, что соответствует 2 часам оплачиваемого времени, не тратьте время на код, прежде чем покупать приличный диск.

siggemannen 01.04.2024 10:30

@Charlieface Спасибо, удаление ручной пакетной обработки увеличивает время выполнения запросов и устраняет исключения без необходимости использовать CalculateStats = false

Abdulaziz Burghal 02.04.2024 10:51

Вы изначально используете неправильный инструмент. ORM НЕ предназначены для заданий ETL. У самого EF нет BulkInsertOrUpdateAsync. Никакого «массового обновления» не существует. Стороннее расширение, которое вы использовали, не выполняет то, что вы предполагаете. Он вообще не работает с объектами, он делает то же, что и правильный ETL-скрипт: сохраняет данные во временную/промежуточную таблицу, а затем выполняет UPDATE target FROM source INNER JOIN Target, за которым следует INSERT .. FROM source LEFT JOIN Target where Target.ID is null.

Panagiotis Kanavos 02.04.2024 12:48

Это означает, что вы все еще используете диск, но у вас нет возможности контролировать или оптимизировать его. Исходные данные хранятся в tempdb. Вы не можете создавать индексы, не можете использовать сжатие, не можете контролировать поведение журналов, не можете использовать переключение разделов. Невозможно использовать ни один из методов, используемых для загрузки данных.

Panagiotis Kanavos 02.04.2024 12:50

@AbdulazizBurghal, откуда взялись эти 2 миллиона строк? То, что вы пытаетесь сделать, на 1000% неправильно: сначала использовать ORM, любой ORM, для заданий передачи данных, а затем использовать что-то под названием BulkInsertOrUpdateAsync для автоматического выполнения работы. Встроенный BULK INSERT будет гораздо более быстрым способом импортировать, например, CSV или текстовый файл в промежуточную таблицу.

Panagiotis Kanavos 02.04.2024 12:55

@PanagiotisKanavos Использование анализатора CSV на C# имеет ряд преимуществ: файл является локальным для клиента, а не для сервера, CSV можно анализировать или преобразовывать различными способами. BULK INSERT не имеет большого преимущества по производительности по сравнению с SqlBulkCopy (кроме, очевидно, задержки в сети). Всегда можно использовать BulkExtensions для вставки в пользовательскую индексированную таблицу и последующего выполнения MERGE. Обратите внимание, что первичный ключ в любом случае добавляется, поэтому это должно решить большинство проблем.

Charlieface 02.04.2024 13:03

Вы можете использовать CsvDataReader CsvHelper и передать его непосредственно в SqlBulkCopy. «Расширения» только добавляют к этому накладных расходов. Что касается MERGE, то это чистое зло с кучей ошибок. Задания ETL обычно используют UPDATE, за которым следует INSERT.

Panagiotis Kanavos 02.04.2024 13:05
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
13
380
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Похоже, что ручное пакетирование требует гораздо больших затрат на начальном этапе. Кроме того, вы используете CalculateStats = true, что означает, что при каждом прохождении статистика будет бессмысленно пересчитываться.

SqlBulkCopy, который BulkExtensions использует под капотом, имеет встроенную пакетную обработку, и вы можете использовать ее в массовой конфигурации. Вам не нужно собирать его самостоятельно. И затем в конце он рассчитает статистику (или вы можете это отключить).

Более того, явная транзакция означает, что журнал транзакций базы данных, а также любые рабочие таблицы и версии строк в базе данных tempdb не могут быть очищены до тех пор, пока они не будут зафиксированы. Возможно, вам захочется выполнить чистый откат, но за это придется заплатить. А если вам это действительно нужно, просто установите размер пакета в конфигурации равным 0, и вы получите одну большую внутреннюю транзакцию.

Итак, все, что вам нужно, это:

var bulkConfig = new BulkConfig()
{
    UpdateByProperties = updateByProperties,
    CalculateStats = true,
    SetOutputIdentity = false,
    BatchSize = 50000
};
await _myDatabaseContext.BulkInsertOrUpdateAsync(data, bulkConfig);

Также рассмотрите возможность использования BulkInsertAsync вместо BulkInsertOrUpdateAsync, поскольку это позволяет вставлять данные непосредственно в таблицу, а не через временную таблицу и MERGE.

Удаление цикла for и включение размера пакета в конфигурацию решает мою проблему (относительно времени и различных исключений). @Charlieface, вы можете добавить в ответ фрагмент ниже: var BulkConfig = new BulkConfig() { UpdateByProperties = updateByProperties, CalculateStats = true, SetOutputIdentity = false, BatchSize=50000 }; ждут _myDatabaseContext.BulkInsertOrUpdateAsync (данные, BulkConfig);

Abdulaziz Burghal 02.04.2024 12:51

@AbdulazizBurghal упоминание того, как вы использовали это стороннее расширение, не очень полезно. Люди, которых волнует загрузка данных, просто не используют ее. Они также не используют ORM, такие как EF Core. Откуда взялись эти 2 миллиона строк?

Panagiotis Kanavos 02.04.2024 12:53

@PanagiotisKanavos Я не совсем понял, о чем ты говоришь?

Abdulaziz Burghal 02.04.2024 12:56

@PanagiotisKanavos BulkExtensions работает довольно быстро, внутри него используется SqlBulkCopy.

Charlieface 02.04.2024 12:57

@Charlieface, но затем он записывает в неиндексированные, неоптимизированные таблицы в tempdb (очень загруженная база данных) вместо, например, отдельной файловой группы, используемой только для промежуточного хранения. Когда придет время выполнить часть UPDATE, запросу придется выполнить полное сканирование и заблокировать цель на долгое время. Он ничего не знает о данных, что означает, что он не может использовать, например, значения столбца идентификатора или даты, чтобы уменьшить необходимую работу. SqlBulkCopy не лучше, чем, например, BULK INSERT. Самое сложное будет потом

Panagiotis Kanavos 02.04.2024 13:04

@Charlieface Обратите внимание, но потребление процессора все равно очень высокое, есть какие-нибудь предложения?

Abdulaziz Burghal 02.04.2024 13:35

Процессор на сервере или клиенте? На сервере, вероятно, это так, как упоминал Панагиотос, поэтому используйте специальную временную таблицу с первичным ключом.

Charlieface 02.04.2024 14:09

@Charlieface на сервере и его использование из пространства раздела C. Как я могу использовать пользовательскую временную таблицу с efcore.bulkextensions?

Abdulaziz Burghal 02.04.2024 14:33

Не знаете, что вы спрашиваете: процессор или дисковое пространство? Дисковое пространство, которого вы не можете избежать, поскольку вставляете все это во временную таблицу. Возможно, потребуется масштабная сортировка, которую можно решить с помощью специальной таблицы. Я думаю, вам нужно будет сделать это отдельными шагами: создать временную таблицу вручную, ввести в нее BulkInsertOptimized, затем выполнить BulkInsertOrUpdateAsync с CustomSourceTableName.

Charlieface 02.04.2024 15:03

@Charlieface Как я могу очистить базу данных tempdb по этому пути «C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\DATA» после выполнения массовой операции добавления с ядром .net? tempdb tempdb_mssql_2 tempdb_mssql_3 tempdb_mssql_4, потому что это занимает слишком много места, и после выполнения билка я вижу, что SQL Server занимает 50% оперативной памяти.

Abdulaziz Burghal 03.04.2024 14:25

Другие вопросы по теме