У меня есть простой фрагмент кода, который на какое-то время сводит меня с ума. Несколько дней назад я опубликовал вопрос это с вопросом create_task
не работает с input
. Теперь я понял кое-что, связанное с этим. Я запускаю цикл событий в отдельном потоке и нажимаю на него задачи. Очень прямой код.
import asyncio
import threading
async def printer(message):
print(f'[printer] {message}')
def loop_runner(loop):
loop.run_forever()
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_runner, args=(event_loop,))
t.start()
for m in ['hello', 'world', 'foo', 'bar']:
print(f'[loop running ?] {event_loop.is_running()}')
event_loop.create_task(printer(m))
Ничего не печатается, кроме этих сообщений журнала.
[loop running ?] True
[loop running ?] True
[loop running ?] True
[loop running ?] True
Теперь, если я заблокирую поток цикла событий и позволю ему работать после такой паузы.
def loop_runner(loop):
time.sleep(1 / 1000)
loop.run_forever()
Все работает, и это печатается
[loop running ?] False
[loop running ?] False
[loop running ?] False
[loop running ?] False
[printer] hello
[printer] world
[printer] foo
[printer] bar
На первый взгляд кажется, что задачи, созданные в текущем цикле событий, не выполняются. Но почему?
Я не видел ничего по этому поводу в документации. В большинстве примеров, которые я видел в Интернете, люди создают задачи в цикле из других сопрограмм и ждут их. Но я думаю, что можно использовать создание задач вне сопрограммы, если вы не хотите их ждать.
При создании задачи вне потока цикла событий вам необходимо использовать asyncio.run_coroutine_threadsafe
. Эта функция запланирует выполнение сопрограммы потокобезопасным способом и уведомит цикл обработки событий о том, что необходимо выполнить новую работу. Он также вернет объект concurrent.futures.Future
, который вы можете использовать для блокировки текущего потока, пока результат не будет доступен.
From surface it looks like tasks created in running event loop do not get executed. But why is that?
Вызов create_task
недостаточен, поскольку он не содержит кода для «пробуждения» цикла обработки событий. Это особенность — такое пробуждение обычно не требуется, и его добавление просто замедлит обычное однопоточное использование. Когда create_task
вызывается из потока цикла событий, он находится внутри обратного вызова цикла событий, поэтому цикл событий может проверить свою очередь задач, как только он восстановит контроль, когда он завершит выполнение обратного вызова. Но когда create_task
вызывается из другого потока, цикл событий спит в ожидании ввода-вывода, поэтому run_coroutine_threadsafe
необходимо, чтобы разбудить его.
Чтобы проверить это, вы можете создать сопрограмму «heartbeat», которая содержит только бесконечный цикл, который что-то печатает и ожидает asyncio.sleep(1)
. Вы увидите, что задачи, созданные с помощью create_task
, выполняются вместе с сердцебиением, что также приводит к пробуждению цикла событий. В загруженных асинхронных приложениях этот эффект может создать впечатление, что create_task
из другого потока «работает». Однако на это никогда не следует полагаться, поскольку create_task
не может реализовать правильную блокировку и может повредить внутренние компоненты цикла обработки событий.
I have not seen anything regarding this in documentation.
Взгляните на раздел параллелизм и многопоточность.
На самом деле я уже пробовал run_coroutine_threadsafe
и знал, что это работает. Мне просто было интересно, почему create_task
не работает. Спасибо за ссылку на соответствующую часть документации :) там написано *Цикл событий запускается в потоке (обычно в основном потоке) и выполняет все обратные вызовы и задачи в его нить*. Означает ли это, что он будет выполнять задачи ТОЛЬКО из того потока, в котором он работает?
Спасибо за добавление более подробной информации к ответу. Я определенно искал такое объяснение.
@NafeesAnwar Нет проблем, это частый источник путаницы. Многие разработчики никогда не обнаруживают «поточно-ориентированные» API-интерфейсы и задаются вопросом, почему asyncio ненадежно или медленно реагирует на create_task
, см., например. этот ответ или Вот этот. Я предложил исправить это с помощью обнаружение проблемы и создать исключение, но это предложение не было принято из соображений производительности.
Ваш код ведет себя так, как ожидалось:
It should be noted that calling this (
run_forever()
) causes our main thread to block indefinitely.
(https://tutorialedge.net/python/concurrency/asyncio-event-loops-tutorial/#the-run-forever-method)
см. также Asyncio Как вы используете run_forever? для решений (loop.call_soon_threadsafe()
и asyncio.run_coroutine_threadsafe()
).
ОП звонит run_forever
в отдельной ветке, поэтому предложение не применяется. Связанный учебник предполагает, что вы запускаете цикл обработки событий в основном потоке, что часто бывает, но не всегда.
OP не запускает цикл событий в потоке. Он просто выполняет .run_forever()
в треде.
Возможный дубликат Asyncio Как вы используете run_forever?