Задачи python asyncio.create_task завершаются раньше, чем ожидалось?

У меня есть следующий код:

import asyncio

async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)

async def main():
    asyncio.create_task(myfunc(2))
    asyncio.create_task(myfunc(1))
    
asyncio.run(main())

Он выводит:

hello 2
hello 1

Обратите внимание, что world нигде не печатается. Почему производится то, что мы видим? Я ожидал:

hello 2
hello 1
world 1
world 2

Потому что я думал, что вызовы asyncio.sleep(i) приведут к выполнению цикла событий, и в этот момент цикл событий перепланирует их после соответствующего времени ожидания. Очевидно, я неправильно понимаю. Может кто-нибудь объяснить?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
41
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Проблема в том, что цикл в main не ожидает завершения задач, из-за чего задачи не завершаются. Используйте asyncio.gather() для запуска и дождитесь выполнения всех сопрограмм.

import asyncio


async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)


async def main():
    await asyncio.gather(myfunc(2), myfunc(1))


asyncio.run(main())

Логика, которую вы описываете в комментариях, более сложная и нет функции, которая ее реализует, поэтому вам нужно спроектировать логику, в этом случае есть 2 условия, которые должны выполниться для завершения приложения:

  • Все задачи должны быть запущены.
  • Не должно быть активной задачи.

Учитывая это, я использую Future для создания флага, который указывает на это, также используйте add_done_callback, чтобы уведомить меня о завершении задания.

import asyncio
from collections import deque
import random


async def f(identifier, seconds):
    print(f"Starting task: {identifier}, seconds: {seconds}s")
    await asyncio.sleep(seconds)
    print(f"Finished task: {identifier}")
    return identifier


T = 3


class Manager:
    def __init__(self, q):
        self._q = q
        self._future = asyncio.get_event_loop().create_future()
        self._active_tasks = set()
        self._status = False

    @property
    def q(self):
        return self._q

    def launch_task(self):
        try:
            identifier = self.q.pop()
        except IndexError:
            return False
        else:
            seconds = random.uniform(T - 1, T + 1)
            task = asyncio.create_task(f(identifier, seconds))
            self._active_tasks.add(identifier)
            task.add_done_callback(self._finished_callback)
            return True

    async def work(self):
        self._status = True
        while self.launch_task():
            await asyncio.sleep(T)
        self._status = False
        await self._future

    def _finished_callback(self, c):
        self._active_tasks.remove(c.result())
        if not self._active_tasks and not self._status:
            self._future.set_result(None)


async def main():
    identifiers = deque(range(10))
    manager = Manager(identifiers)
    await manager.work()


if __name__ == "__main__":
    asyncio.run(main())

Я понимаю. Как бы я подошел к этому в случае, когда я создаю_задачу каждые 2 секунды в цикле в основном, пока не завершится операция сетевого ввода-вывода? Я хотел бы сделать что-то вроде эквивалента pthread_join после цикла, есть ли такой эквивалент? Я бы использовал сбор, но мне нужно, чтобы цикл не блокировался

Code M4aster 19.03.2022 22:07

@ CodeM4aster Я не понимаю вашего комментария, не говорите о сетях и потоках. Вы хотите запускать новую задачу каждые T секунд, и эта задача может занять N секунд. Я не ошибаюсь? Если да, есть ли событие, которое должно привести к завершению программы или она должна работать вечно?

eyllanesc 19.03.2022 22:17

Не совсем. В цикле я создаю новую задачу каждые T секунд. Каждый потребляет из глобальной очереди заданий и повторяется, пока не станет пустым. Цикл создания задач также зацикливается до тех пор, пока глобальная очередь не станет пустой. Как только этот цикл завершается, программа завершается, даже если некоторые задачи еще не завершены. Я бы хотел подождать, пока они закончат, прежде чем выйти

Code M4aster 19.03.2022 23:04

@CodeM4aster Насколько я понимаю, вы уже предопределили N задач, которые запускаются последовательно каждые T секунд, и вам нужна программа, когда все задачи завершатся, я прав?

eyllanesc 19.03.2022 23:07

Практически, задачи не определены заранее. Когда я создаю задачу, мне нужна информация, которая доступна только во время выполнения. Нет верхней границы для N, я продолжаю создавать новые задачи каждые T секунд, пока очередь заданий не опустеет, после чего я должен не создавать больше задач, а позволить существующим задачам завершиться

Code M4aster 19.03.2022 23:09

@ CodeM4aster N зависит от очереди задач, вот что я имею в виду. Я указываю предопределенное именно потому, что все зависит от исходных фиксированных данных: очереди.

eyllanesc 19.03.2022 23:12

Моя ошибка, N зависит от очереди, но мои задачи не предопределены. Очередь заданий — это очередь блоков данных для загрузки с одноранговых узлов. Один узел имеет фиксированную пропускную способность для загрузки, а моя пропускная способность для загрузки не ограничена. Я узнаю о новом партнере каждые T секунд. Поэтому я создаю задачу для каждого нового узла, о котором мне стало известно, а не для каждого блока данных. Как только все блоки из очереди будут использованы, каждая задача должна завершить загрузку своего последнего блока, после чего программа должна выйти.

Code M4aster 19.03.2022 23:21

@CodeM4aster см. обновление

eyllanesc 19.03.2022 23:44
Ответ принят как подходящий

Нашел гораздо более простое решение, чем то, которое предоставил @eyllanesc здесь. Оказывается, есть функция, которая это реализует.

Другие вопросы по теме