Я хотел бы перекачивать данные из потока в CSV-файл, сохраняемый в контейнере больших двоичных объектов Azure ADLS Gen2. Кодировка должна быть UTF-16-LE, чтобы ее можно было правильно открыть в Excel. Если я использую UTF8 без BOM, мой код работает правильно, но в случае UTF-16 получается беспорядок. Я использую модуль StringIO и csv для преобразования входящих данных в строки в формате CSV. Данные, которые я загружаю, могут иметь не более 1 млн строк (для совместимости с Excel).
Я также пробовал blobclient.append_block(), который добавлял спецификацию в начале каждого фрагмента в файле CSV, поэтому, скажем, в каждой 1000-й строке была спецификация.
Вы знаете, как прочитать список списков из потока и загрузить его в Azure Blob в виде CSV-файла, совместимого с Excel? (Я обнаружил, что Excel правильно обрабатывает только UTF16 для данных, которые у меня есть.)
Версия большого двоичного объекта Azure: "12.14.1"
Результат выглядит так с UTF-16: (UTF-16-LE добавляет нулевые порядковые номера, которые плохо читаются в notepad++, открытие в Excel еще хуже)
"string";"integer";"float"2.";2;3.0
"4.";4;6.0
"6.";6;9.0
*⋾㠀⸀∀㬀㠀㬀㈀⸀ ഀ⋾ ⸀∀㬀 㬀㔀⸀ ഀ⋾㈀⸀∀㬀㈀㬀㠀⸀ ഀ⋾㐀⸀∀㬀㐀㬀㈀⸀ ഀ⋾㘀⸀∀㬀㘀㬀㈀㐀⸀ ഀ⋾㠀⸀∀㬀㠀㬀㈀㜀⸀ ഀ⋾㈀ ⸀∀㬀㈀ 㬀㌀ ⸀ ഀ⋾㈀㈀⸀∀㬀㈀㈀㬀㌀㌀⸀ ഀ⋾㈀㐀⸀∀㬀㈀㐀㬀㌀㘀⸀ ഀ⋾㈀㘀⸀∀㬀㈀㘀㬀㌀㤀⸀ ഀ⋾㈀㠀⸀∀㬀㈀㠀㬀㐀㈀⸀ ഀ⋾㌀ ⸀∀㬀㌀ 㬀㐀㔀⸀ ഀ⋾㌀㈀⸀∀㬀㌀㈀㬀㐀㠀⸀ ഀ⋾㌀㐀⸀∀㬀㌀㐀㬀㔀⸀ ഀ⋾㌀㘀⸀∀㬀㌀㘀㬀㔀㐀⸀ ഀ⋾㌀㠀⸀∀㬀㌀㠀㬀㔀㜀⸀ ഀ⋾㐀 ⸀∀㬀㐀 㬀㘀 ⸀ ഀ⋾㐀㈀⸀∀㬀㐀㈀㬀㘀㌀⸀ ഀ⋾㐀㐀⸀∀㬀㐀㐀㬀㘀㘀⸀ ഀ⋾㐀㘀⸀∀㬀㐀㘀㬀㘀㤀⸀ ഀ⋾㐀㠀⸀∀㬀㐀㠀㬀㜀㈀⸀ ഀ⋾㔀 ⸀∀㬀㔀 㬀㜀㔀⸀ ഀ⋾㔀㈀⸀∀㬀㔀㈀㬀㜀㠀⸀ ഀ⋾㔀㐀⸀∀㬀㔀㐀㬀㠀⸀ ഀ⋾㔀㘀⸀∀㬀㔀㘀㬀㠀㐀⸀ ഀ⋾㔀㠀⸀∀㬀㔀㠀㬀㠀㜀⸀ ഀ⋾㘀 ⸀∀㬀㘀 㬀㤀 ⸀ ഀ⋾㘀㈀⸀∀㬀㘀㈀㬀㤀㌀⸀ ഀ⋾㘀㐀⸀∀㬀㘀㐀㬀㤀㘀⸀ ഀ⋾㘀㘀⸀∀㬀㘀㘀㬀㤀㤀⸀ ഀ"68.";68;102.0
"70.";70;105.0
"72.";72;108.0
"74.";74;111.0
"76.";76;114.0
"78.";78;117.0
"80.";80;120.0
"82.";82;123.0
"84.";84;126.0
"86.";86;129.0
"88.";88;132.0
"90.";90;135.0
"92.";92;138.0
"94.";94;141.0
"96.";96;144.0
"98.";98;147.0
Код ссылки, который я изменил, взят с учебного сайта MS
import csv
from io import StringIO
from uuid import uuid4
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContentSettings, BlobBlock #, ContainerClient
class MyDialect(csv.Dialect):
""" CSV dialect compatible with Synapse DP SQL COPY INTO command """
delimiter = ';'
doublequote = True
lineterminator = '\r\n'
quotechar = '"'
quoting = csv.QUOTE_NONNUMERIC
escapechar = "/"
class FakeReader:
def __init__(self, chunk_size = 2):
self.chunk_size = 2
self.data_set = [[str(i)+".", i, i * 1.5] for i in range(100)]
def header(self):
return [["string", "integer", "float"]]
def readChunk(self):
ix = 0
while True:
data = self.data_set[ix : ix + self.chunk_size]
ix += self.chunk_size
yield data
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
print("closing the input stream...")
def getBloblClient(account_name, account_key, container_name,
max_chunk_get_size = 4 * 1024 * 1024, max_single_get_size = 4 * 1024 * 1024,
file_relative_path = "data.csv"):
blob_url = f"https://{account_name}.blob.core.windows.net"
blob_service_client = BlobServiceClient(blob_url,
credential = account_key,
max_chunk_get_size = max_chunk_get_size,
max_single_get_size = max_single_get_size
)
return blob_service_client.get_blob_client(container = container_name, blob = file_relative_path)
def upload_blocks(blob_client, chunk_size : int = 2, block_size: int = 1024*1024, encoding = "UTF-8"):
csv.register_dialect("mydialect", MyDialect)
# memory csv stream as data pipe between
stream = io.StringIO()
csv_writer = csv.writer(stream, dialect = "mydialect")
record_cnt = 0
bytes_moved = 0
block_id_list = []
first_chunk = True
with FakeReader(chunk_size = 2) as reader:
for chunk in reader.readChunk():
record_cnt += len(chunk)
if first_chunk:
chunk = reader.header() + chunk
first_chunk = False
# convert the records into CSV rows
stream.seek(0)
stream.truncate()
csv_writer.writerows(chunk)
# append the csv rows from the stream object to blob storage csv file
stream.seek(0)
buffer = stream.read(block_size)
if not buffer:
break
bytes_moved += len(buffer)
# append to blob stage
block_id = uuid4().hex
block_id_list.append(BlobBlock(block_id = block_id))
blob_client.stage_block(block_id = block_id,
data = buffer,
length = len(buffer),
encoding = encoding)
blob_client.commit_block_list(block_id_list)
return bytes_moved, record_cnt
account_name = "account-name"
container_name = "container-name"
account_key = "your-account-key"
blob_client = getBloblClient(account_name, account_key, container_name)
upload_blocks(blob_client, encoding = "UTF-16-LE")






Результат выглядит следующим образом с UTF-16: (UTF-16-LE добавляет нулевые порядковые номера, которые плохо читаются в notepad++, открытие в Excel еще хуже)
Вы используете в коде диалект CSV, совместимый с кодировкой UTF-8. Но вы должны специально управлять кодировкой, потому что вы хотите отправить данные как UTF-16-LE. Перед загрузкой данных CSV в Azure Blob Storage вы можете закодировать их как UTF-16-LE, используя функцию кодирования Python.
Двухбайтовые символы включены в UTF-16 data, и необходимо принять дополнительные меры для обеспечения подходящей кодировки и порядка следования байтов.
Вы можете использовать модифицированный ниже код:
Код:
import csv
from io import StringIO
import io
from uuid import uuid4
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContentSettings, BlobBlock #, ContainerClient
class MyDialect(csv.Dialect):
""" CSV dialect compatible with Synapse DP SQL COPY INTO command """
delimiter = ';'
doublequote = True
lineterminator = '\r\n'
quotechar = '"'
quoting = csv.QUOTE_NONNUMERIC
escapechar = "/"
class FakeReader:
def __init__(self, chunk_size = 2):
self.chunk_size = 2
self.data_set = [[str(i)+".", i, i * 1.5] for i in range(100)]
def header(self):
return [["string", "integer", "float"]]
def readChunk(self):
ix = 0
while True:
data = self.data_set[ix : ix + self.chunk_size]
ix += self.chunk_size
yield data
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
print("closing the input stream...")
def getBloblClient(account_name, account_key, container_name,
max_chunk_get_size = 4 * 1024 * 1024, max_single_get_size = 4 * 1024 * 1024,
file_relative_path = "data.csv"):
blob_url = f"https://{account_name}.blob.core.windows.net"
blob_service_client = BlobServiceClient(blob_url,
credential = account_key,
max_chunk_get_size = max_chunk_get_size,
max_single_get_size = max_single_get_size
)
return blob_service_client.get_blob_client(container = container_name, blob = file_relative_path)
def upload_blocks(blob_client, chunk_size : int = 2, block_size: int = 1024*1024):
csv.register_dialect("mydialect", MyDialect)
# memory csv stream as data pipe between
stream = io.StringIO()
csv_writer = csv.writer(stream, dialect = "mydialect")
record_cnt = 0
bytes_moved = 0
block_id_list = []
first_chunk = True
with FakeReader(chunk_size=2) as reader:
for chunk in reader.readChunk():
record_cnt += len(chunk)
if first_chunk:
chunk = reader.header() + chunk
first_chunk = False
# convert the records into CSV rows
stream.seek(0)
stream.truncate()
csv_writer.writerows(chunk)
# Append the csv rows from the stream object to blob storage csv file
stream.seek(0)
buffer = stream.read(block_size)
if not buffer:
break
bytes_moved += len(buffer)
# Encode the data as UTF-16-LE
utf16_buffer = buffer.encode("utf-16le")
# Append to blob storage
block_id = uuid4().hex
block_id_list.append(BlobBlock(block_id=block_id))
blob_client.stage_block(block_id=block_id, data=utf16_buffer, length=len(utf16_buffer))
blob_client.commit_block_list(block_id_list)
return bytes_moved, record_cnt
account_name = "xxxxx"
container_name = "xxxxx"
account_key = "xxxxxx"
blob_client = getBloblClient(account_name, account_key, container_name)
upload_blocks(blob_client, encoding = "UTF-16-LE")
Выход:

Я добавил LE BOM в первый utf16_buffer, и это сработало! :-) if first_chunk: utf16_buffer = b"\xff\xfe" + utf16_buffer
Спасибо! Я мог создать файл, но, к сожалению, в нем не было встроенной информации о кодировке. Пытался изменить кодировку на utf-16-le или просто utf-16, но результат тот же. Могу я попросить вас попробовать добавить "autosúčiastky" как фейковое поле во входные данные и проверить результат?