Я немного поэкспериментировал с asyncio и прочитал PEP; несколько уроков; и даже Книга О'Рейли.
Я думаю, что у меня это получилось, но я все еще озадачен поведением loop.close(), которое я не могу понять, когда его "безопасно" вызывать.
Мой вариант использования - это набор блокирующих вызовов «старой школы», которые я оборачиваю в run_in_executor() и внешнюю сопрограмму; если какой-либо из этих вызовов пойдет не так, я хочу остановить прогресс, отменить те, которые еще не выполнены, распечатать разумный журнал и затем (надеюсь, чисто) уйти с дороги.
Скажем, примерно так:
import asyncio
import time
def blocking(num):
time.sleep(num)
if num == 2:
raise ValueError("don't like 2")
return num
async def my_coro(loop, num):
try:
result = await loop.run_in_executor(None, blocking, num)
print(f"Coro {num} done")
return result
except asyncio.CancelledError:
# Do some cleanup here.
print(f"man, I was canceled: {num}")
def main():
loop = asyncio.get_event_loop()
tasks = []
for num in range(5):
tasks.append(loop.create_task(my_coro(loop, num)))
try:
# No point in waiting; if any of the tasks go wrong, I
# just want to abandon everything. The ALL_DONE is not
# a good solution here.
future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(future)
if pending:
print(f"Still {len(pending)} tasks pending")
# I tried putting a stop() - with/without a run_forever()
# after the for - same exception raised.
# loop.stop()
for future in pending:
future.cancel()
for task in done:
res = task.result()
print("Task returned", res)
except ValueError as error:
print("Outer except --", error)
finally:
# I also tried placing the run_forever() here,
# before the stop() - no dice.
loop.stop()
if pending:
print("Waiting for pending futures to finish...")
loop.run_forever()
loop.close()
Я пробовал несколько вариантов вызовов stop() и run_forever(), "сначала run_forever, затем stop", похоже, является тем, который следует использовать в соответствии с к пидоку, и без вызова close() дает удовлетворительный результат:
Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3
Process finished with exit code 0
Однако, когда добавляется вызов close() (как показано выше), я получаю два исключения:
exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
что в лучшем случае раздражает, но для меня совершенно сбивает с толку: и, что еще хуже, я не мог понять, как правильно поступить в такой ситуации.
Итак, два вопроса:
что мне не хватает? как мне изменить приведенный выше код таким образом, чтобы при включенном вызове close() он не поднимался?
что на самом деле произойдет, если я не позвоню close() - в этом тривиальном случае, я полагаю, это в значительной степени избыточно; но каковы могут быть последствия в «реальном» производственном коде?
Для моего личного удовлетворения также:
Большое спасибо за любые предложения, которые могут у вас возникнуть!






Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the
run_in_executor()and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding
Это не может работать так, как предполагалось, потому что run_in_executor отправляет функцию в пул потоков, а потоки ОС не могут быть отменены в Python (или на других языках, которые их раскрывают). Отмена будущего, возвращенного run_in_executor, будет пытаться отменить базовый concurrent.futures.Future, но это будет иметь эффект только в том случае, если функция блокировки еще не запущена, например потому что пул потоков занят. Как только он начинает выполняться, его нельзя безопасно отменить. Поддержка безопасной и надежной отмены - одно из преимуществ использования asyncio по сравнению с потоками.
Если вы имеете дело с синхронным кодом, будь то устаревший блокирующий вызов или более продолжительный код, связанный с процессором, вы должны запустить его с run_in_executor и включить способ его прерывания. Например, код может иногда проверять флаг stop_requested и завершать работу, если это правда, возможно, вызывая исключение. Затем вы можете «отменить» эти задачи, установив соответствующий флаг или флаги.
how should I modify the code above in a way that with the call to close() included does not raise?
Насколько я могу судить, в настоящее время нет возможности сделать это без модификации blocking и кода верхнего уровня. run_in_executor будет настаивать на информировании цикла событий о результате, и это не удается, когда цикл событий закрывается. Отмена asyncio future не помогает, потому что проверка отмены выполняется в потоке цикла событий, а ошибка возникает до этого, когда call_soon_threadsafe вызывается рабочим потоком. (Можно было бы переместить проверку в рабочий поток, но следует тщательно проанализировать, не приводит ли это к состоянию гонки между вызовом cancel() и фактической проверкой.)
why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?
Он хочет, чтобы функции блокировки, переданные в run_in_executor (буквально называемые blocking в вопросе), которые уже были запущены, завершили работу до закрытия цикла событий. Вы отменили асинхронное будущее, но лежащее в основе параллельное будущее все еще хочет «вызвать домой», обнаружив, что цикл замкнут.
Не очевидно, является ли это ошибкой в asyncio или вы просто не должны закрывать цикл событий, пока вы каким-то образом не убедитесь, что вся работа, отправленная в run_in_executor, выполнена. Для этого потребуются следующие изменения:
wait() для этих фьючерсов, поскольку asyncio будет считать их завершенными.loop.run_until_complete(asyncio.wait(pending)) перед loop.close().С этими модификациями (за исключением события, связанного с приложением - я просто позволил sleep() завершить свой курс), исключение не появилось.
what actually happens if I don't call
close()- in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?
Так как типичный цикл обработки событий работает, пока работает приложение, не должно возникнуть проблем, если вы не вызовете close() в самом конце программы. Операционная система в любом случае очистит ресурсы при выходе из программы.
Вызов loop.close() важен для циклов событий, у которых есть четкое время жизни. Например, библиотека может создать новый цикл событий для конкретной задачи, запустить его в выделенном потоке и избавиться от него. Неспособность закрыть такой цикл может привести к утечке его внутренних ресурсов (таких как канал, который он использует для межпоточного пробуждения) и привести к сбою программы. Другим примером являются наборы тестов, которые часто запускают новый цикл событий для каждого модульного теста, чтобы гарантировать разделение тестовых сред.
@Marco Использование CancelledError не будет работать, потому что исключение появится только в коде asyncio, оно не будет внедрено в другой поток (и нет надежного механизма Python для этого). Случайные проверки флага "прерывания" - уродливое, но необходимое зло, когда имеешь дело с этими вещами.
абсолютно, я полностью это понимаю (поэтому в сопрограмме есть try: except, а не blocking). Вот почему пример является «надуманным и упрощенным» - в нем не учитывается необходимость иметь «управляемый блокировкой» флаг _canceled, который устанавливается в сопутствующем и проверяется (время от времени) в вызове блокировки.
Пока проблема апстрима не будет исправлен, другой способ обойти проблему - заменить использование run_in_executor специальной версией без недостатка. Хотя запуск собственного run_in_executor поначалу звучит как плохая идея, на самом деле это лишь небольшой связующий элемент между concurrent.futures и будущим asyncio.
Простая версия run_in_executor может быть полностью реализована с использованием общедоступного API этих двух классов:
def run_in_executor(executor, fn, *args):
"""Submit FN to EXECUTOR and return an asyncio future."""
loop = asyncio.get_event_loop()
if args:
fn = functools.partial(fn, *args)
work_future = executor.submit(fn)
aio_future = loop.create_future()
aio_cancelled = False
def work_done(_f):
if not aio_cancelled:
loop.call_soon_threadsafe(set_result)
def check_cancel(_f):
nonlocal aio_cancelled
if aio_future.cancelled():
work_future.cancel()
aio_cancelled = True
def set_result():
if work_future.cancelled():
aio_future.cancel()
elif work_future.exception() is not None:
aio_future.set_exception(work_future.exception())
else:
aio_future.set_result(work_future.result())
work_future.add_done_callback(work_done)
aio_future.add_done_callback(check_cancel)
return aio_future
Когда loop.run_in_executor(blocking) заменяется на run_in_executor(executor, blocking), где executor является ThreadPoolExecutor, созданным в main(), код работает без других модификаций.
Конечно, в этом варианте синхронные функции будут продолжать выполняться в другом потоке до завершения, несмотря на то, что они были отменены, но это неизбежно без их модификации для поддержки явного прерывания.
Спасибо за подробный ответ (и за сообщение об ошибке - я догадывался, что это действительно «не я», но я действительно новичок, когда дело касается
asyncio). Кроме того, я хотел уточнить, чтоblocking()(да, это название было преднамеренным) в «реальной жизни» будет выполнять свою работу по частям: так что, хотя я полностью осознаю, что поток не может быть действительно надежно отменен, я хотел использоватьCanceledErrorкак сигнал бросить работу. Еще раз спасибо за очевидное время, которое потребовалось для объяснения проблемы, это полезно далеко за пределами непосредственного варианта использования.