Как правильно запланировать и дождаться результата асинхронного кода из синхронного контекста в блокноте Jupyter?

У меня есть небольшая утилита для параллельного вызова синхронного кода с использованием asyncio.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print('running loop scheduling there')
        # implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
        future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
                                                loop)
        print('got the future')
        res = future.result()
        print('got the result')
        return res
    else:
        return loop.run_until_complete(call_many_async(fun, many_kwargs))

и он хорошо работает при использовании из Python

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

из Python работает без проблем, но у меня проблема с его использованием из IPython в Jupyter, я просто получаю

running loop scheduling there
got the future

и он висит вечно.

Изначально у меня было просто

def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(call_many_async(fun, many_kwargs))

но там я получил ошибку

RuntimeError: This event loop is already running

Как это решить?

Конечно,

results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4

работает, но я хочу использовать call_many как часть более крупной базы кода, которую я буду вызывать из блокнота Jupyter. Я прочитал https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7 но не нашел решения, так как не хочу вызывать асинхронный код напрямую из jupyter ячейка записной книжки, но из синхронного кода.

Я хочу избегать решений с использованием async def call_many(fun, many_kwargs), потому что весь смысл в том, чтобы иметь возможность использовать код, который вызывает эту функцию из нескольких мест, без необходимости иметь синхронный и асинхронный эквивалент одного и того же.

Я видел Как запустить асинхронный код Python в блокноте Jupyter? но это объясняет, как напрямую вызывать асинхронный код, который, как я объяснял выше, меня не интересует.

Отвечает ли это на ваш вопрос? Как запустить асинхронный код Python в блокноте Jupyter?

dkasa 26.04.2024 15:45

@dkasa он уже это делает.

Ahmed AEK 26.04.2024 15:46
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
2
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Jupyter запускает собственный цикл событий в основном потоке, поэтому вы можете вызвать await напрямую из Jupyter, также asyncio.run_coroutine_threadsafe утверждает:

Эта функция предназначена для вызова из другого потока ОС, чем тот, где работает цикл событий.

так что ждать его на той же голове тупик. я бы, вероятно, просто запустил ваш цикл событий в другом потоке.

import threading
import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    result = None
    def run_func():
        nonlocal result
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(call_many_async(fun, many_kwargs))
    thread = threading.Thread(target=run_func)
    thread.start()
    thread.join()
    return result

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

в противном случае вы можете просто сложить вызовы async на всем пути от основной точки выполнения Jupyter до вашего кода, создав все функции async и await каждую до единого.

import asyncio

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
async def call_many(fun, many_kwargs):
    return await call_many_async(fun, many_kwargs)

fut = call_many(something_complex, ({"param": i} for i in range(1, 5)))
results = await fut

наконец, если вы используете только to_thread, вы можете просто создать и использовать ThreadPoolExecutor и напрямую вызывать его функцию .map, без необходимости создавать или использовать цикл событий.

from concurrent.futures import ThreadPoolExecutor

def call_many(fun, many_kwargs):
    result = None
    with ThreadPoolExecutor() as pool:
          return list(pool.map(fun, many_kwargs))

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

Спасибо. Я хочу избежать async def call_many(fun, many_kwargs), но запуск его из нового цикла событий звучит неплохо. К сожалению, первая часть вашего ответа не возвращается. Я хочу, чтобы call_many возвращал значение run_func.

Matěj Račinský 26.04.2024 16:06

@MatějRačinský я добавил, как можно получить от него возвращаемое значение

Ahmed AEK 26.04.2024 16:25

Другие вопросы по теме