Как обрабатывать асинхронные добавления в файл CSV без повреждения

У меня есть большое количество задач asyncio, которые потребляют данные через очередь и записывают их в отдельные файлы. Однако некоторые файлы будут записываться несколько раз в режиме a+. Я написал код для имитации случайной обработки, аналогично моему примеру из реальной жизни.

Я использую asyncio.Lock() следующим образом, чтобы защитить файл от любой задачи, которая берет на себя ответственность за запись в него, но все равно получаю результаты CSV, которые не выровнены и/или повреждены. Кроме того, кажется, что заголовок записывается несколько раз, хотя размер файла не должен быть равен 0 после первой записи заголовка.

Что мне не хватает?

import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone

async def write_csv(item: list,  load_id: str, prefix: str) -> None:

    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")

    # Asynchronously write to our file
    async with aiofiles.open(file_path, mode = "a+", newline = "") as f:

        print(f"INFO: writing file: {Path(file_path).resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
        print(f"file size: {await aiofiles.os.path.getsize(file_path)}")

        # If the file is empty, write the header
        if await aiofiles.os.path.getsize(file_path) == 0:
            print("file was empty! writing header")

            # Write the header
            async with asyncio.Lock():
                await w.writerow([
                    "response",
                    "load_id",
                    "last_updated_timestamp_utc"
                ])

        # do something special for specific file name
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            item = random.shuffle(item)

        # Write the data
        for chunk in item:
            async with asyncio.Lock():
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])

async def main() -> None:

    # Create fake data
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500

    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]

    tasks: list = []
    load_id = str(uuid.uuid4())
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(i, load_id, random.choice(prefixes)))
        tasks.append(task)

    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

Я предполагаю, что происходит что-то вроде этого: Задача А открывает файл и помещает указатель файла в текущий конец файла. Задача B открывает тот же файл, устанавливает указатель файла в ту же позицию. Задача A добавляет данные в файл. Задача B записывает данные в позицию, где ранее находился конец, и перезаписывает части или все данные, записанные задачей A.

Michael Butscher 15.06.2024 07:00

@MichaelButscher, это кажется вероятным... но как мне обойти этот сценарий, если блокировка дескриптора файла не работает?

Coldchain9 15.06.2024 19:43

Вы должны убедиться, что для каждого файла открыт не более одного файлового объекта, который его представляет. Одним из способов может быть словарь кэша, который сопоставляет пути к файлам со слабыми ссылками файловых объектов. Функция возвращает или создает файловые объекты по заданному пути. Перед завершением работы программы все оставшиеся файловые объекты в кэше закрываются. Вместо слабых ссылок вы также можете использовать объекты-оболочки для файловых объектов, у которых есть счетчик ссылок, который увеличивается и уменьшается в зависимости от того, скольким задачам требуется оболочка. Фактический файловый объект закрывается только тогда, когда счетчик достигает нуля.

Michael Butscher 15.06.2024 20:40

Используйте библиотеку logging. Он отлично работает в разных потоках и правильно блокирует файл при добавлении новых данных. Вы можете настроить форматирование журнала таким образом, чтобы в файл добавлялся только ваш текст.

Johnny Cheesecutter 15.06.2024 21:00
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
4
84
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Если вы хотите сериализовать доступ к ресурсу с помощью экземпляра asyncio.Lock, тогда весь код доступа к ресурсу должен попытаться получить один и тот же экземпляр блокировки. Но у вас в коде:

        ...
        # Write the data
        for chunk in item:
            async with asyncio.Lock():
        ...

Следовательно, каждая задача постоянно создает новые экземпляры блокировки, которые не используются другими задачами.

Вторая проблема: у вас есть:

        ...
        if prefix == "file_one":
            # Shuffle the chunks again
            item = random.shuffle(item)
        ...

random.shuffle перемешивает последовательность на месте и возвращает None. Вместо этого у вас должно быть:

        ...
        if prefix == "file_one":
            # Shuffle the chunks again
            random.shuffle(item)
        ...

Я бы также рекомендовал, если у вас есть открытый файл f, то размер проще всего определить с помощью:

size = f.seek(0, 2)  # Seek to the end of file and return the offset

В следующем коде каждая задача открывает выходной CSV-файл один раз, но перед записью строки проверяет ее правильное расположение, переходя к текущему концу файла. Затем строка записывается, а выходной файл csv очищается перед освобождением управления:

import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone

async def write_csv(lock: asyncio.Lock, item: list,  load_id: str, prefix: str) -> None:

    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")

    async with aiofiles.open(file_path, mode = "a+", newline = "") as f:
        print(f"INFO: writing file: {Path(file_path).resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)

        async with lock:
            # Asynchronously write to our file
                size = await f.seek(0, 2)  # Seek to end
                # If the file is empty, write the header
                if size == 0:
                    print("file was empty! writing header")

                    # Write the header
                    await w.writerow([
                        "response",
                        "load_id",
                        "last_updated_timestamp_utc"
                    ])
                    await f.flush()
            # Release the lock implicitly

        # do something special for specific file name
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            random.shuffle(item)

        # Write the data
        for chunk in item:
            async with lock:
                size = await f.seek(0, 2) #  # Seek to end
                print(f"{file_path} file size: {size}")
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])
                await f.flush()

async def main() -> None:

    # Create fake data
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500

    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]

    tasks: list = []
    load_id = str(uuid.uuid4())
    lock = asyncio.Lock()
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(lock, i, load_id, random.choice(prefixes)))
        tasks.append(task)

    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

Отличный улов! Да, я должен разделить право собственности на замок, а не создавать новые. Я проверю это позже и приму, когда подтвержу, что это работает. Спасибо!

Coldchain9 18.06.2024 18:32

Другие вопросы по теме