Я разработал пакетный писатель CSV. Но процесс кажется довольно медленным по сравнению с BCP. Единственное требование, которое у меня есть, — экспортировать большие таблицы без столбцов идентификации или первичного ключа в несколько файлов CSV небольшого размера и называть их соответствующим идентификатором пакета.
Проблема с BCP заключается в том, что он будет записывать только в один большой файл.
Что делает мой текущий процесс: Считывает данные и с помощью записи CSV записывает в поток памяти. Я постоянно проверяю, превышает ли поток памяти определенный размер пакета, тогда я копирую поток памяти асинхронно и записываю в текстовый файл.
Без исключений нехватки памяти я могу экспортировать пакет размером 250 МБ.
Но этот процесс занимает в 5 раз больше времени по сравнению с экспортом BCP.
Есть ли лучший способ добиться пакетного экспорта в CSV, чем то, что я делаю.
Пожалуйста, порекомендуйте.
Не проверял разделение файлов после экспорта. Но из того, что я читал, требуется много времени, чтобы разделить большие файлы на более мелкие файлы, так как он должен пройти через весь файл, чтобы разделить его.
+1 за mjwillis, но BCP совместим с пакетной обработкой — вам просто нужно запускать ее один раз для каждой партии. FETCH NEXT N ROWS
и OFFSET N ROWS
должны работать.
Разделение CSV по количеству записей — это задача, которую можно выполнить, используя IO как узкое место. Считыватель конечного автомата просто должен игнорировать CRLF в кавычках и разделять на количество CRLF без кавычек. Чтобы получить бонусные баллы, выполните массовое копирование и разделение на лету (избегая дополнительного места для записи и чтения и хранения полного файла, записываемого на диск).
Привет, Митч, мне понравилась идея с бонусным баллом. Можете ли вы уточнить это для меня, пожалуйста, я думал, что у меня такой же процесс, но при записи в файл после потоковой передачи данных определенного размера я сохраняю его в потоке памяти и асинхронно записываю в файл . Таким образом, у меня проблемы с памятью. Могу ли я обойтись без потока памяти и по-прежнему разделить поток на несколько частей?
Извлечение N строк и смещение n строк: не займет ли это много времени, если у нас нет первичного ключа или столбца идентификаторов в этой большой таблице размером 100 ГБ?
На ум приходит пара вариантов:
Если исходный запрос можно легко пакетировать в SQL Server (например, кластеризованный индекс, от которого вы можете отказаться), FETCH и OFFSET в основном бесплатны.
Если таблица представляет собой кучу, FETCH/OFFSET на самом деле не вариант, но вы можете подумать о добавлении кластеризованного индекса, поскольку не так уж много веских аргументов против этого (хотя для таблицы размером 100 ГБ это будет дорого:)
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
По результатам измерений с использованием таблицы размером примерно 1,2 ГБ простая реализация C# CSV SQL Export (см. ниже) достигает 75 % производительности BCP на той же таблице и в той же системе. (Это также имеет то преимущество, что правильно обрабатывает формат CSV в отношении встроенных запятых, кавычек и CRLF).
static void Main(string[] args)
{
var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;");
con.Open();
var sqr = new SqlCommand("SELECT * FROM dbo.Table", con);
using (var reader = sqr.ExecuteReader())
using (var tw = File.CreateText("out.csv"))
{
while (reader.Read())
{
for (int i = 0; i < reader.FieldCount; i++)
{
if (i != 0)
{
tw.Write(',');
}
var val = FormatValue(reader[i]);
if (val == null)
{
// no-op
}
else if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
{
tw.Write('"');
tw.Write(val.Replace("\"", "\"\""));
tw.Write('"');
}
else
{
tw.Write(val);
}
}
tw.Write("\r\n");
}
}
}
private static string FormatValue(object v)
{
if (v == null)
{
return null;
}
if (v is DateTime dt)
{
return dt.ToString("O");
}
if (v is DateTimeOffset dto)
{
return dto.ToString("O");
}
if (v is byte[] ba)
{
var sb = new StringBuilder(2 + ba.Length * 2);
sb.Append("0x");
for (int i = 0; i < ba.Length; i++)
{
sb.Append(ba[i].ToString("X2"));
}
return sb.ToString();
}
return v.ToString();
}
Производительность, по-видимому, ограничена тем, что GC обрабатывает так много распределений строк, поэтому, если требуется более высокая производительность, то же самое, переведенное на язык, отличный от CLR (например, C++), вероятно, будет соответствовать производительности BCP.
SSIS может выполнять все шаги в одном пакете. Точные шаги, вероятно, лучше оставить для другого вопроса, но в основном они сводятся к синтезу столбца для «Номера файла» и использованию назначения «Плоский файл». Плохой пример этого
Если вы используете SSIS (напрямую или с помощью мастера экспорта данных), вы получите CSV-файл, совместимый с RFC 4180, который можно разделить. Примером инструмента для разделения такого файла может быть:
class Program
{
static void Main(string[] args)
{
int n = 0;
using (var src = File.OpenRead("rfc4180_in.csv"))
using (var dst = new CsvRfc4180SplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
{
src.CopyTo(dst);
}
}
}
/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
private long _TotalPosition;
private long CurrentStreamPos;
private readonly long CutAfterPosition;
private readonly Func<Stream> StreamCtor;
private Stream CurrentStream;
public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
{
if (cutAfterPosition < 0L)
{
throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
}
this.CutAfterPosition = cutAfterPosition;
this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
this.CurrentStream = createStream();
}
protected override void Dispose(bool disposing) => CurrentStream.Dispose();
public override void Flush() => CurrentStream.Flush();
public override void Write(byte[] buffer, int offset, int count)
{
// ignore count to always exceed cutAfterPosition
var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
if (cutPoint < 0)
{
CurrentStream.Write(buffer, offset, count);
}
else
{
if (cutPoint > 0)
{
CurrentStream.Write(buffer, offset, cutPoint);
}
try
{
CurrentStream.Dispose();
}
finally
{
CurrentStream = null;
CurrentStreamPos = 0L;
CurrentStream = StreamCtor();
}
if (cutPoint != count)
{
CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
}
}
CurrentStreamPos += count;
_TotalPosition += count;
}
protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);
#region Stream Write-only stubs
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override long Position
{
get => _TotalPosition;
set => throw new NotSupportedException();
}
#endregion
}
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
: base(createStream, cutAfterPosition)
{
}
bool inQuotedString;
bool lastWasQuote;
protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
{
int? cutPoint = null;
for (int n = 0; n < count; n++)
{
var i = n + offset;
StepState(buffer[i]);
// check for CRLF if desired and not escaped
if (getCutPoint && !inQuotedString && cutPoint == null
&& buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
{
cutPoint = n;
}
}
return cutPoint ?? -1;
}
private void StepState(byte v)
{
var isQuote = v == '"';
if (lastWasQuote)
{
lastWasQuote = false;
if (isQuote)
{
// Double quotes:
// nop
// Inside quoted string == literal escape
// Outside quoted string == empty string
}
else
{
// quote with non-quote following == toggle quoted string
inQuotedString ^= true;
}
}
else
{
lastWasQuote = isQuote;
}
}
}
Если требуется BCP и его (плохая) обработка CSV допустима, он может записывать в именованный поток канала для разделения на лету.
class Program
{
static void Main(string[] args)
{
Thread copyThread;
var pipeId = $"bcp_{Guid.NewGuid():n}";
// bcp requires read/write pipe
using (var np = new NamedPipeServerStream(pipeId))
{
copyThread = new Thread(_1 =>
{
np.WaitForConnection();
int n = 0;
// Use CrlfUtf16leSplittingWriteStream with -w (UTF 16 Little Endian)
// Use CrlfUtf8SplittingWriteStream other (UTF 8 / ANSII / ASCII / OEM)
using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
{
np.CopyTo(dst);
}
});
copyThread.Name = "Write thread";
copyThread.IsBackground = true;
copyThread.Start();
var bcp = Process.Start(
@"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\170\Tools\Binn\bcp.exe",
$@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
bcp.WaitForExit();
}
copyThread.Join();
}
}
class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
: base(createStream, cutAfterPosition)
{
}
protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
{
if (getCutPoint)
{
for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)
{
var i = n + offset;
if (buffer[i] == '\r' && buffer[i + 1] == 0
&& buffer[i + 2] == '\n' && buffer[i + 3] == 0)
{
// split after CRLF
return n + 4;
}
}
}
return -1;
}
}
class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
: base(createStream, cutAfterPosition)
{
}
protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
{
if (getCutPoint)
{
for (int n = 0; n < count - 1 /* CR LF */; n++)
{
var i = n + offset;
if (buffer[i] == '\r' && buffer[i + 1] == '\n')
{
// split after CRLF
return n + 2;
}
}
}
return -1;
}
}
/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
private long _TotalPosition;
private long CurrentStreamPos;
private readonly long CutAfterPosition;
private readonly Func<Stream> StreamCtor;
private Stream CurrentStream;
public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
{
if (cutAfterPosition < 0L)
{
throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
}
this.CutAfterPosition = cutAfterPosition;
this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
this.CurrentStream = createStream();
}
protected override void Dispose(bool disposing) => CurrentStream.Dispose();
public override void Flush() => CurrentStream.Flush();
public override void Write(byte[] buffer, int offset, int count)
{
// ignore count to always exceed cutAfterPosition
var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
if (cutPoint < 0)
{
CurrentStream.Write(buffer, offset, count);
}
else
{
if (cutPoint > 0)
{
CurrentStream.Write(buffer, offset, cutPoint);
}
try
{
CurrentStream.Dispose();
}
finally
{
CurrentStream = null;
CurrentStreamPos = 0L;
CurrentStream = StreamCtor();
}
if (cutPoint != count)
{
CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
}
}
CurrentStreamPos += count;
_TotalPosition += count;
}
protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);
#region Stream Write-only stubs
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override long Position
{
get => _TotalPosition;
set => throw new NotSupportedException();
}
#endregion
}
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
: base(createStream, cutAfterPosition)
{
}
bool inQuotedString;
bool lastWasQuote;
protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
{
int? cutPoint = null;
for (int n = 0; n < count; n++)
{
var i = n + offset;
StepState(buffer[i]);
// check for CRLF if desired and not escaped
if (getCutPoint && !inQuotedString && cutPoint == null
&& buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
{
cutPoint = n;
}
}
return cutPoint ?? -1;
}
private void StepState(byte v)
{
var isQuote = v == '"';
if (lastWasQuote)
{
lastWasQuote = false;
if (isQuote)
{
// Double quotes:
// nop
// Inside quoted string == literal escape
// Outside quoted string == empty string
}
else
{
// quote with non-quote following == toggle quoted string
inQuotedString ^= true;
}
}
else
{
lastWasQuote = isQuote;
}
}
}
Большое спасибо Этот ответ определенно будет полезен многим людям в будущем
Я не могу проголосовать за ответ, так как мои очки репутации меньше Еще раз спасибо!
The issue with bcp is that it will only write to a single big file.
Запись в один файл. Напишите отдельный процесс для разделения одного файла на несколько файлов меньшего размера. Я могу почти гарантировать, что он сотрет пол с точки зрения производительности с подходом, который вы рассматриваете.