Запустите асинхронную функцию из функции синхронизации в уже запущенном цикле событий

В моем приложении Python есть функция синхронизации boo(), которая вызывается внутри текущего цикла событий. boo() должен получить некоторые данные от foo(arg1, arg2), которая является асинхронной функцией.

К сожалению, я не могу превратить boo() в асинхронную функцию. Он должен оставаться синхронизированным. (Это ограничение вне моих рук).

Как я могу вызвать foo(arg1, arg2) изнутри boo(), дождаться завершения и продолжить выполнение?

Минимальный воспроизводимый пример

Я попытался создать минимальный воспроизводимый пример. Это самое близкое, что я мог получить. Реальное приложение большое и сложное и может вести себя по-разному.

import time
import asyncio

async def work_for_data():
    time.sleep(3)
    return 42

# sync function, calling async function
def get_number():
    return asyncio.get_event_loop().run_until_complete(work_for_data())

async def get_data():
    return get_number()

async def run():
    loop = asyncio.get_event_loop()
    task = asyncio.create_task(get_data())
    loop.run_until_complete(task)


if __name__ == "__main__":
    asyncio.run(run())

Этот код вызывает:

  File "./minimal_example.py", line 9, in get_number
    return asyncio.get_event_loop().run_until_complete(work_for_data())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/[email protected]/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 629, in run_until_complete
    self._check_running()
  File "/usr/local/Cellar/[email protected]/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 588, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

Попытки решить проблему

Я предпринял много попыток решить эту проблему, но все они не сработали.

Попытка 1

data = asyncio.run(foo(arg1, arg2))

Возникло следующее исключение:

    data = asyncio.run(foo(arg1, arg2))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.pycharm_helpers/pydevd_asyncio/pydevd_nest_asyncio.py", line 143, in run
    loop.run_until_complete(task)
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Попытка 2

loop = asyncio.get_event_loop()
data = loop.run_until_complete(foo(arg1, arg2))

Возникло следующее исключение:

    data = loop.run_until_complete(foo(arg1, arg2))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Попытка 3

loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        future = executor.submit(lambda: asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop).result())
        data = future.result()

Интерпретатор зависал при выполнении future.result()

Попытка 4

    loop = asyncio.get_event_loop()
    future = asyncio.Future()

    def callback(task):
        if task.exception():
            future.set_exception(task.exception())
        else:
            future.set_result(task.result())

    task = asyncio.run_coroutine_threadsafe(foo(arg1, arg2), loop)
    task.add_done_callback(callback)

    result = task.result()  ## Stuck here
    return result

Интерпретатор зависал при выполнении task.result()

Что значит «Ничего из этого не помогло». иметь в виду? Просто asyncio.run() в любом случае должно работать.

AKX 02.08.2024 12:29

Я тоже попробовал asyncio.run(), но возникла ошибка во время выполнения (отредактировал вопрос и включил подробности)

Ido 02.08.2024 15:18

Хорошо, цикл уже запущен, тогда другие способы, которые вы пробовали, должны работать. Если нет, то вам снова нужно будет предоставить минимально воспроизводимый пример и уточнить, что именно означает «не работает».

deceze 02.08.2024 15:29

Да, я упомянул вначале, что цикл уже запущен. Все методы, которые я описал в вопросе, не сработали. Я обновил вопрос с ошибками, которые я получаю

Ido 02.08.2024 17:46
pypi.org/project/nest-asyncio — это заброшенное ПО, но оно работает. По крайней мере, у меня это работает; Я не использую увлуп.
Charles Duffy 02.08.2024 19:50

Вам, вероятно, придется вызывать boo во время работы цикла событий, но нужно ли вам, чтобы цикл событий запускал его? Вызов самого boo в исполнителе и использование boo для запуска асинхронных вызовов должно работать.

user2357112 02.08.2024 19:53

кстати, хороший вопрос - спасибо за работу по демонстрации ваших попыток и связанных с ними ошибок.

Charles Duffy 02.08.2024 19:55

@user2357112 user2357112, ...отличается ли ваше предложение от «Попытки 3»?

Charles Duffy 02.08.2024 19:56

Я попытался установить гнездо-asyncio, импортировать его и вызвать nest_asyncio.apply() в точке входа приложения. К сожалению, я все еще вижу эту ошибку

Ido 02.08.2024 20:08

@CharlesDuffy: Да. Попытка 3 все еще выполняется внутри цикла событий. boo сам нужно запустить в экзекьюторе. Он не может просто делегировать исполнителю вызов run_coroutine_threadsafe.

user2357112 02.08.2024 20:56
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
10
90
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Не существует простого способа сделать это — обычно асинхронные функции можно вызывать только из асинхронных функций: именно так они и должны работать.

Причина в том, что любая промежуточная синхронная функция в вызове цепочек не должна быть «приостановлена», когда внутренняя функция выполняет оператор await и ожидает, что цикл выполнит другие параллельные задачи: для синхронной функции не существует никаких механизмов. приостановить и возобновить — вот что делает async def.

Итак, очевидный и правильный подход состоит в том, чтобы все ваши вызовы, связанные с функцией, которая должна будет ожидать операции ввода-вывода (или ее эквивалента), должны состоять из асинхронных вызовов.

Иногда это может привести к дублированию кода — существуют очень похожие функции, которые должны вызываться из синхронного кода, а другие — из асинхронного кода, и даже большие базы кода и расширенные платформы с трудом справляются с этим.

ОДНАКО, если вашей внутренней функции не понадобится результат вызова самой асинхронной функции, т. е. она может просто создать задачу с асинхронной функцией и вернуть эту задачу - асинхронная функция, вызывающая синхронную функцию, должна затем сохранить возвращенную задачу. и ждём этого:

Имейте в виду, что код, загружаемый обратным вызовом, может сбивать с толку, и это на самом деле является одной из основных мотиваций для включения в язык всего синтаксиса и механизмов, поддерживающих асинхронность.

Итак, для кода, которому просто не нужно возвращаемое значение:

import asyncio

async def work_for_data():
    time.sleep(3)
    return 42

# sync function, calling async function
def get_number():
    task = asyncio.create_task(work_for_data())
    return task

async def get_data():
    # trigger sync function that will start the tasks for producing async data
    number_task = get_number()
    # do more concurrent stuff
    
    ...
    # await for the data produced by `get_number()`:
    number = await number_task

async def run()
    global queue

    task = asyncio.create_task(get_data())
    await task


if __name__ == "__main__":
    asyncio.run(run())

Другие подходы заключаются в том, чтобы использовать синхронный код для создания задачи для промежуточной функции, которая будет ожидать целевую асинхронную функцию, и помещать ее возвращаемое значение в очередь, которое затем можно будет использовать в другом месте — всякий раз, когда результат будет готов. Или, если у синхронной функции будет сам код, который должен обрабатывать значения, созданные в асинхронном коде, то способ сделать это — установить обратный вызов, вызвав метод .add_done_callback для созданной задачи. Однако результат обработки этого обратного вызова не будет возвращен асинхронной функции, вызывающей функцию синхронизации (get_number). Если это необходимо, то предпочтительнее просто сделать всю цепочку вызовов асинхронной.

Нет проблем с планированием асинхронной функции из функции синхронизации, но в asyncio (и с одним потоком) невозможно, чтобы функция синхронизации ждала результата. Если управление не возвращается планировщику, никакая другая функция не может быть запущена и результат не будет вычислен. Каждая функция (синхронная или асинхронная), выполняемая в цикле событий asyncio, принадлежит задаче, и одновременно может выполняться только одна задача.

Вы должны использовать несколько потоков, и один из них будет запускать цикл событий. Другой поток(и) может отправлять работу и ждать результата. Вот как: https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading

Не рекомендуется вызывать асинхронную функцию из функции синхронизации, поскольку она включает обмен данными между потоками и блокирует всю программу. Хороший способ решить эту проблему — адаптировать функции синхронизации к асинхронным частям и сделать асинхронной всю программу.

Однако есть обходной путь, который может «решить» вашу проблему.

  1. Оберните асинхронную часть в asyncio.loop
  2. Обработка передачи данных между синхронизирующей и асинхронной частью.
import asyncio

async def calc_async():
    # Use asyncio.sleep to non-blockingly wait for 2 seconds
    await asyncio.sleep(2)
    return 10

def sync_function():
    # Set up a new event loop for running async code
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Run the async function until complete and get the result
        result = loop.run_until_complete(calc_async())
        return result
    finally:
        # Close the loop to clean up resources
        loop.close()

# Example usage
result = sync_function()
print("Result from async function:", result)

Другие вопросы по теме