У меня есть следующий код:
import asyncio
async def myfunc(i):
print("hello", i)
await asyncio.sleep(i)
print("world", i)
async def main():
asyncio.create_task(myfunc(2))
asyncio.create_task(myfunc(1))
asyncio.run(main())
Он выводит:
hello 2
hello 1
Обратите внимание, что world
нигде не печатается. Почему производится то, что мы видим? Я ожидал:
hello 2
hello 1
world 1
world 2
Потому что я думал, что вызовы asyncio.sleep(i)
приведут к выполнению цикла событий, и в этот момент цикл событий перепланирует их после соответствующего времени ожидания. Очевидно, я неправильно понимаю. Может кто-нибудь объяснить?
Проблема в том, что цикл в main не ожидает завершения задач, из-за чего задачи не завершаются. Используйте asyncio.gather()
для запуска и дождитесь выполнения всех сопрограмм.
import asyncio
async def myfunc(i):
print("hello", i)
await asyncio.sleep(i)
print("world", i)
async def main():
await asyncio.gather(myfunc(2), myfunc(1))
asyncio.run(main())
Логика, которую вы описываете в комментариях, более сложная и нет функции, которая ее реализует, поэтому вам нужно спроектировать логику, в этом случае есть 2 условия, которые должны выполниться для завершения приложения:
Учитывая это, я использую Future для создания флага, который указывает на это, также используйте add_done_callback, чтобы уведомить меня о завершении задания.
import asyncio
from collections import deque
import random
async def f(identifier, seconds):
print(f"Starting task: {identifier}, seconds: {seconds}s")
await asyncio.sleep(seconds)
print(f"Finished task: {identifier}")
return identifier
T = 3
class Manager:
def __init__(self, q):
self._q = q
self._future = asyncio.get_event_loop().create_future()
self._active_tasks = set()
self._status = False
@property
def q(self):
return self._q
def launch_task(self):
try:
identifier = self.q.pop()
except IndexError:
return False
else:
seconds = random.uniform(T - 1, T + 1)
task = asyncio.create_task(f(identifier, seconds))
self._active_tasks.add(identifier)
task.add_done_callback(self._finished_callback)
return True
async def work(self):
self._status = True
while self.launch_task():
await asyncio.sleep(T)
self._status = False
await self._future
def _finished_callback(self, c):
self._active_tasks.remove(c.result())
if not self._active_tasks and not self._status:
self._future.set_result(None)
async def main():
identifiers = deque(range(10))
manager = Manager(identifiers)
await manager.work()
if __name__ == "__main__":
asyncio.run(main())
@ CodeM4aster Я не понимаю вашего комментария, не говорите о сетях и потоках. Вы хотите запускать новую задачу каждые T секунд, и эта задача может занять N секунд. Я не ошибаюсь? Если да, есть ли событие, которое должно привести к завершению программы или она должна работать вечно?
Не совсем. В цикле я создаю новую задачу каждые T секунд. Каждый потребляет из глобальной очереди заданий и повторяется, пока не станет пустым. Цикл создания задач также зацикливается до тех пор, пока глобальная очередь не станет пустой. Как только этот цикл завершается, программа завершается, даже если некоторые задачи еще не завершены. Я бы хотел подождать, пока они закончат, прежде чем выйти
@CodeM4aster Насколько я понимаю, вы уже предопределили N задач, которые запускаются последовательно каждые T секунд, и вам нужна программа, когда все задачи завершатся, я прав?
Практически, задачи не определены заранее. Когда я создаю задачу, мне нужна информация, которая доступна только во время выполнения. Нет верхней границы для N, я продолжаю создавать новые задачи каждые T секунд, пока очередь заданий не опустеет, после чего я должен не создавать больше задач, а позволить существующим задачам завершиться
@ CodeM4aster N зависит от очереди задач, вот что я имею в виду. Я указываю предопределенное именно потому, что все зависит от исходных фиксированных данных: очереди.
Моя ошибка, N зависит от очереди, но мои задачи не предопределены. Очередь заданий — это очередь блоков данных для загрузки с одноранговых узлов. Один узел имеет фиксированную пропускную способность для загрузки, а моя пропускная способность для загрузки не ограничена. Я узнаю о новом партнере каждые T секунд. Поэтому я создаю задачу для каждого нового узла, о котором мне стало известно, а не для каждого блока данных. Как только все блоки из очереди будут использованы, каждая задача должна завершить загрузку своего последнего блока, после чего программа должна выйти.
@CodeM4aster см. обновление
Нашел гораздо более простое решение, чем то, которое предоставил @eyllanesc здесь. Оказывается, есть функция, которая это реализует.
Я понимаю. Как бы я подошел к этому в случае, когда я создаю_задачу каждые 2 секунды в цикле в основном, пока не завершится операция сетевого ввода-вывода? Я хотел бы сделать что-то вроде эквивалента pthread_join после цикла, есть ли такой эквивалент? Я бы использовал сбор, но мне нужно, чтобы цикл не блокировался