Python create_task не работает в запущенном цикле событий

У меня есть простой фрагмент кода, который на какое-то время сводит меня с ума. Несколько дней назад я опубликовал вопрос это с вопросом create_task не работает с input. Теперь я понял кое-что, связанное с этим. Я запускаю цикл событий в отдельном потоке и нажимаю на него задачи. Очень прямой код.

import asyncio
import threading


async def printer(message):
    print(f'[printer] {message}')


def loop_runner(loop):
    loop.run_forever()


if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    t = threading.Thread(target=loop_runner, args=(event_loop,))
    t.start()

    for m in ['hello', 'world', 'foo', 'bar']:
        print(f'[loop running ?] {event_loop.is_running()}')
        event_loop.create_task(printer(m))

Ничего не печатается, кроме этих сообщений журнала.

[loop running ?] True
[loop running ?] True
[loop running ?] True
[loop running ?] True

Теперь, если я заблокирую поток цикла событий и позволю ему работать после такой паузы.

def loop_runner(loop):
    time.sleep(1 / 1000)
    loop.run_forever()

Все работает, и это печатается

[loop running ?] False
[loop running ?] False
[loop running ?] False
[loop running ?] False
[printer] hello
[printer] world
[printer] foo
[printer] bar

На первый взгляд кажется, что задачи, созданные в текущем цикле событий, не выполняются. Но почему?

Я не видел ничего по этому поводу в документации. В большинстве примеров, которые я видел в Интернете, люди создают задачи в цикле из других сопрограмм и ждут их. Но я думаю, что можно использовать создание задач вне сопрограммы, если вы не хотите их ждать.

Возможный дубликат Asyncio Как вы используете run_forever?

Joe 31.05.2019 08:54
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
3
1
6 595
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

При создании задачи вне потока цикла событий вам необходимо использовать asyncio.run_coroutine_threadsafe. Эта функция запланирует выполнение сопрограммы потокобезопасным способом и уведомит цикл обработки событий о том, что необходимо выполнить новую работу. Он также вернет объект concurrent.futures.Future, который вы можете использовать для блокировки текущего потока, пока результат не будет доступен.

From surface it looks like tasks created in running event loop do not get executed. But why is that?

Вызов create_task недостаточен, поскольку он не содержит кода для «пробуждения» цикла обработки событий. Это особенность — такое пробуждение обычно не требуется, и его добавление просто замедлит обычное однопоточное использование. Когда create_task вызывается из потока цикла событий, он находится внутри обратного вызова цикла событий, поэтому цикл событий может проверить свою очередь задач, как только он восстановит контроль, когда он завершит выполнение обратного вызова. Но когда create_task вызывается из другого потока, цикл событий спит в ожидании ввода-вывода, поэтому run_coroutine_threadsafe необходимо, чтобы разбудить его.

Чтобы проверить это, вы можете создать сопрограмму «heartbeat», которая содержит только бесконечный цикл, который что-то печатает и ожидает asyncio.sleep(1). Вы увидите, что задачи, созданные с помощью create_task, выполняются вместе с сердцебиением, что также приводит к пробуждению цикла событий. В загруженных асинхронных приложениях этот эффект может создать впечатление, что create_task из другого потока «работает». Однако на это никогда не следует полагаться, поскольку create_task не может реализовать правильную блокировку и может повредить внутренние компоненты цикла обработки событий.

I have not seen anything regarding this in documentation.

Взгляните на раздел параллелизм и многопоточность.

На самом деле я уже пробовал run_coroutine_threadsafe и знал, что это работает. Мне просто было интересно, почему create_task не работает. Спасибо за ссылку на соответствующую часть документации :) там написано *Цикл событий запускается в потоке (обычно в основном потоке) и выполняет все обратные вызовы и задачи в его нить*. Означает ли это, что он будет выполнять задачи ТОЛЬКО из того потока, в котором он работает?

Nafees Anwar 31.05.2019 10:00

Спасибо за добавление более подробной информации к ответу. Я определенно искал такое объяснение.

Nafees Anwar 31.05.2019 13:32

@NafeesAnwar Нет проблем, это частый источник путаницы. Многие разработчики никогда не обнаруживают «поточно-ориентированные» API-интерфейсы и задаются вопросом, почему asyncio ненадежно или медленно реагирует на create_task, см., например. этот ответ или Вот этот. Я предложил исправить это с помощью обнаружение проблемы и создать исключение, но это предложение не было принято из соображений производительности.

user4815162342 31.05.2019 13:59

Ваш код ведет себя так, как ожидалось:

It should be noted that calling this (run_forever()) causes our main thread to block indefinitely.

(https://tutorialedge.net/python/concurrency/asyncio-event-loops-tutorial/#the-run-forever-method)

см. также Asyncio Как вы используете run_forever? для решений (loop.call_soon_threadsafe() и asyncio.run_coroutine_threadsafe()).

ОП звонит run_forever в отдельной ветке, поэтому предложение не применяется. Связанный учебник предполагает, что вы запускаете цикл обработки событий в основном потоке, что часто бывает, но не всегда.

user4815162342 31.05.2019 09:37

OP не запускает цикл событий в потоке. Он просто выполняет .run_forever() в треде.

Joe 31.05.2019 13:36

Другие вопросы по теме