Это упрощенная версия моего кода:
main
— сопрограмма, которая останавливается после второй итерации.get_numbers
— это асинхронный генератор, который выдает числа, но в диспетчере асинхронного контекста.
import asyncio
class MyContextManager:
async def __aenter__(self):
print("Enter to the Context Manager...")
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
print(exc_type)
print("Exit from the Context Manager...")
await asyncio.sleep(1)
print("This line is not executed") # <-------------------
await asyncio.sleep(1)
async def get_numbers():
async with MyContextManager():
for i in range(30):
yield i
async def main():
async for i in get_numbers():
print(i)
if i == 1:
break
asyncio.run(main())
И вывод:
Enter to the Context Manager...
0
1
<class 'asyncio.exceptions.CancelledError'>
Exit from the Context Manager...
Собственно у меня два вопроса:
__aexit__
возможность выполнить. Но строка print("This line is not executed")
не выполняется. Почему это? Правильно ли предположить, что если у нас есть оператор await
внутри __aexit__
, код после этой строки вообще не будет выполняться, и мы не должны полагаться на него для очистки?help()
на асинхронных генераторах показывает, что: | aclose(...)
| aclose() -> raise GeneratorExit inside generator.
так почему я получаю исключение <class 'asyncio.exceptions.CancelledError'>
внутри __aexit__
?
* I'm using Python 3.10.4
@Bergi Вы сказали это в первой части? Я имею в виду, что в трио после каждого ожидания в __aexit__
он будет возвращаться к циклу событий, пока это не будет выполнено?
надо будет вечером попробовать
Чтобы ответить на первый вопрос:
Is it correct to assume that if we have an await statement inside the
__aexit__
, the codes after that line is not going to execute at all?
Я бы сказал, что нет, это не всегда так. Пока main
имеет достаточно времени и может снова передать управление обратно в цикл событий, код внутри __aexit__
может выполняться. Я пробовал это:
async def main():
async for i in get_numbers():
print(i)
if i == 1:
break
await asyncio.sleep(4) # <---- New
.run()
заботится только о переданной ему сопрограмме и выполняет ее до конца, а не о других сопрограммах, включая __aexit__
. Поэтому, если ему не хватает времени или он не передает управление циклу обработки событий, я не могу полагаться на следующие строки после первого оператора ожидания.
Дополнительная информация, которая может помочь:
Здесь, в методе base_events.py/run_forever
(который вызывается .run()
), я обнаружил, что self._asyncgen_finalizer_hook
передается в sys.set_asyncgen_hooks
. Тело _asyncgen_finalizer_hook
:
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
self.call_soon_threadsafe(self.create_task, agen.aclose())
Но реализация call_soon_threadsafe
пуста.
I'll clean up this answer and remove these guesses later.
Я не уверен, что происходит, но публикую то, что я нашел, на случай, если это окажется полезным для других, которые решат провести расследование. Результат изменится на ожидаемый, когда мы сохраним ссылку на get_numbers()
вне main()
. Я бы сказал, что сбор мусора get_numbers()
кажется слишком ранним, но отключение gc
не помогает, поэтому мое предположение может быть неверным.
import asyncio
test = None
class MyContextManager:
async def __aenter__(self):
print("Enter to the Context Manager...")
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
print(exc_type)
print("Exit from the Context Manager...")
await asyncio.sleep(1)
print("This line is not executed") # <-- Executed now
await asyncio.sleep(1)
async def get_numbers():
async with MyContextManager():
for i in range(30):
yield i
async def main():
global test
test = get_numbers()
async for i in test:
print(i)
if i == 1:
break
asyncio.run(main())
ответ прост: интерпретатор продолжит выполнение __aexit__
через одну секунду, но функция main
завершена и указателя на контекстный менеджер нет.
первое очевидное решение, о котором вы упоминаете, - это подождать достаточно долго после основной функции:
async def main():
async for i in get_numbers():
print(i)
if i == 1:
break
await asyncio.sleep(4) # <---- New
другой способ - использовать try/finally:
async def __aexit__(self, exc_type, exc_value, exc_tb):
try:
pass
print(exc_type)
print("Exit from the Context Manager...")
await asyncio.sleep(1)
finally:
print("This line is not executed") # <-------------------
Это относится не только к __aexit__
, но и ко всему асинхронному коду: когда цикл событий закрывается, он должен выбрать между оставшимися задачами отмена или сохранение их. В интересах очистки большинство фреймворков предпочитают отмену вместо того, чтобы полагаться на программиста для очистки сохраненных задач позже.
Этот вид очистки завершения работы представляет собой отдельный механизм от изящного развертывания функций, контекстов и т.п. в стеке вызовов во время обычного выполнения. Менеджер контекста, который также должен выполнять очистку при отмене, должен быть специально к этому подготовлен.. Тем не менее, во многих случаях можно не быть готовым к этому, поскольку многие ресурсы сами по себе становятся безопасными.
В современных структурах циклов событий обычно существует три уровня очистки:
__aexit__
вызывается, когда область заканчивается, и может получить исключение, которое вызвало развертывание в качестве аргумента. Ожидается, что уборка будет отложена до тех пор, пока это необходимо. Это сравнимо с __exit__
запуском синхронного кода.__aexit__
может получить CancelledError
1 в качестве аргумента или в виде исключения на любой await
/async for
/async with
. Очистка может задержать это, но ожидается, что она будет продолжаться как можно быстрее. Это сравнимо с KeyboardInterrupt
отменой синхронного кода.__aexit__
может получить GeneratorExit
в качестве аргумента или в виде исключения на любой await
/async for
/async with
. Уборка должна проходить как можно быстрее. Это сравнимо с GeneratorExit
замыканием синхронного генератора.Чтобы обрабатывать отмену/закрытие, любой код async
— будь то в __aexit__
или где-то еще — должен обрабатывать CancelledError
или GeneratorExit
. В то время как первое может быть отложено или подавлено, со вторым следует иметь дело немедленно и синхронно2.
async def __aexit__(self, exc_type, exc_value, exc_tb):
print("Exit from the Context Manager...")
try:
await asyncio.sleep(1)
except GeneratorExit:
print("Exit stage left NOW")
except asyncio.CancelledError:
print("Got cancelled, just cleaning up a few things...")
await asyncio.sleep(0.5)
else:
print("Nothing to see here, taking my time on the way out")
await asyncio.sleep(1)
Примечание: Как правило, исчерпывающее рассмотрение таких случаев невозможно. Различные формы очистки могут прерывать друг друга, например, отмена развертывания и последующее закрытие. Обработка очистки возможна только на основе максимальных усилий; надежная очистка достигается за счет отказоустойчивости, например, с помощью транзакций, а не явной очистки.
Очистка асинхронных генераторов в конкретном случае является сложным случаем, поскольку они могут быть очищены всеми случаями одновременно: развертывание по завершении работы генератора, отмена по мере уничтожения задачи-владельца или закрытие по мере того, как генератор очищается сборщиком мусора. Порядок поступления сигналов очистки зависит от реализации.
Правильный способ решить эту проблему — не полагаться в первую очередь на неявную очистку. Вместо этого каждая сопрограмма перед выходом должна убедиться, что все ее ресурсы закрыты.
async def main():
async_iter = get_numbers()
async for i in async_iter:
print(i)
if i == 1:
break
await async_iter.aclose() # wait for generator to close before exiting
В последних версиях этот шаблон кодируется через aclosing
контекстный менеджер.
1Название и/или идентификатор этого исключения могут отличаться.
2Хотя во время await
можно GeneratorExit
выполнять асинхронные действия, они могут не поддаваться циклу событий. Синхронный интерфейс выгоден для обеспечения этого.
IIRC, это работает, как и ожидалось, с трио, так что это может быть ошибка в asyncio.