Какие методы создают цикл событий в библиотеке asyncio Python? Это asyncio.run()/asyncio.get_event_loop()? Также какой метод(ы) запускает цикл событий? Я попробовал поискать; но кажется там не очень однозначные ответы.






Для API высокого уровня он создается автоматически в asyncio.run() с помощью asyncio.new_event_loop() или функции loop_factory, если она предусмотрена. В низкоуровневом API это asyncio.new_event_loop(). asyncio.get_event_loop() возврат работающего цикла событий или, если он не запускался, цикла событий политики (asyncio.get_event_loop_policy().get_event_loop()). Он не создает новый цикл событий.
В asyncio у нас есть API высокого и низкого уровня. Высокоуровневые API подходят для ситуаций, когда нам нужно запустить простое приложение с 3 или 5 асинхронными функциями. В свою очередь, низкоуровневые API должны контролировать процесс создания циклов событий, управления свойствами экземпляра Future (например, имени, статуса работы и т. д.), настройки политик и т. д.
asyncio.run() с помощью asyncio.new_event_loop() или с помощью функции loop_factory, если она предусмотренаasyncio.new_event_loop() в низкоуровневом APIasyncio.run() делает четыре вещи:
asyncio.set_event_loop()asyncio.loop.create_task(), где цикл asyncio.SelectorEventLoop или asyncio.ProactorEventLoop, зависит от системы.asyncio.run_until_complete()asyncio.new_event_loop() делает первое предложение, поэтому вам нужно вручную установить новый цикл событий, как во втором предложении, и запустить
Как упоминалось выше, asyncio.run() автоматически устанавливает новый цикл событий. Для низкоуровневых API мы можем использовать:
asyncio.loop.run_until_complete()asyncio.loop.run_forever()Как ясно из названий, первый будет работать до последней выполненной задачи, а второй будет работать вечно
Во всех примерах код будет такой:
import asyncio
...
...
...
if __name__ == "__main__":
main()
где точки — остальная часть нашего кода
Вот эквивалентные примеры API высокого уровня:
async def get_data():
# Emulating a fetch from a remote server
data = [
{"name": "Jane", "age": 19},
{"name": "John", "age": 24}
]
await asyncio.sleep(1)
return data
def main():
print(asyncio.run(get_data()))
и низкоуровневый API:
def init_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
async def get_data():
# Emulating a fetch from a remote server
data = [
{"name": "Jane", "age": 19},
{"name": "John", "age": 24}
]
await asyncio.sleep(1)
return data
def main():
loop = init_loop()
task = loop.create_task(get_data())
loop.run_until_complete(task)
print(task.result())
Результат:
[{'name': 'Jane', 'age': 19}, {'name': 'John', 'age': 24}]