Проблема Python Azure BloblClient UTF-16

Я хотел бы перекачивать данные из потока в CSV-файл, сохраняемый в контейнере больших двоичных объектов Azure ADLS Gen2. Кодировка должна быть UTF-16-LE, чтобы ее можно было правильно открыть в Excel. Если я использую UTF8 без BOM, мой код работает правильно, но в случае UTF-16 получается беспорядок. Я использую модуль StringIO и csv для преобразования входящих данных в строки в формате CSV. Данные, которые я загружаю, могут иметь не более 1 млн строк (для совместимости с Excel).

Я также пробовал blobclient.append_block(), который добавлял спецификацию в начале каждого фрагмента в файле CSV, поэтому, скажем, в каждой 1000-й строке была спецификация.

Вы знаете, как прочитать список списков из потока и загрузить его в Azure Blob в виде CSV-файла, совместимого с Excel? (Я обнаружил, что Excel правильно обрабатывает только UTF16 для данных, которые у меня есть.)

Версия большого двоичного объекта Azure: "12.14.1"

Результат выглядит так с UTF-16: (UTF-16-LE добавляет нулевые порядковые номера, которые плохо читаются в notepad++, открытие в Excel еще хуже)

"string";"integer";"float"2.";2;3.0
"4.";4;6.0
"6.";6;9.0
*⋾㠀⸀∀㬀㠀㬀㄀㈀⸀ ഀ＀⋾㄀ ⸀∀㬀㄀ 㬀㄀㔀⸀ ഀ＀⋾㄀㈀⸀∀㬀㄀㈀㬀㄀㠀⸀ ഀ＀⋾㄀㐀⸀∀㬀㄀㐀㬀㈀㄀⸀ ഀ＀⋾㄀㘀⸀∀㬀㄀㘀㬀㈀㐀⸀ ഀ＀⋾㄀㠀⸀∀㬀㄀㠀㬀㈀㜀⸀ ഀ＀⋾㈀ ⸀∀㬀㈀ 㬀㌀ ⸀ ഀ＀⋾㈀㈀⸀∀㬀㈀㈀㬀㌀㌀⸀ ഀ＀⋾㈀㐀⸀∀㬀㈀㐀㬀㌀㘀⸀ ഀ＀⋾㈀㘀⸀∀㬀㈀㘀㬀㌀㤀⸀ ഀ＀⋾㈀㠀⸀∀㬀㈀㠀㬀㐀㈀⸀ ഀ＀⋾㌀ ⸀∀㬀㌀ 㬀㐀㔀⸀ ഀ＀⋾㌀㈀⸀∀㬀㌀㈀㬀㐀㠀⸀ ഀ＀⋾㌀㐀⸀∀㬀㌀㐀㬀㔀㄀⸀ ഀ＀⋾㌀㘀⸀∀㬀㌀㘀㬀㔀㐀⸀ ഀ＀⋾㌀㠀⸀∀㬀㌀㠀㬀㔀㜀⸀ ഀ＀⋾㐀 ⸀∀㬀㐀 㬀㘀 ⸀ ഀ＀⋾㐀㈀⸀∀㬀㐀㈀㬀㘀㌀⸀ ഀ＀⋾㐀㐀⸀∀㬀㐀㐀㬀㘀㘀⸀ ഀ＀⋾㐀㘀⸀∀㬀㐀㘀㬀㘀㤀⸀ ഀ＀⋾㐀㠀⸀∀㬀㐀㠀㬀㜀㈀⸀ ഀ＀⋾㔀 ⸀∀㬀㔀 㬀㜀㔀⸀ ഀ＀⋾㔀㈀⸀∀㬀㔀㈀㬀㜀㠀⸀ ഀ＀⋾㔀㐀⸀∀㬀㔀㐀㬀㠀㄀⸀ ഀ＀⋾㔀㘀⸀∀㬀㔀㘀㬀㠀㐀⸀ ഀ＀⋾㔀㠀⸀∀㬀㔀㠀㬀㠀㜀⸀ ഀ＀⋾㘀 ⸀∀㬀㘀 㬀㤀 ⸀ ഀ＀⋾㘀㈀⸀∀㬀㘀㈀㬀㤀㌀⸀ ഀ＀⋾㘀㐀⸀∀㬀㘀㐀㬀㤀㘀⸀ ഀ＀⋾㘀㘀⸀∀㬀㘀㘀㬀㤀㤀⸀ ഀ਀"68.";68;102.0
"70.";70;105.0
"72.";72;108.0
"74.";74;111.0
"76.";76;114.0
"78.";78;117.0
"80.";80;120.0
"82.";82;123.0
"84.";84;126.0
"86.";86;129.0
"88.";88;132.0
"90.";90;135.0
"92.";92;138.0
"94.";94;141.0
"96.";96;144.0
"98.";98;147.0

Код ссылки, который я изменил, взят с учебного сайта MS

import csv
from io import StringIO
from uuid import uuid4

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContentSettings, BlobBlock #, ContainerClient

class MyDialect(csv.Dialect):
    """ CSV dialect compatible with Synapse DP SQL COPY INTO command """
    delimiter      = ';'
    doublequote    = True
    lineterminator = '\r\n'
    quotechar      = '"'
    quoting        = csv.QUOTE_NONNUMERIC
    escapechar     = "/"
    
    
class FakeReader:
    def __init__(self, chunk_size = 2):
        self.chunk_size = 2
        self.data_set = [[str(i)+".", i, i * 1.5] for i in range(100)]
    
    def header(self):
        return [["string", "integer", "float"]]
    
    def readChunk(self):
        ix = 0
        while True:
            data = self.data_set[ix : ix + self.chunk_size]
            ix += self.chunk_size
            yield data
            
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, exc_tb):
        print("closing the input stream...")

        

def getBloblClient(account_name, account_key, container_name,
                   max_chunk_get_size = 4 * 1024 * 1024, max_single_get_size = 4 * 1024 * 1024, 
                   file_relative_path = "data.csv"):
    blob_url = f"https://{account_name}.blob.core.windows.net"
    blob_service_client = BlobServiceClient(blob_url, 
                                            credential = account_key,
                                            max_chunk_get_size = max_chunk_get_size,
                                            max_single_get_size = max_single_get_size
                                           )
    return blob_service_client.get_blob_client(container = container_name, blob = file_relative_path)
            

def upload_blocks(blob_client, chunk_size : int = 2, block_size: int = 1024*1024, encoding = "UTF-8"):
    
    csv.register_dialect("mydialect", MyDialect)
    
    # memory csv stream as data pipe between 
    stream = io.StringIO()
    csv_writer = csv.writer(stream, dialect = "mydialect")
    
    record_cnt = 0
    bytes_moved = 0
    block_id_list = []
    first_chunk = True
    
    with FakeReader(chunk_size = 2) as reader:
        for chunk in reader.readChunk():
            record_cnt += len(chunk)
            if first_chunk:
                chunk = reader.header() + chunk
                first_chunk = False
            
            # convert the records into CSV rows
            stream.seek(0)
            stream.truncate()
            csv_writer.writerows(chunk)

            # append the csv rows from the stream object to blob storage csv file
            stream.seek(0)
            buffer = stream.read(block_size)
            
            if not buffer:
                break
            bytes_moved += len(buffer)
            
            # append to blob stage
            block_id = uuid4().hex
            block_id_list.append(BlobBlock(block_id = block_id))

            blob_client.stage_block(block_id = block_id, 
                                    data = buffer, 
                                    length = len(buffer), 
                                    encoding = encoding)

        blob_client.commit_block_list(block_id_list)
    
    return bytes_moved, record_cnt



account_name   = "account-name"
container_name = "container-name"
account_key    = "your-account-key"

blob_client = getBloblClient(account_name, account_key, container_name)
upload_blocks(blob_client, encoding = "UTF-16-LE")
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Результат выглядит следующим образом с UTF-16: (UTF-16-LE добавляет нулевые порядковые номера, которые плохо читаются в notepad++, открытие в Excel еще хуже)

Вы используете в коде диалект CSV, совместимый с кодировкой UTF-8. Но вы должны специально управлять кодировкой, потому что вы хотите отправить данные как UTF-16-LE. Перед загрузкой данных CSV в Azure Blob Storage вы можете закодировать их как UTF-16-LE, используя функцию кодирования Python.

Двухбайтовые символы включены в UTF-16 data, и необходимо принять дополнительные меры для обеспечения подходящей кодировки и порядка следования байтов.

Вы можете использовать модифицированный ниже код:

Код:

import csv
from io import StringIO
import io
from uuid import uuid4

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContentSettings, BlobBlock #, ContainerClient

class MyDialect(csv.Dialect):
    """ CSV dialect compatible with Synapse DP SQL COPY INTO command """
    delimiter      = ';'
    doublequote    = True
    lineterminator = '\r\n'
    quotechar      = '"'
    quoting        = csv.QUOTE_NONNUMERIC
    escapechar     = "/"
    
    
class FakeReader:
    def __init__(self, chunk_size = 2):
        self.chunk_size = 2
        self.data_set = [[str(i)+".", i, i * 1.5] for i in range(100)]
    
    def header(self):
        return [["string", "integer", "float"]]
    
    def readChunk(self):
        ix = 0
        while True:
            data = self.data_set[ix : ix + self.chunk_size]
            ix += self.chunk_size
            yield data
            
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, exc_tb):
        print("closing the input stream...")

        

def getBloblClient(account_name, account_key, container_name,
                   max_chunk_get_size = 4 * 1024 * 1024, max_single_get_size = 4 * 1024 * 1024, 
                   file_relative_path = "data.csv"):
    blob_url = f"https://{account_name}.blob.core.windows.net"
    blob_service_client = BlobServiceClient(blob_url, 
                                            credential = account_key,
                                            max_chunk_get_size = max_chunk_get_size,
                                            max_single_get_size = max_single_get_size
                                           )
    return blob_service_client.get_blob_client(container = container_name, blob = file_relative_path)
            

def upload_blocks(blob_client, chunk_size : int = 2, block_size: int = 1024*1024):
    
    csv.register_dialect("mydialect", MyDialect)
    
    # memory csv stream as data pipe between 
    stream = io.StringIO()
    csv_writer = csv.writer(stream, dialect = "mydialect")
    
    record_cnt = 0
    bytes_moved = 0
    block_id_list = []
    first_chunk = True
    
    with FakeReader(chunk_size=2) as reader:
        for chunk in reader.readChunk():
            record_cnt += len(chunk)
            if first_chunk:
                chunk = reader.header() + chunk
                first_chunk = False

            # convert the records into CSV rows
            stream.seek(0)
            stream.truncate()
            csv_writer.writerows(chunk)

            # Append the csv rows from the stream object to blob storage csv file
            stream.seek(0)
            buffer = stream.read(block_size)

            if not buffer:
                break
            bytes_moved += len(buffer)

            # Encode the data as UTF-16-LE
            utf16_buffer = buffer.encode("utf-16le")

            # Append to blob storage
            block_id = uuid4().hex
            block_id_list.append(BlobBlock(block_id=block_id))

            blob_client.stage_block(block_id=block_id, data=utf16_buffer, length=len(utf16_buffer))

    blob_client.commit_block_list(block_id_list)

    return bytes_moved, record_cnt


account_name   = "xxxxx"
container_name = "xxxxx"
account_key    = "xxxxxx"

blob_client = getBloblClient(account_name, account_key, container_name)
upload_blocks(blob_client, encoding = "UTF-16-LE")

Выход:

Спасибо! Я мог создать файл, но, к сожалению, в нем не было встроенной информации о кодировке. Пытался изменить кодировку на utf-16-le или просто utf-16, но результат тот же. Могу я попросить вас попробовать добавить "autosúčiastky" как фейковое поле во входные данные и проверить результат?

spyder 28.07.2023 16:14

Я добавил LE BOM в первый utf16_buffer, и это сработало! :-) if first_chunk: utf16_buffer = b"\xff\xfe" + utf16_buffer

spyder 28.07.2023 16:20

Другие вопросы по теме