В моем приложении Python есть функция синхронизации boo()
, которая вызывается внутри текущего цикла событий. boo()
должен получить некоторые данные от foo(arg1, arg2)
, которая является асинхронной функцией.
К сожалению, я не могу превратить boo()
в асинхронную функцию. Он должен оставаться синхронизированным. (Это ограничение вне моих рук).
Как я могу вызвать foo(arg1, arg2)
изнутри boo()
, дождаться завершения и продолжить выполнение?
Я попытался создать минимальный воспроизводимый пример. Это самое близкое, что я мог получить. Реальное приложение большое и сложное и может вести себя по-разному.
import time
import asyncio
async def work_for_data():
time.sleep(3)
return 42
# sync function, calling async function
def get_number():
return asyncio.get_event_loop().run_until_complete(work_for_data())
async def get_data():
return get_number()
async def run():
loop = asyncio.get_event_loop()
task = asyncio.create_task(get_data())
loop.run_until_complete(task)
if __name__ == "__main__":
asyncio.run(run())
Этот код вызывает:
File "./minimal_example.py", line 9, in get_number
return asyncio.get_event_loop().run_until_complete(work_for_data())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/[email protected]/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 629, in run_until_complete
self._check_running()
File "/usr/local/Cellar/[email protected]/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 588, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
Я предпринял много попыток решить эту проблему, но все они не сработали.
data = asyncio.run(foo(arg1, arg2))
Возникло следующее исключение:
data = asyncio.run(foo(arg1, arg2))
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/.pycharm_helpers/pydevd_asyncio/pydevd_nest_asyncio.py", line 143, in run
loop.run_until_complete(task)
File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
loop = asyncio.get_event_loop()
data = loop.run_until_complete(foo(arg1, arg2))
Возникло следующее исключение:
data = loop.run_until_complete(foo(arg1, arg2))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
future = executor.submit(lambda: asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop).result())
data = future.result()
Интерпретатор зависал при выполнении future.result()
loop = asyncio.get_event_loop()
future = asyncio.Future()
def callback(task):
if task.exception():
future.set_exception(task.exception())
else:
future.set_result(task.result())
task = asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop)
task.add_done_callback(callback)
result = task.result() ## Stuck here
return result
Интерпретатор зависал при выполнении task.result()
Я тоже попробовал asyncio.run()
, но возникла ошибка во время выполнения (отредактировал вопрос и включил подробности)
Хорошо, цикл уже запущен, тогда другие способы, которые вы пробовали, должны работать. Если нет, то вам снова нужно будет предоставить минимально воспроизводимый пример и уточнить, что именно означает «не работает».
Да, я упомянул вначале, что цикл уже запущен. Все методы, которые я описал в вопросе, не сработали. Я обновил вопрос с ошибками, которые я получаю
Вам, вероятно, придется вызывать boo
во время работы цикла событий, но нужно ли вам, чтобы цикл событий запускал его? Вызов самого boo
в исполнителе и использование boo
для запуска асинхронных вызовов должно работать.
кстати, хороший вопрос - спасибо за работу по демонстрации ваших попыток и связанных с ними ошибок.
@user2357112 user2357112, ...отличается ли ваше предложение от «Попытки 3»?
Я попытался установить гнездо-asyncio, импортировать его и вызвать nest_asyncio.apply()
в точке входа приложения. К сожалению, я все еще вижу эту ошибку
@CharlesDuffy: Да. Попытка 3 все еще выполняется внутри цикла событий. boo
сам нужно запустить в экзекьюторе. Он не может просто делегировать исполнителю вызов run_coroutine_threadsafe
.
Не существует простого способа сделать это — обычно асинхронные функции можно вызывать только из асинхронных функций: именно так они и должны работать.
Причина в том, что любая промежуточная синхронная функция в вызове цепочек не должна быть «приостановлена», когда внутренняя функция выполняет оператор await
и ожидает, что цикл выполнит другие параллельные задачи: для синхронной функции не существует никаких механизмов. приостановить и возобновить — вот что делает async def
.
Итак, очевидный и правильный подход состоит в том, чтобы все ваши вызовы, связанные с функцией, которая должна будет ожидать операции ввода-вывода (или ее эквивалента), должны состоять из асинхронных вызовов.
Иногда это может привести к дублированию кода — существуют очень похожие функции, которые должны вызываться из синхронного кода, а другие — из асинхронного кода, и даже большие базы кода и расширенные платформы с трудом справляются с этим.
ОДНАКО, если вашей внутренней функции не понадобится результат вызова самой асинхронной функции, т. е. она может просто создать задачу с асинхронной функцией и вернуть эту задачу - асинхронная функция, вызывающая синхронную функцию, должна затем сохранить возвращенную задачу. и ждём этого:
Имейте в виду, что код, загружаемый обратным вызовом, может сбивать с толку, и это на самом деле является одной из основных мотиваций для включения в язык всего синтаксиса и механизмов, поддерживающих асинхронность.
Итак, для кода, которому просто не нужно возвращаемое значение:
import asyncio
async def work_for_data():
time.sleep(3)
return 42
# sync function, calling async function
def get_number():
task = asyncio.create_task(work_for_data())
return task
async def get_data():
# trigger sync function that will start the tasks for producing async data
number_task = get_number()
# do more concurrent stuff
...
# await for the data produced by `get_number()`:
number = await number_task
async def run()
global queue
task = asyncio.create_task(get_data())
await task
if __name__ == "__main__":
asyncio.run(run())
Другие подходы заключаются в том, чтобы использовать синхронный код для создания задачи для промежуточной функции, которая будет ожидать целевую асинхронную функцию, и помещать ее возвращаемое значение в очередь, которое затем можно будет использовать в другом месте — всякий раз, когда результат будет готов. Или, если у синхронной функции будет сам код, который должен обрабатывать значения, созданные в асинхронном коде, то способ сделать это — установить обратный вызов, вызвав метод .add_done_callback
для созданной задачи. Однако результат обработки этого обратного вызова не будет возвращен асинхронной функции, вызывающей функцию синхронизации (get_number). Если это необходимо, то предпочтительнее просто сделать всю цепочку вызовов асинхронной.
Нет проблем с планированием асинхронной функции из функции синхронизации, но в asyncio (и с одним потоком) невозможно, чтобы функция синхронизации ждала результата. Если управление не возвращается планировщику, никакая другая функция не может быть запущена и результат не будет вычислен. Каждая функция (синхронная или асинхронная), выполняемая в цикле событий asyncio, принадлежит задаче, и одновременно может выполняться только одна задача.
Вы должны использовать несколько потоков, и один из них будет запускать цикл событий. Другой поток(и) может отправлять работу и ждать результата. Вот как: https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading
Не рекомендуется вызывать асинхронную функцию из функции синхронизации, поскольку она включает обмен данными между потоками и блокирует всю программу. Хороший способ решить эту проблему — адаптировать функции синхронизации к асинхронным частям и сделать асинхронной всю программу.
Однако есть обходной путь, который может «решить» вашу проблему.
import asyncio
async def calc_async():
# Use asyncio.sleep to non-blockingly wait for 2 seconds
await asyncio.sleep(2)
return 10
def sync_function():
# Set up a new event loop for running async code
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Run the async function until complete and get the result
result = loop.run_until_complete(calc_async())
return result
finally:
# Close the loop to clean up resources
loop.close()
# Example usage
result = sync_function()
print("Result from async function:", result)
Что значит «Ничего из этого не помогло». иметь в виду? Просто
asyncio.run()
в любом случае должно работать.