Когда подходящее время для вызова loop.close ()?

Я немного поэкспериментировал с asyncio и прочитал PEP; несколько уроков; и даже Книга О'Рейли.

Я думаю, что у меня это получилось, но я все еще озадачен поведением loop.close(), которое я не могу понять, когда его "безопасно" вызывать.

Мой вариант использования - это набор блокирующих вызовов «старой школы», которые я оборачиваю в run_in_executor() и внешнюю сопрограмму; если какой-либо из этих вызовов пойдет не так, я хочу остановить прогресс, отменить те, которые еще не выполнены, распечатать разумный журнал и затем (надеюсь, чисто) уйти с дороги.

Скажем, примерно так:

import asyncio
import time


def blocking(num):
    time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    try:
        result = await loop.run_in_executor(None, blocking, num)
        print(f"Coro {num} done")
        return result
    except asyncio.CancelledError:
        # Do some cleanup here.
        print(f"man, I was canceled: {num}")


def main():
    loop = asyncio.get_event_loop()
    tasks = []
    for num in range(5):
        tasks.append(loop.create_task(my_coro(loop, num)))

    try:
        # No point in waiting; if any of the tasks go wrong, I
        # just want to abandon everything. The ALL_DONE is not
        # a good solution here.
        future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(future)
        if pending:
            print(f"Still {len(pending)} tasks pending")
            # I tried putting a stop() - with/without a run_forever()
            # after the for - same exception raised.
            #  loop.stop()
            for future in pending:
                future.cancel()

        for task in done:
            res = task.result()
            print("Task returned", res)
    except ValueError as error:
        print("Outer except --", error)
    finally:
        # I also tried placing the run_forever() here,
        # before the stop() - no dice.
        loop.stop()
        if pending:
            print("Waiting for pending futures to finish...")
            loop.run_forever()
        loop.close()

Я пробовал несколько вариантов вызовов stop() и run_forever(), "сначала run_forever, затем stop", похоже, является тем, который следует использовать в соответствии с к пидоку, и без вызова close() дает удовлетворительный результат:

Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3

Process finished with exit code 0

Однако, когда добавляется вызов close() (как показано выше), я получаю два исключения:

exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

что в лучшем случае раздражает, но для меня совершенно сбивает с толку: и, что еще хуже, я не мог понять, как правильно поступить в такой ситуации.

Итак, два вопроса:

  • что мне не хватает? как мне изменить приведенный выше код таким образом, чтобы при включенном вызове close() он не поднимался?

  • что на самом деле произойдет, если я не позвоню close() - в этом тривиальном случае, я полагаю, это в значительной степени избыточно; но каковы могут быть последствия в «реальном» производственном коде?

Для моего личного удовлетворения также:

  • почему он вообще повышается? чего еще цикл хочет от сопрограмм / задач: они либо вышли; поднятый; или были отменены: разве этого недостаточно, чтобы сделать его счастливым?

Большое спасибо за любые предложения, которые могут у вас возникнуть!

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
4
0
2 093
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the run_in_executor() and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding

Это не может работать так, как предполагалось, потому что run_in_executor отправляет функцию в пул потоков, а потоки ОС не могут быть отменены в Python (или на других языках, которые их раскрывают). Отмена будущего, возвращенного run_in_executor, будет пытаться отменить базовый concurrent.futures.Future, но это будет иметь эффект только в том случае, если функция блокировки еще не запущена, например потому что пул потоков занят. Как только он начинает выполняться, его нельзя безопасно отменить. Поддержка безопасной и надежной отмены - одно из преимуществ использования asyncio по сравнению с потоками.

Если вы имеете дело с синхронным кодом, будь то устаревший блокирующий вызов или более продолжительный код, связанный с процессором, вы должны запустить его с run_in_executor и включить способ его прерывания. Например, код может иногда проверять флаг stop_requested и завершать работу, если это правда, возможно, вызывая исключение. Затем вы можете «отменить» эти задачи, установив соответствующий флаг или флаги.

how should I modify the code above in a way that with the call to close() included does not raise?

Насколько я могу судить, в настоящее время нет возможности сделать это без модификации blocking и кода верхнего уровня. run_in_executor будет настаивать на информировании цикла событий о результате, и это не удается, когда цикл событий закрывается. Отмена asyncio future не помогает, потому что проверка отмены выполняется в потоке цикла событий, а ошибка возникает до этого, когда call_soon_threadsafe вызывается рабочим потоком. (Можно было бы переместить проверку в рабочий поток, но следует тщательно проанализировать, не приводит ли это к состоянию гонки между вызовом cancel() и фактической проверкой.)

why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?

Он хочет, чтобы функции блокировки, переданные в run_in_executor (буквально называемые blocking в вопросе), которые уже были запущены, завершили работу до закрытия цикла событий. Вы отменили асинхронное будущее, но лежащее в основе параллельное будущее все еще хочет «вызвать домой», обнаружив, что цикл замкнут.

Не очевидно, является ли это ошибкой в ​​asyncio или вы просто не должны закрывать цикл событий, пока вы каким-то образом не убедитесь, что вся работа, отправленная в run_in_executor, выполнена. Для этого потребуются следующие изменения:

  • Не пытайтесь отменить отложенные фьючерсы. Их отмена на первый взгляд кажется правильной, но это не дает вам возможности использовать wait() для этих фьючерсов, поскольку asyncio будет считать их завершенными.
  • Вместо этого отправьте вашим фоновым задачам событие для конкретного приложения, информирующее их о том, что они должны быть прерваны.
  • Позвоните в loop.run_until_complete(asyncio.wait(pending)) перед loop.close().

С этими модификациями (за исключением события, связанного с приложением - я просто позволил sleep() завершить свой курс), исключение не появилось.

what actually happens if I don't call close() - in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?

Так как типичный цикл обработки событий работает, пока работает приложение, не должно возникнуть проблем, если вы не вызовете close() в самом конце программы. Операционная система в любом случае очистит ресурсы при выходе из программы.

Вызов loop.close() важен для циклов событий, у которых есть четкое время жизни. Например, библиотека может создать новый цикл событий для конкретной задачи, запустить его в выделенном потоке и избавиться от него. Неспособность закрыть такой цикл может привести к утечке его внутренних ресурсов (таких как канал, который он использует для межпоточного пробуждения) и привести к сбою программы. Другим примером являются наборы тестов, которые часто запускают новый цикл событий для каждого модульного теста, чтобы гарантировать разделение тестовых сред.


Обновлено: I зарегистрировал ошибку для этого выпуска.
Обновлено еще раз: Ошибка была фиксированный разработчиками.

Спасибо за подробный ответ (и за сообщение об ошибке - я догадывался, что это действительно «не я», но я действительно новичок, когда дело касается asyncio). Кроме того, я хотел уточнить, что blocking() (да, это название было преднамеренным) в «реальной жизни» будет выполнять свою работу по частям: так что, хотя я полностью осознаю, что поток не может быть действительно надежно отменен, я хотел использовать CanceledError как сигнал бросить работу. Еще раз спасибо за очевидное время, которое потребовалось для объяснения проблемы, это полезно далеко за пределами непосредственного варианта использования.

Marco Massenzio 13.05.2018 01:30

@Marco Использование CancelledError не будет работать, потому что исключение появится только в коде asyncio, оно не будет внедрено в другой поток (и нет надежного механизма Python для этого). Случайные проверки флага "прерывания" - уродливое, но необходимое зло, когда имеешь дело с этими вещами.

user4815162342 13.05.2018 14:36

абсолютно, я полностью это понимаю (поэтому в сопрограмме есть try: except, а не blocking). Вот почему пример является «надуманным и упрощенным» - в нем не учитывается необходимость иметь «управляемый блокировкой» флаг _canceled, который устанавливается в сопутствующем и проверяется (время от времени) в вызове блокировки.

Marco Massenzio 14.05.2018 19:39

Пока проблема апстрима не будет исправлен, другой способ обойти проблему - заменить использование run_in_executor специальной версией без недостатка. Хотя запуск собственного run_in_executor поначалу звучит как плохая идея, на самом деле это лишь небольшой связующий элемент между concurrent.futures и будущим asyncio.

Простая версия run_in_executor может быть полностью реализована с использованием общедоступного API этих двух классов:

def run_in_executor(executor, fn, *args):
    """Submit FN to EXECUTOR and return an asyncio future."""
    loop = asyncio.get_event_loop()
    if args:
        fn = functools.partial(fn, *args)
    work_future = executor.submit(fn)
    aio_future = loop.create_future()
    aio_cancelled = False

    def work_done(_f):
        if not aio_cancelled:
            loop.call_soon_threadsafe(set_result)

    def check_cancel(_f):
        nonlocal aio_cancelled
        if aio_future.cancelled():
            work_future.cancel()
            aio_cancelled = True

    def set_result():
        if work_future.cancelled():
            aio_future.cancel()
        elif work_future.exception() is not None:
            aio_future.set_exception(work_future.exception())
        else:
            aio_future.set_result(work_future.result())

    work_future.add_done_callback(work_done)
    aio_future.add_done_callback(check_cancel)

    return aio_future

Когда loop.run_in_executor(blocking) заменяется на run_in_executor(executor, blocking), где executor является ThreadPoolExecutor, созданным в main(), код работает без других модификаций.

Конечно, в этом варианте синхронные функции будут продолжать выполняться в другом потоке до завершения, несмотря на то, что они были отменены, но это неизбежно без их модификации для поддержки явного прерывания.

Другие вопросы по теме