Я пытаюсь глубоко погрузиться в модуль asyncio Python. Я понимаю, что нам нужно дождаться сопрограммы, чтобы получить результаты, и когда мы ожидаем какую-либо сопрограмму, окружающая сопрограмма передает свое выполнение циклу событий и позволяет запускать любую другую сопрограмму, которая готова к выполнению.
Рассмотрим следующий пример:
async def parse_data(idx,pool):
await compute_and_fetch_data()
# Do some work
async def compute_and_fetch_data(idx, pool):
# Do some CPU intensive task such as a complex compuation and produce the query to be executed
#Hardcoding query for demonstration
query = "SELECT * FROM customers"
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query)
result = await cur.fetchall()
logger.info(f"Fetched result for query {idx+1} successfully.")
return result
async def main():
pool = await aiomysql.create_pool(host='localhost', user='user', password='password', db='laundry')
asyncio.gather(parse_data(1, pool))
asyncio.run(main())
Внутри сопрограммы parse_data мы ожидаем сопрограмму Compute_and_fetch_data. Я добавил заполнитель для обозначения некоторых вычислений, которые будет выполнять Compute_and_fetch_data перед выполнением запроса к базе данных. Поскольку вычисления — это задача с интенсивным использованием ЦП, а выполнение запроса — это задача, связанная с вводом-выводом, прекратит ли сопрограмма parse_data свое выполнение, как только она достигнет блока кода await compute_and_fetch_data
, или она сначала выполнит вычисления внутри Compute_and_fetch_data и уступит выполнение, когда он достигает стадии, когда необходимо выполнить запрос к базе данных (трудоёмкий процесс)?
Если последнее верно, можно ли по-прежнему утверждать, что окружающая сопрограмма прекращает свое выполнение, когда достигает ключевого слова await
?
Да, ключевое слово await
вместе с операторами async with
и async for
являются точками, а затем, где другим параллельным задачам разрешено выполняться в одном и том же цикле событий.
Однако для полноты обратите внимание, что сначала вычисляется выражение справа от await
— в этом случае у вас есть вызов сопрограммной функции: этот вызов возвращает сопрограммный объект синхронно и очень эффективно — нет На этом этапе код выполняется внутри сопрограммной функции. Именно при разрешении самого await
код внутри сопрограммы (в данном случае compute_and_fetch_data
) фактически выполняется — и вместе с ним другие ожидающие задачи также могут предпринимать шаги. Как только код в сопрограмме запускается, он будет выполняться один в потоке цикла asyncio, без прерывания до разрешения первого кода async with
внутри него. Итак, обратите внимание, что вычисления, которые вы заменили заполнителем, будут эффективно блокировать запуск любого другого асинхронного кода. Обычный подход для этого — реорганизовать вычислительный код в другую, синхронную функцию и использовать для нее await asyncio.run_in_executor(...)
. (Обратите внимание, что Python, имеющий GIL, по-прежнему может привести к тому, что ваш код будет выполняться неоптимально: цикл asyncio будет свободен для выполнения других задач, но если вычислительный код представляет собой чистый код Python, фактически будет работать один из двух потоков. На этот раз обходным путем будет проверка вашего кода исполнителя и выяснение того, стоит ли использовать ProcessPoolExecutor
вместо используемого по умолчанию ThreadPoolExecutor
asyncio — существуют компромиссы в ресурсах, необходимых для запуска рабочих процессов и сериализации данных, которые будут вычисляется с одной стороны, и возможность запускать по-настоящему параллельный код за счет использования других ядер ЦП, что позволяет многопроцессорная обработка.
когда мы ожидаем какой-либо сопрограммы, окружающая сопрограмма передает свое выполнение циклу событий и позволяет запустить любую другую сопрограмму, которая готова к выполнению.
Это не совсем правильно. См. ниже.
можно ли еще сказать, что окружающая сопрограмма прекращает свое выполнение, когда достигает ключевого слова
await
?
Ответ — нет, не для самих сопрограмм. Когда вы используете сопрограмму, она фактически блокирует текущую задачу. Давайте проверим это:
import asyncio
async def print_message(message: str):
print(message)
async def main():
queue = asyncio.Queue()
await queue.get()
await print_message("This will not run until the above get returns")
asyncio.run(main())
При вызове await
создается задача на запуск сопрограммы asyncio.run
, но задача только одна. Поэтому когда вы main
, это блокирует задачу. Это уступит место другим задачам, выполняющимся в цикле событий, но в этом случае других задач нет. Если мы отменим это с помощью await queue.get()
, то возникнет следующее исключение:
Traceback (most recent call last):
File "C:\Users\<user>\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\<user>\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 684, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\<user>\Documents\GitHub\pyside-asyncio-prototype\prototype\test.py", line 10, in main
await queue.get()
File "C:\Users\<user>\AppData\Local\Programs\Python\Python312\Lib\asyncio\queues.py", line 158, in get
await getter
asyncio.exceptions.CancelledError
Здесь вы можете видеть, что именно ctrl + c
отменили.
Если мы изменим нашу тестовую программу так, чтобы:
import asyncio
async def print_message(message: str):
print(message)
async def main():
queue = asyncio.Queue()
await asyncio.gather(
queue.get(),
print_message("This will print because the task is concurrent"),
)
asyncio.run(main())
затем распечатывается следующее:
> python test.py
This will print because the task is concurrent
Конечно, программа блокируется, потому что все еще ожидает await queue.get()
. Причина, по которой сообщение распечатывается, заключается в том, что queue.get()
запускает каждый из своих аргументов, которые могут быть сопрограммами или задачами, как задачи, если они еще не являются задачами. В нашем случае и asyncio.gather
, и queue.get
(с их полными аргументами) являются сопрограммами, поэтому они запускаются как параллельные задачи.
Итак, ключевое понятие, которое нужно понять в print_message
, — это разница между сопрограммой и задачей . Вы можете думать о сопрограммах как об основных строительных блоках asyncio
программы, но именно задачи позволяют вещам выполняться одновременно. Когда вы asyncio
используете сопрограмму внутри задачи, она уступает место другим задачам. Это то, что обеспечивает параллелизм в await
. На самом деле все сопрограммы выполняются последовательно, поскольку цикл событий asyncio
является однопоточным, но когда задача встречает asyncio
, он передает выполнение другим задачам. Когда ожидается сопрограмма, выполняющая операцию ввода-вывода, вы «выигрываете» производительность в await
, потому что Python передаст операцию ввода-вывода операционной системе, которая распределит операцию(и) по любому ядру, в котором она находится. желания, прежде чем передавать вычисления обратно в Python. Пока ввод-вывод выполняет свои вычисления вне Python, asyncio
блокирует текущую задачу, и это позволяет другим задачам выполнять некоторую работу, в то время как ввод-вывод этого конкретного await
выполняется через некоторую библиотеку операционной системы.
Насколько я вижу, ни один из ваших кодов не будет работать одновременно. В цикле событий запускается только одна задача, поэтому каждое ожидание блокирует эту задачу до тех пор, пока не вернется ожидаемое. asyncio.gather
запускает только одну задачу, и я не вижу других задач, запускаемых в коде, хотя это зависит от особенностей базовых библиотек. Любая работа, связанная с ЦП (т. е. вычислительная), блокирует весь цикл событий и каждую задачу. Только await
уступают другим задачам, но await
по-прежнему блокируют задачу, над которой они работают.
Я бы прочитал разделы задач в документации asyncio
. Документация TaskGroup
помогает объяснить, как задачи позволяют коду выполняться одновременно.
Кроме того, используйте асинхронное ведение журнала вместо обычного средства ведения журнала. Вам нужно использовать logging.getLogger("asyncio")
, чтобы получить asyncio
регистратор.
Итак, в моем примере он передаст выполнение другим задачам, когда задача встретит await fetch_and_compute_data(), но поскольку fetch_and_compute_data вначале выполняет некоторую вычислительную работу, это не принесет большой пользы. Эти вычисления будут выполнены, когда циклы событий вернут эту задачу к выполнению. Это верно ?