С# экспортирует большие таблицы SQL Server в пакетах/фрагментах, используя DataReader и модуль записи CSV

Я разработал пакетный писатель CSV. Но процесс кажется довольно медленным по сравнению с BCP. Единственное требование, которое у меня есть, — экспортировать большие таблицы без столбцов идентификации или первичного ключа в несколько файлов CSV небольшого размера и называть их соответствующим идентификатором пакета.

Проблема с BCP заключается в том, что он будет записывать только в один большой файл.

Что делает мой текущий процесс: Считывает данные и с помощью записи CSV записывает в поток памяти. Я постоянно проверяю, превышает ли поток памяти определенный размер пакета, тогда я копирую поток памяти асинхронно и записываю в текстовый файл.

Без исключений нехватки памяти я могу экспортировать пакет размером 250 МБ.

Но этот процесс занимает в 5 раз больше времени по сравнению с экспортом BCP.

Есть ли лучший способ добиться пакетного экспорта в CSV, чем то, что я делаю.

Пожалуйста, порекомендуйте.

The issue with bcp is that it will only write to a single big file. Запись в один файл. Напишите отдельный процесс для разделения одного файла на несколько файлов меньшего размера. Я могу почти гарантировать, что он сотрет пол с точки зрения производительности с подходом, который вы рассматриваете.
mjwills 14.12.2020 06:04

Не проверял разделение файлов после экспорта. Но из того, что я читал, требуется много времени, чтобы разделить большие файлы на более мелкие файлы, так как он должен пройти через весь файл, чтобы разделить его.

sunny 14.12.2020 14:49

+1 за mjwillis, но BCP совместим с пакетной обработкой — вам просто нужно запускать ее один раз для каждой партии. FETCH NEXT N ROWS и OFFSET N ROWS должны работать.

Mitch 14.12.2020 14:52

Разделение CSV по количеству записей — это задача, которую можно выполнить, используя IO как узкое место. Считыватель конечного автомата просто должен игнорировать CRLF в кавычках и разделять на количество CRLF без кавычек. Чтобы получить бонусные баллы, выполните массовое копирование и разделение на лету (избегая дополнительного места для записи и чтения и хранения полного файла, записываемого на диск).

Mitch 14.12.2020 15:00

Привет, Митч, мне понравилась идея с бонусным баллом. Можете ли вы уточнить это для меня, пожалуйста, я думал, что у меня такой же процесс, но при записи в файл после потоковой передачи данных определенного размера я сохраняю его в потоке памяти и асинхронно записываю в файл . Таким образом, у меня проблемы с памятью. Могу ли я обойтись без потока памяти и по-прежнему разделить поток на несколько частей?

sunny 14.12.2020 15:40

Извлечение N строк и смещение n строк: не займет ли это много времени, если у нас нет первичного ключа или столбца идентификаторов в этой большой таблице размером 100 ГБ?

sunny 14.12.2020 15:43
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
6
1 809
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

На ум приходит пара вариантов:

Используйте FETCH / OFFSET

Если исходный запрос можно легко пакетировать в SQL Server (например, кластеризованный индекс, от которого вы можете отказаться), FETCH и OFFSET в основном бесплатны.

Если таблица представляет собой кучу, FETCH/OFFSET на самом деле не вариант, но вы можете подумать о добавлении кластеризованного индекса, поскольку не так уж много веских аргументов против этого (хотя для таблицы размером 100 ГБ это будет дорого:)

bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w

Использовать SqlDataReader

По результатам измерений с использованием таблицы размером примерно 1,2 ГБ простая реализация C# CSV SQL Export (см. ниже) достигает 75 % производительности BCP на той же таблице и в той же системе. (Это также имеет то преимущество, что правильно обрабатывает формат CSV в отношении встроенных запятых, кавычек и CRLF).

static void Main(string[] args)
{
    var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;");
    con.Open();

    var sqr = new SqlCommand("SELECT * FROM dbo.Table", con);

    using (var reader = sqr.ExecuteReader())
    using (var tw = File.CreateText("out.csv"))
    {
        while (reader.Read())
        {
            for (int i = 0; i < reader.FieldCount; i++)
            {
                if (i != 0)
                {
                    tw.Write(',');
                }

                var val = FormatValue(reader[i]);
                if (val == null)
                {
                    // no-op
                }
                else if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
                {
                    tw.Write('"');
                    tw.Write(val.Replace("\"", "\"\""));
                    tw.Write('"');
                }
                else
                {
                    tw.Write(val);
                }
            }
            tw.Write("\r\n");
        }
    }
}

private static string FormatValue(object v)
{
    if (v == null)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        return dt.ToString("O");
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        for (int i = 0; i < ba.Length; i++)
        {
            sb.Append(ba[i].ToString("X2"));
        }
        return sb.ToString();
    }
    return v.ToString();
}

Производительность, по-видимому, ограничена тем, что GC обрабатывает так много распределений строк, поэтому, если требуется более высокая производительность, то же самое, переведенное на язык, отличный от CLR (например, C++), вероятно, будет соответствовать производительности BCP.

Использовать службы SSIS

SSIS может выполнять все шаги в одном пакете. Точные шаги, вероятно, лучше оставить для другого вопроса, но в основном они сводятся к синтезу столбца для «Номера файла» и использованию назначения «Плоский файл». Плохой пример этого

Используйте SSIS для создания большого CSV-файла, а затем разделите его.

Если вы используете SSIS (напрямую или с помощью мастера экспорта данных), вы получите CSV-файл, совместимый с RFC 4180, который можно разделить. Примером инструмента для разделения такого файла может быть:

class Program
{
    static void Main(string[] args)
    {
        int n = 0;
        using (var src = File.OpenRead("rfc4180_in.csv"))
        using (var dst = new CsvRfc4180SplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
        {
            src.CopyTo(dst);
        }
    }
}

/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least 
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;
    private long CurrentStreamPos;
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;

        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing) => CurrentStream.Dispose();

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // ignore count to always exceed cutAfterPosition
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }

            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }

            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
        }

        CurrentStreamPos += count;
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}

class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;
    bool lastWasQuote;
    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);

            // check for CRLF if desired and not escaped
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                cutPoint = n;
            }
        }

        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;

            if (isQuote)
            {
                // Double quotes:
                //  nop
                //  Inside quoted string == literal escape
                //  Outside quoted string == empty string
            }
            else
            {
                // quote with non-quote following == toggle quoted string
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}

Используйте BCP, затем разделяйте на лету

Если требуется BCP и его (плохая) обработка CSV допустима, он может записывать в именованный поток канала для разделения на лету.

class Program
{
    static void Main(string[] args)
    {
        Thread copyThread;
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires read/write pipe
        using (var np = new NamedPipeServerStream(pipeId))
        {
            copyThread = new Thread(_1 =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF 16 Little Endian)
                // Use CrlfUtf8SplittingWriteStream other (UTF 8 / ANSII / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();

            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\170\Tools\Binn\bcp.exe",
                $@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();
        }
        copyThread.Join();
    }
}

class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0)
                {
                    // split after CRLF
                    return n + 4;
                }
            }
        }

        return -1;
    }
}

class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 1 /* CR LF */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == '\n')
                {
                    // split after CRLF
                    return n + 2;
                }
            }
        }

        return -1;
    }
}

/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least 
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;
    private long CurrentStreamPos;
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;

        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing) => CurrentStream.Dispose();

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // ignore count to always exceed cutAfterPosition
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }

            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }

            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
        }

        CurrentStreamPos += count;
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}

class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;
    bool lastWasQuote;
    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);

            // check for CRLF if desired and not escaped
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                cutPoint = n;
            }
        }

        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;

            if (isQuote)
            {
                // Double quotes:
                //  nop
                //  Inside quoted string == literal escape
                //  Outside quoted string == empty string
            }
            else
            {
                // quote with non-quote following == toggle quoted string
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}

Большое спасибо Этот ответ определенно будет полезен многим людям в будущем

sunny 15.12.2020 04:43

Я не могу проголосовать за ответ, так как мои очки репутации меньше Еще раз спасибо!

sunny 15.12.2020 04:44

Другие вопросы по теме