Моя программа должна навсегда считывать данные из классов провайдеров, хранящихся в PROVIDERS, определенных в конфигурации. Каждую секунду он должен проверять, изменился ли конфиг, и если да, то останавливать все задачи, перезагружать конфиг и создавать новые задачи.
Приведенный ниже код вызывает CancelledError, потому что я отменяю свои задачи. Должен ли я действительно попробовать / поймать каждый из них для достижения своих целей или есть лучший образец?
async def main(config_file):
load_config(config_file)
tasks = []
config_task = asyncio.create_task(watch_config(config_file)) # checks every 1s if config changed and raises ConfigChangedSignal if so
tasks.append(config_task)
for asset_name, provider in PROVIDERS.items():
task = asyncio.create_task(provider.read_forever())
tasks.append(task)
try:
await asyncio.gather(*tasks, return_exceptions=False)
except ConfigChangedSignal:
# Restarting
for task in asyncio.tasks.all_tasks():
task.cancel() # raises CancelledError
await main(config_file)
try:
asyncio.run(main(config_file))
except KeyboardInterrupt:
logger.debug("Ctrl-C pressed. Aborting")
Если вы используете Python 3.11, ваш шаблон напрямую сопоставляется с использованием asyncio.TaskGroup, «преемника» asyncio.gather, который использует новые «группы исключений». По умолчанию, если какая-либо задача в группе вызывает исключение, все задачи в группе отменяются:
Я поиграл с этим фрагментом в консоли ipython и запустил asyncio.run(main(False)) без исключения и asyncio.run(main(True)) для создания исключения только для проверки результатов:
import asyncio
async def doit(i, n, cancel=False):
await asyncio.sleep(n)
if cancel:
raise RuntimeError()
print(i, "done")
async def main(cancel):
try:
async with asyncio.TaskGroup() as group:
tasks = [group.create_task(doit(i, 2)) for i in range(10)]
group.create_task(doit(42, 1, cancel=cancel))
group.create_task(doit(11, .5))
except Exception:
pass
await asyncio.sleep(3)
Ваш код может вместить это - Однако, помимо наилучшей практики отмены задач, вы выполняете рекурсивный вызов своего main, который, хотя и будет работать для большинства практических целей, может заставить опытных разработчиков «вздохнуть», а также может сломаться в крайних случаях (это не сработает). например, после ~1000 циклов) и утечки ресурсов.
Правильный способ сделать это — собрать цикл while, поскольку вызовы функций Python, даже хвостовые вызовы, не будут очищать ресурсы в области вызова:
import asyncio
...
async def main(config_file):
while True:
load_config(config_file)
try:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(watch_config(config_file)) # checks every 1s if config changed and raises ConfigChangedSignal if so
for asset_name, provider in PROVIDERS.items():
tasks.create_task.create_task(provider.read_forever())
# all tasks are awaited at the end of the with block
except *ConfigChangedSignal: # <- the new syntax in Python 3.11
# Restarting is just a matter of re-doing the while-loop
# ... log.info("config changed")
pass
# any other exception won't be caught and will error, allowing one
# to review what went wrong
...
Для Python 3.10 перебор задач и отмена каждой из них кажется нормальным, но вы должны посмотреть на этот рекурсивный вызов. Если вам не нужен цикл while внутри текущего основного, реорганизуйте код так, чтобы сам main вызывался из внешнего цикла while.
async def main(config_file):
while True:
await inner_main(config_file)
async def inner_main(config_file):
load_config(config_file)
# keep the existing body
...
except ConfigChangedSignal:
# Restarting
for task in asyncio.tasks.all_tasks():
task.cancel() # raises CancelledError
# await main call dropped from here
Ответ jsbueno подходит.
Простая альтернатива — заключить весь цикл обработки событий во внешнее «пока»:
async def main(config_file):
load_config(config_file)
tasks = []
for asset_name, provider in PROVIDERS.items():
task = asyncio.create_task(provider.read_forever())
tasks.append(task)
try:
await watch_config(config_file)
except ConfigChangedSignal:
pass
try:
while True:
asyncio.run(main(config_file))
except KeyboardInterrupt:
logger.debug("Ctrl-C pressed. Aborting")
Я думаю, что это будет иметь неприятный побочный эффект двойного создания задач каждый раз при изменении конфигурации.
Это не очевидно, но к тому времени, когда asyncio.run() возвращается, он отменяет все ожидающие задачи и замыкает цикл. Поэтому задачи не должны создаваться дважды.
Ах ты прав. Это также допустимое решение, однако я предпочитаю приведенное выше, поскольку оно более прозрачно в отношении того, что происходит.
В итоге я сделал именно то, что вы предлагаете в своем втором ответе, и я действительно забыл, что Python не любит рекурсию, поэтому цикл while True решил это. Моя первоначальная путаница относительно того, как решить эту проблему, возникла из-за того, что я думал, что KeyboardInterrupt и CancelledError наследуются от Exception, а они оба — нет. Спасибо!