Asyncio: отмена задач и запуск новых при поднятии сигнального флага

Моя программа должна навсегда считывать данные из классов провайдеров, хранящихся в PROVIDERS, определенных в конфигурации. Каждую секунду он должен проверять, изменился ли конфиг, и если да, то останавливать все задачи, перезагружать конфиг и создавать новые задачи.

Приведенный ниже код вызывает CancelledError, потому что я отменяю свои задачи. Должен ли я действительно попробовать / поймать каждый из них для достижения своих целей или есть лучший образец?

async def main(config_file):
    load_config(config_file)

    tasks = []
    config_task = asyncio.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so

    tasks.append(config_task)
    for asset_name, provider in PROVIDERS.items():
        task = asyncio.create_task(provider.read_forever())
        tasks.append(task)

    try:
        await asyncio.gather(*tasks, return_exceptions=False)
    except ConfigChangedSignal:
        # Restarting
        for task in asyncio.tasks.all_tasks():
            task.cancel()  # raises CancelledError
        await main(config_file)


try:
    asyncio.run(main(config_file))
except KeyboardInterrupt:
    logger.debug("Ctrl-C pressed. Aborting")
Мутабельность и переработка объектов в Python
Мутабельность и переработка объектов в Python
Объекты являются основной конструкцией любого языка ООП, и каждый язык определяет свой собственный синтаксис для их создания, обновления и...
Другой маршрут в Flask Python
Другой маршрут в Flask Python
Flask - это фреймворк, который поддерживает веб-приложения. В этой статье я покажу, как мы можем использовать @app .route в flask, чтобы иметь другую...
14 Задание: Типы данных и структуры данных Python для DevOps
14 Задание: Типы данных и структуры данных Python для DevOps
Проверить тип данных используемой переменной, мы можем просто написать: your_variable=100
Python PyPDF2 - запись метаданных PDF
Python PyPDF2 - запись метаданных PDF
Python скрипт, который будет записывать метаданные в PDF файл, для этого мы будем использовать PDF ридер из библиотеки PyPDF2 . PyPDF2 - это...
Переменные, типы данных и операторы в Python
Переменные, типы данных и операторы в Python
В Python переменные используются как место для хранения значений. Пример переменной формы:
Почему Python - идеальный выбор для проекта AI и ML
Почему Python - идеальный выбор для проекта AI и ML
Блог, которым поделился Harikrishna Kundariya в нашем сообществе Developer Nation Community.
1
0
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Если вы используете Python 3.11, ваш шаблон напрямую сопоставляется с использованием asyncio.TaskGroup, «преемника» asyncio.gather, который использует новые «группы исключений». По умолчанию, если какая-либо задача в группе вызывает исключение, все задачи в группе отменяются:

Я поиграл с этим фрагментом в консоли ipython и запустил asyncio.run(main(False)) без исключения и asyncio.run(main(True)) для создания исключения только для проверки результатов:

import asyncio

async def doit(i, n, cancel=False):
    await asyncio.sleep(n)
    if cancel:
        raise RuntimeError()
    print(i, "done")

async def main(cancel):
  try:
      async with asyncio.TaskGroup() as group:
          tasks = [group.create_task(doit(i, 2)) for i in range(10)]
          group.create_task(doit(42, 1, cancel=cancel))
          group.create_task(doit(11, .5))
  except Exception:
      pass
  await asyncio.sleep(3)

Ваш код может вместить это - Однако, помимо наилучшей практики отмены задач, вы выполняете рекурсивный вызов своего main, который, хотя и будет работать для большинства практических целей, может заставить опытных разработчиков «вздохнуть», а также может сломаться в крайних случаях (это не сработает). например, после ~1000 циклов) и утечки ресурсов.

Правильный способ сделать это — собрать цикл while, поскольку вызовы функций Python, даже хвостовые вызовы, не будут очищать ресурсы в области вызова:

import asyncio
...


async def main(config_file):
    while True:
        load_config(config_file)
        try:
            async with asyncio.TaskGroup() as tasks:
                tasks.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so

                for asset_name, provider in PROVIDERS.items():
                tasks.create_task.create_task(provider.read_forever())

            # all tasks are awaited at the end of the with block
        except *ConfigChangedSignal:  # <- the new syntax in Python 3.11
            # Restarting is just a matter of re-doing the while-loop
            # ... log.info("config changed")
            pass

        # any other exception won't be caught and will error, allowing one
        # to review what went wrong
        
...


Для Python 3.10 перебор задач и отмена каждой из них кажется нормальным, но вы должны посмотреть на этот рекурсивный вызов. Если вам не нужен цикл while внутри текущего основного, реорганизуйте код так, чтобы сам main вызывался из внешнего цикла while.



async def main(config_file):
    while True:
        await inner_main(config_file)

async def inner_main(config_file):
    load_config(config_file)

    # keep the existing body
    ...
    except ConfigChangedSignal:
        # Restarting
        for task in asyncio.tasks.all_tasks():
            task.cancel()  # raises CancelledError
       # await main call dropped from here


В итоге я сделал именно то, что вы предлагаете в своем втором ответе, и я действительно забыл, что Python не любит рекурсию, поэтому цикл while True решил это. Моя первоначальная путаница относительно того, как решить эту проблему, возникла из-за того, что я думал, что KeyboardInterrupt и CancelledError наследуются от Exception, а они оба — нет. Спасибо!

bluppfisk 23.11.2022 12:13

Ответ jsbueno подходит.

Простая альтернатива — заключить весь цикл обработки событий во внешнее «пока»:

async def main(config_file):
    load_config(config_file)

    tasks = []
    for asset_name, provider in PROVIDERS.items():
        task = asyncio.create_task(provider.read_forever())
        tasks.append(task)

    try:
        await watch_config(config_file)
    except ConfigChangedSignal:
        pass

try:
    while True:
        asyncio.run(main(config_file))
except KeyboardInterrupt:
    logger.debug("Ctrl-C pressed. Aborting")

Я думаю, что это будет иметь неприятный побочный эффект двойного создания задач каждый раз при изменении конфигурации.

bluppfisk 23.11.2022 12:11

Это не очевидно, но к тому времени, когда asyncio.run() возвращается, он отменяет все ожидающие задачи и замыкает цикл. Поэтому задачи не должны создаваться дважды.

fancidev 24.11.2022 11:18

Ах ты прав. Это также допустимое решение, однако я предпочитаю приведенное выше, поскольку оно более прозрачно в отношении того, что происходит.

bluppfisk 24.11.2022 14:21

Другие вопросы по теме