У меня есть следующий тестовый код, в котором я пытаюсь объединить разные сопрограммы. Идея состоит в том, что я хочу иметь одну сопрограмму, которая загружает данные, и как только данные будут загружены, я хочу передать данные во вторую процедуру, которая затем обработает данные. Код ниже работает, когда я пропускаю шагprocess_data, но всякий раз, когда я включаю шагprocess_data (пытаясь объединить сопрограммы), он терпит неудачу. Как я могу это исправить?
import asyncio
import time
task_inputs = [0,1,2,3,4,5,4,3,4]
async def download_dummy(url):
await asyncio.sleep(url)
data = url
print(f'downloaded {url}')
return data
async def process_data(data):
await asyncio.sleep(1)
processed_data = data*2
print(f"processed {data}")
return processed_data
async def main(task_inputs):
task_handlers = []
print(f"started at {time.strftime('%X')}")
async with asyncio.TaskGroup() as tg:
for task in task_inputs:
res = tg.create_task(process_data(download_dummy(task)))
# res = tg.create_task(download_dummy(task))
task_handlers.append(res)
print(f"finished at {time.strftime('%X')}")
results = [task_handler.result() for task_handler in task_handlers]
print(results)
asyncio.run(main(task_inputs))
Ошибка, которую я получаю, довольно показательна: кажется, что первая сопрограмма фактически не выполняется, когда она передается второй сопрограмме, но я не уверен, как я могу элегантно это исправить.
+ Exception Group Traceback (most recent call last):
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2252, in <module>
| main()
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2234, in main
| globals = debugger.run(setup['file'], None, None, is_module)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1544, in run
| return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1551, in _exec
| pydev_imports.execfile(file, globals, locals) # execute the script
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
| exec(compile(contents+"\n", file, 'exec'), glob, loc)
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 31, in <module>
| asyncio.run(main(task_inputs))
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 194, in run
| return runner.run(main)
| ^^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 118, in run
| return self._loop.run_until_complete(task)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\base_events.py", line 687, in run_until_complete
| return future.result()
| ^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 21, in main
| async with asyncio.TaskGroup() as tg:
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\taskgroups.py", line 145, in __aexit__
| raise me from None
| ExceptionGroup: unhandled errors in a TaskGroup (9 sub-exceptions)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 2 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 3 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 4 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 5 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 6 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 7 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 8 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 9 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+------------------------------------
У вас есть:
...
res = tg.create_task(process_data(download_dummy(task)))
...
process_data
ожидает передачи числа, которое будет умножено на 2. Вместо этого вы передаете ему download_dummy(task)
, который является экземпляром сопрограммы.
Поскольку вы хотите имитировать загрузку некоторых данных с URL-адреса, а затем обработать их, я считаю, что самым очевидным решением было бы переименовать download_data
в download_and_process_data
. Затем, когда данные загружены, он вызывает process_data
следующим образом:
import asyncio
import time
task_inputs = [0,1,2,3,4,5,4,3,4]
async def download_and_process_data(url):
await asyncio.sleep(url)
data = url
print(f'downloaded {url}')
return await process_data(data)
async def process_data(data):
await asyncio.sleep(1)
processed_data = data*2
print(f"processed {data}")
return processed_data
async def main(task_inputs):
task_handlers = []
print(f"started at {time.strftime('%X')}")
async with asyncio.TaskGroup() as tg:
for task in task_inputs:
res = tg.create_task(download_and_process_data(task))
# res = tg.create_task(download_dummy(task))
task_handlers.append(res)
print(f"finished at {time.strftime('%X')}")
results = [task_handler.result() for task_handler in task_handlers]
print(results)
asyncio.run(main(task_inputs))
Распечатки:
started at 05:32:48
downloaded 0
downloaded 1
processed 0
downloaded 2
processed 1
downloaded 3
downloaded 3
processed 2
downloaded 4
processed 3
downloaded 4
downloaded 4
processed 3
downloaded 5
processed 4
processed 4
processed 4
processed 5
finished at 05:32:54
[0, 2, 4, 6, 8, 10, 8, 6, 8]
В качестве альтернативы вы можете оставить res = tg.create_task(process_data(download_dummy(task))
в его нынешнем виде, но затем определить process_data
для ожидания сопрограммы следующим образом:
import asyncio
import time
task_inputs = [0,1,2,3,4,5,4,3,4]
async def download_dummy(url):
await asyncio.sleep(url)
data = url
print(f'downloaded {url}')
return data
async def process_data(coro):
data = await coro
await asyncio.sleep(1)
processed_data = data*2
print(f"processed {data}")
return processed_data
async def main(task_inputs):
task_handlers = []
print(f"started at {time.strftime('%X')}")
async with asyncio.TaskGroup() as tg:
for task in task_inputs:
res = tg.create_task(process_data(download_dummy(task)))
# res = tg.create_task(download_dummy(task))
task_handlers.append(res)
print(f"finished at {time.strftime('%X')}")
results = [task_handler.result() for task_handler in task_handlers]
print(results)
asyncio.run(main(task_inputs))
В этом случае вы переходите к сопрограмме, которую необходимо дождаться, чтобы получить данные, которые она будет обрабатывать. Это работает, но представляет собой запутанную инверсию логики.
Я хочу, чтобы этапы загрузки и обработки были отдельными и общими, чтобы я мог использовать эти процедуры в целом. Идея заключалась в том, что я мог бы затем легко объединить все шаги, необходимые в конкретном конвейере save_to_disk(process(download(url))). А для другого конвейера я мог бы пропустить обработку и просто сохранить_на_диск. Однако я понимаю, что вы имеете в виду, и постараюсь использовать это для выполнения работы.
Основываясь на ответе @Booboo, я создал следующее, которое, по сути, является лишь слегка переписанным его первым предложением.
Однако у этого решения все еще есть проблема: мне нужно определить отдельную функцию, чтобы объединить эти различные общие шаги, но на данный момент это кажется лучшим решением.
import asyncio
import time
task_inputs = [0,1,2,3,4,5,4,3,4]
async def download_dummy(url):
await asyncio.sleep(url)
data = url
print(f'downloaded {url}')
return data
async def process_data(data):
await asyncio.sleep(1)
processed_data = data*2
print(f"processed {data}")
return processed_data
async def download_and_process(url):
data = await download_dummy(url)
processed_data = await process_data(data)
return processed_data
async def main(task_inputs):
task_handlers = []
print(f"started at {time.strftime('%X')}")
async with asyncio.TaskGroup() as tg:
for task in task_inputs:
res = tg.create_task(download_and_process(task))
task_handlers.append(res)
print(f"finished at {time.strftime('%X')}")
results = [task_handler.result() for task_handler in task_handlers]
print(results)
asyncio.run(main(task_inputs))
Проблема в том, что строка tg.create_task(process_data(download_dummy(task)))
не будет вызывать process_data
с результатом download_dummy
, а с ожидаемым - то есть параметр, который попадает внутрь process_data
, нужно дождаться, чтобы получить свое значение.
Для простых, жестко запрограммированных конвейеров я обычно просто создаю небольшую функцию, вызывающую сопрограммы по порядку:
async def pipeline(arg):
step1 = await download_data(arg)
step2 = await process_results(step1)
return step2
А потом tg.create_task(pipeline(task)))
Оказывается, pipeline
можно сделать универсальным и заставить сопрограммы запускаться последовательно во время выполнения - это должно работать даже в сложных случаях:
from typing import Sequence, Awaitable, Any
async def pipeline(coroutines: Sequence[awaitable], initial_arg):
partial = initial_arg
for coroutine in couroutines:
partial = await coroutine(partial)
return partial
[...]
async def main(task_inputs):
task_handlers = []
print(f"started at {time.strftime('%X')}")
chain = [download_dummy, process_data]
async with asyncio.TaskGroup() as tg:
for task in task_inputs:
res = tg.create_task(pipeline(chain, task)
task_handlers.append(res)
await download_dummy(task)
?