У меня есть большое количество задач asyncio, которые потребляют данные через очередь и записывают их в отдельные файлы. Однако некоторые файлы будут записываться несколько раз в режиме a+. Я написал код для имитации случайной обработки, аналогично моему примеру из реальной жизни.
Я использую asyncio.Lock() следующим образом, чтобы защитить файл от любой задачи, которая берет на себя ответственность за запись в него, но все равно получаю результаты CSV, которые не выровнены и/или повреждены. Кроме того, кажется, что заголовок записывается несколько раз, хотя размер файла не должен быть равен 0 после первой записи заголовка.
Что мне не хватает?
import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone
async def write_csv(item: list, load_id: str, prefix: str) -> None:
Path("./test_files").mkdir(parents=True, exist_ok=True)
file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
# Asynchronously write to our file
async with aiofiles.open(file_path, mode = "a+", newline = "") as f:
print(f"INFO: writing file: {Path(file_path).resolve()}")
w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
print(f"file size: {await aiofiles.os.path.getsize(file_path)}")
# If the file is empty, write the header
if await aiofiles.os.path.getsize(file_path) == 0:
print("file was empty! writing header")
# Write the header
async with asyncio.Lock():
await w.writerow([
"response",
"load_id",
"last_updated_timestamp_utc"
])
# do something special for specific file name
# I am just trying to simulate more random data processing
if prefix == "file_one":
# Shuffle the chunks again
item = random.shuffle(item)
# Write the data
for chunk in item:
async with asyncio.Lock():
await w.writerow([
json.dumps(chunk),
load_id,
datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
])
async def main() -> None:
# Create fake data
items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500
# Possible file prefixes
prefixes: list[str] = ["file_one", "file_two"]
tasks: list = []
load_id = str(uuid.uuid4())
for i in items:
# Randomly assign which file we will write to
task = asyncio.create_task(write_csv(i, load_id, random.choice(prefixes)))
tasks.append(task)
errors = await asyncio.gather(*tasks, return_exceptions=True)
# print(errors)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
@MichaelButscher, это кажется вероятным... но как мне обойти этот сценарий, если блокировка дескриптора файла не работает?
Вы должны убедиться, что для каждого файла открыт не более одного файлового объекта, который его представляет. Одним из способов может быть словарь кэша, который сопоставляет пути к файлам со слабыми ссылками файловых объектов. Функция возвращает или создает файловые объекты по заданному пути. Перед завершением работы программы все оставшиеся файловые объекты в кэше закрываются. Вместо слабых ссылок вы также можете использовать объекты-оболочки для файловых объектов, у которых есть счетчик ссылок, который увеличивается и уменьшается в зависимости от того, скольким задачам требуется оболочка. Фактический файловый объект закрывается только тогда, когда счетчик достигает нуля.
Используйте библиотеку logging. Он отлично работает в разных потоках и правильно блокирует файл при добавлении новых данных. Вы можете настроить форматирование журнала таким образом, чтобы в файл добавлялся только ваш текст.






Если вы хотите сериализовать доступ к ресурсу с помощью экземпляра asyncio.Lock, тогда весь код доступа к ресурсу должен попытаться получить один и тот же экземпляр блокировки. Но у вас в коде:
...
# Write the data
for chunk in item:
async with asyncio.Lock():
...
Следовательно, каждая задача постоянно создает новые экземпляры блокировки, которые не используются другими задачами.
Вторая проблема: у вас есть:
...
if prefix == "file_one":
# Shuffle the chunks again
item = random.shuffle(item)
...
random.shuffle перемешивает последовательность на месте и возвращает None. Вместо этого у вас должно быть:
...
if prefix == "file_one":
# Shuffle the chunks again
random.shuffle(item)
...
Я бы также рекомендовал, если у вас есть открытый файл f, то размер проще всего определить с помощью:
size = f.seek(0, 2) # Seek to the end of file and return the offset
В следующем коде каждая задача открывает выходной CSV-файл один раз, но перед записью строки проверяет ее правильное расположение, переходя к текущему концу файла. Затем строка записывается, а выходной файл csv очищается перед освобождением управления:
import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone
async def write_csv(lock: asyncio.Lock, item: list, load_id: str, prefix: str) -> None:
Path("./test_files").mkdir(parents=True, exist_ok=True)
file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
async with aiofiles.open(file_path, mode = "a+", newline = "") as f:
print(f"INFO: writing file: {Path(file_path).resolve()}")
w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
async with lock:
# Asynchronously write to our file
size = await f.seek(0, 2) # Seek to end
# If the file is empty, write the header
if size == 0:
print("file was empty! writing header")
# Write the header
await w.writerow([
"response",
"load_id",
"last_updated_timestamp_utc"
])
await f.flush()
# Release the lock implicitly
# do something special for specific file name
# I am just trying to simulate more random data processing
if prefix == "file_one":
# Shuffle the chunks again
random.shuffle(item)
# Write the data
for chunk in item:
async with lock:
size = await f.seek(0, 2) # # Seek to end
print(f"{file_path} file size: {size}")
await w.writerow([
json.dumps(chunk),
load_id,
datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
])
await f.flush()
async def main() -> None:
# Create fake data
items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500
# Possible file prefixes
prefixes: list[str] = ["file_one", "file_two"]
tasks: list = []
load_id = str(uuid.uuid4())
lock = asyncio.Lock()
for i in items:
# Randomly assign which file we will write to
task = asyncio.create_task(write_csv(lock, i, load_id, random.choice(prefixes)))
tasks.append(task)
errors = await asyncio.gather(*tasks, return_exceptions=True)
# print(errors)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
Отличный улов! Да, я должен разделить право собственности на замок, а не создавать новые. Я проверю это позже и приму, когда подтвержу, что это работает. Спасибо!
Я предполагаю, что происходит что-то вроде этого: Задача А открывает файл и помещает указатель файла в текущий конец файла. Задача B открывает тот же файл, устанавливает указатель файла в ту же позицию. Задача A добавляет данные в файл. Задача B записывает данные в позицию, где ранее находился конец, и перезаписывает части или все данные, записанные задачей A.