У меня есть небольшая утилита для параллельного вызова синхронного кода с использованием asyncio
.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
if loop.is_running():
print('running loop scheduling there')
# implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
loop)
print('got the future')
res = future.result()
print('got the result')
return res
else:
return loop.run_until_complete(call_many_async(fun, many_kwargs))
и он хорошо работает при использовании из Python
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
из Python работает без проблем, но у меня проблема с его использованием из IPython
в Jupyter, я просто получаю
running loop scheduling there
got the future
и он висит вечно.
Изначально у меня было просто
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(call_many_async(fun, many_kwargs))
но там я получил ошибку
RuntimeError: This event loop is already running
Как это решить?
Конечно,
results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4
работает, но я хочу использовать call_many
как часть более крупной базы кода, которую я буду вызывать из блокнота Jupyter.
Я прочитал https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7 но не нашел решения, так как не хочу вызывать асинхронный код напрямую из jupyter ячейка записной книжки, но из синхронного кода.
Я хочу избегать решений с использованием async def call_many(fun, many_kwargs)
, потому что весь смысл в том, чтобы иметь возможность использовать код, который вызывает эту функцию из нескольких мест, без необходимости иметь синхронный и асинхронный эквивалент одного и того же.
Я видел Как запустить асинхронный код Python в блокноте Jupyter? но это объясняет, как напрямую вызывать асинхронный код, который, как я объяснял выше, меня не интересует.
@dkasa он уже это делает.
Jupyter запускает собственный цикл событий в основном потоке, поэтому вы можете вызвать await
напрямую из Jupyter, также asyncio.run_coroutine_threadsafe утверждает:
Эта функция предназначена для вызова из другого потока ОС, чем тот, где работает цикл событий.
так что ждать его на той же голове тупик. я бы, вероятно, просто запустил ваш цикл событий в другом потоке.
import threading
import asyncio
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
def call_many(fun, many_kwargs):
result = None
def run_func():
nonlocal result
loop = asyncio.new_event_loop()
result = loop.run_until_complete(call_many_async(fun, many_kwargs))
thread = threading.Thread(target=run_func)
thread.start()
thread.join()
return result
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
в противном случае вы можете просто сложить вызовы async
на всем пути от основной точки выполнения Jupyter до вашего кода, создав все функции async
и await
каждую до единого.
import asyncio
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
async def call_many(fun, many_kwargs):
return await call_many_async(fun, many_kwargs)
fut = call_many(something_complex, ({"param": i} for i in range(1, 5)))
results = await fut
наконец, если вы используете только to_thread
, вы можете просто создать и использовать ThreadPoolExecutor и напрямую вызывать его функцию .map
, без необходимости создавать или использовать цикл событий.
from concurrent.futures import ThreadPoolExecutor
def call_many(fun, many_kwargs):
result = None
with ThreadPoolExecutor() as pool:
return list(pool.map(fun, many_kwargs))
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
Спасибо. Я хочу избежать async def call_many(fun, many_kwargs)
, но запуск его из нового цикла событий звучит неплохо. К сожалению, первая часть вашего ответа не возвращается. Я хочу, чтобы call_many
возвращал значение run_func
.
@MatějRačinský я добавил, как можно получить от него возвращаемое значение
Отвечает ли это на ваш вопрос? Как запустить асинхронный код Python в блокноте Jupyter?