Asyncio, как связать сопрограммы в цепочку

У меня есть следующий тестовый код, в котором я пытаюсь объединить разные сопрограммы. Идея состоит в том, что я хочу иметь одну сопрограмму, которая загружает данные, и как только данные будут загружены, я хочу передать данные во вторую процедуру, которая затем обработает данные. Код ниже работает, когда я пропускаю шагprocess_data, но всякий раз, когда я включаю шагprocess_data (пытаясь объединить сопрограммы), он терпит неудачу. Как я могу это исправить?

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_dummy(url):
    await asyncio.sleep(url)
    data = url
    print(f'downloaded {url}')
    return data

async def process_data(data):
    await asyncio.sleep(1)
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(process_data(download_dummy(task)))
            # res = tg.create_task(download_dummy(task))
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)

asyncio.run(main(task_inputs))

Ошибка, которую я получаю, довольно показательна: кажется, что первая сопрограмма фактически не выполняется, когда она передается второй сопрограмме, но я не уверен, как я могу элегантно это исправить.

+ Exception Group Traceback (most recent call last):
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2252, in <module>
  |     main()
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2234, in main
  |     globals = debugger.run(setup['file'], None, None, is_module)
  |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1544, in run
  |     return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1551, in _exec
  |     pydev_imports.execfile(file, globals, locals)  # execute the script
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
  |     exec(compile(contents+"\n", file, 'exec'), glob, loc)
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 31, in <module>
  |     asyncio.run(main(task_inputs))
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 21, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (9 sub-exceptions)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 2 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 3 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 4 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 5 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 6 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 7 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 8 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 9 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +------------------------------------
await download_dummy(task)?
Nearoo 11.07.2024 11:09
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
1
66
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

У вас есть:

...
res = tg.create_task(process_data(download_dummy(task)))
...

process_data ожидает передачи числа, которое будет умножено на 2. Вместо этого вы передаете ему download_dummy(task), который является экземпляром сопрограммы.

Поскольку вы хотите имитировать загрузку некоторых данных с URL-адреса, а затем обработать их, я считаю, что самым очевидным решением было бы переименовать download_data в download_and_process_data. Затем, когда данные загружены, он вызывает process_data следующим образом:

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_and_process_data(url):
    await asyncio.sleep(url)
    data = url
    print(f'downloaded {url}')
    return await process_data(data)

async def process_data(data):
    await asyncio.sleep(1)
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(download_and_process_data(task))
            # res = tg.create_task(download_dummy(task))
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)

asyncio.run(main(task_inputs))

Распечатки:

started at 05:32:48
downloaded 0
downloaded 1
processed 0
downloaded 2
processed 1
downloaded 3
downloaded 3
processed 2
downloaded 4
processed 3
downloaded 4
downloaded 4
processed 3
downloaded 5
processed 4
processed 4
processed 4
processed 5
finished at 05:32:54
[0, 2, 4, 6, 8, 10, 8, 6, 8]

В качестве альтернативы вы можете оставить res = tg.create_task(process_data(download_dummy(task)) в его нынешнем виде, но затем определить process_data для ожидания сопрограммы следующим образом:

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_dummy(url):
    await asyncio.sleep(url)
    data = url
    print(f'downloaded {url}')
    return data

async def process_data(coro):
    data = await coro
    await asyncio.sleep(1)
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(process_data(download_dummy(task)))
            # res = tg.create_task(download_dummy(task))
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)

asyncio.run(main(task_inputs))

В этом случае вы переходите к сопрограмме, которую необходимо дождаться, чтобы получить данные, которые она будет обрабатывать. Это работает, но представляет собой запутанную инверсию логики.

Я хочу, чтобы этапы загрузки и обработки были отдельными и общими, чтобы я мог использовать эти процедуры в целом. Идея заключалась в том, что я мог бы затем легко объединить все шаги, необходимые в конкретном конвейере save_to_disk(process(download(url))). А для другого конвейера я мог бы пропустить обработку и просто сохранить_на_диск. Однако я понимаю, что вы имеете в виду, и постараюсь использовать это для выполнения работы.

Tue 11.07.2024 12:16

Основываясь на ответе @Booboo, я создал следующее, которое, по сути, является лишь слегка переписанным его первым предложением.

Однако у этого решения все еще есть проблема: мне нужно определить отдельную функцию, чтобы объединить эти различные общие шаги, но на данный момент это кажется лучшим решением.

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_dummy(url):
    await asyncio.sleep(url)
    data = url
    print(f'downloaded {url}')
    return data

async def process_data(data):
    await asyncio.sleep(1)
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def download_and_process(url):
    data = await download_dummy(url)
    processed_data = await process_data(data)
    return processed_data


async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(download_and_process(task))
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)

asyncio.run(main(task_inputs))
Ответ принят как подходящий

Проблема в том, что строка tg.create_task(process_data(download_dummy(task))) не будет вызывать process_data с результатом download_dummy, а с ожидаемым - то есть параметр, который попадает внутрь process_data, нужно дождаться, чтобы получить свое значение.

Для простых, жестко запрограммированных конвейеров я обычно просто создаю небольшую функцию, вызывающую сопрограммы по порядку:

async def pipeline(arg):
    step1 = await download_data(arg)
    step2 = await process_results(step1)
    return step2

А потом tg.create_task(pipeline(task)))

Оказывается, pipeline можно сделать универсальным и заставить сопрограммы запускаться последовательно во время выполнения - это должно работать даже в сложных случаях:

from typing import Sequence, Awaitable, Any

async def pipeline(coroutines: Sequence[awaitable], initial_arg):
     partial = initial_arg
     for coroutine in couroutines:
          partial = await coroutine(partial)
     return partial
[...]
async def main(task_inputs):
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    chain = [download_dummy, process_data]
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            res = tg.create_task(pipeline(chain, task)
            task_handlers.append(res)

Другие вопросы по теме