Как эффективно обрабатывать гнездовые асинхронные операции с помощью Python asyncio lib?

Я работаю над проектом Python, в котором есть следующие асинхронные операции. Я хочу убедиться, что каждый уровень имеет хорошую обработку ошибок и возвращается в основной цикл для повторного выполнения. Я хочу поставить типа: если есть какая-то ошибка, я хочу, чтобы задачи pnding были отменены.

import asyncio

async def fetch_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return f"Data from {service_name}"

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    try:
        dataA = await fetch_data_from_service("ServiceA")
        dataB = await fetch_data_from_service("ServiceB")
        dataC = await fetch_data_from_service("ServiceC")
        
        processedA = await process_data(dataA)
        processedB = await process_data(dataB)
        processedC = await process_data(dataC)

        print(processedA, processedB, processedC)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        # How to ensure all pending tasks are canceled if an error occurs?
        # How to propagate this error back to the main event loop?

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

Мне нужна эффективная обработка гнезд, проверка каждого уровня, отмена задачи в случае возникновения ошибки и перенаправление ошибки в основной цикл событий.

Я завернул каждую операцию в блок try-expect, чтобы перехватывать ошибки и управлять ими. Но я не уверен в этом процессе отмены.

Я ожидаю, что асинхронные задачи будут выполняться без ошибок, даже если возникнет ошибка, вызвать все таксы и переключиться на основной цикл событий.

Пожалуйста, помогите мне с этим.

Я считаю вопрос слишком общим и не очень ясным (что считается эффективным? что такое обработка гнезд? проверка уровня? перенаправление ошибок?). В вашем примере других задач нет, что вы хотите отменить? Возможно, вы захотите использовать asyncio.TaskGroup. Если вы хотите остановить цикл обработки событий из-за ошибки, просто вызовите или повторно вызовите исключение в асинхронной функции (сопрограмме), запускаемой asyncio.run.

VPfB 07.07.2024 12:57
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
1
51
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мое первое наблюдение заключается в том, что вы не используете преимущества asyncio в момент написания вашего кода: вы создаете сопрограмму, немедленно планируете ее запуск и ожидаете ее завершения, прежде чем повторить это действие со следующей сопрограммой. То есть у вас не будет одновременно выполняться несколько задач asyncio, и обработка не будет перекрываться. Мы исправим это, создав три задачи, которые будут выполняться одновременно. Поскольку вы хотите, чтобы исключение, возникшее в одной задаче, привело к завершению других задач, я предлагаю использовать группу задач . Обратите внимание, что код можно упростить, если задача fetch_data_from_service вызывает process_data напрямую, и поэтому функция была переименована в fetch_and_process_data_from_service. Если process_data требует большого количества ресурсов ЦП, рассмотрите возможность выполнения этого кода в отдельном процессе с помощью run_in_executor.

import asyncio

async def fetch_and_process_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return await process_data(f"Data from {service_name}")

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_and_process_data_from_service(service_name))
                for service_name in ("ServiceA", "ServiceB", "ServiceC")
            ]

        results = [
            task.result() for task in tasks
        ]
        print(results)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        raise  # propogate the exception back

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

Распечатки:

Exception caught in main_task: unhandled errors in a TaskGroup (1 sub-exception)
Exception caught in event loop: unhandled errors in a TaskGroup (1 sub-exception)

Однако учтите, что мы не знаем, какая задача не удалась и почему. Если это важно, мы можем запланировать запуск сопрограмм как параллельные задачи, используя `asyncio.gather', но становится необходимым явно отменить незавершенные задачи, если одна из задач вызывает исключение:

import asyncio

async def fetch_and_process_data_from_service(service_name):
    await asyncio.sleep(1)  # Simulating I/O operation
    if service_name == "ServiceB":
        raise Exception(f"Error fetching data from {service_name}")
    return await process_data(f"Data from {service_name}")

async def process_data(data):
    await asyncio.sleep(1)  # Simulating data processing
    if data == "Data from ServiceC":
        raise Exception("Error processing data from ServiceC")
    return f"Processed {data}"

async def main_task():
    tasks = [
        asyncio.create_task(fetch_and_process_data_from_service(service_name))
        for service_name in ("ServiceA", "ServiceB", "ServiceC")
    ]

    try:
        results = await asyncio.gather(*tasks)
        print(results)
    except Exception as e:
        print(f"Exception caught in main_task: {e}")
        for task in tasks:
            if not task.done():
                task.cancel()
        raise  # propogate the exception back

# Running the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main_task())
    except Exception as e:
        print(f"Exception caught in event loop: {e}")

Распечатки:

Exception caught in main_task: Error fetching data from ServiceB
Exception caught in event loop: Error fetching data from ServiceB

Другие вопросы по теме