Я работаю над шаблоном, в котором я общаюсь между несколькими очередями для обработки элементов в конвейере. Я использую датчики для связи между очередями, когда следует прекратить работу, однако в следующем коде я вижу результаты, которые меня смущают.
При чтении write_q
в write_task()
я вижу, что первое значение входит в качестве дозорного None
вместо задач в том порядке, в котором они были расположены response_task()
. Если я правильно понял, write_task()
должен получать предметы по порядку и обрабатывать их по мере создания задач.
Кроме того, при печати qsize()
в write_task()
после того, как я нашел дозорного, он говорит, что здесь 0 элементов, однако при обратной печати в основном кажется, что qsize()
из write_q
все еще содержит 2 элемента. Я где-то читал, что aiofiles
использует run_in_executor()
, что означает, что могут быть различия в том, где находится очередь.
Большая часть приведенного ниже кода представляет собой шаблон, иллюстрирующий реальный сценарий того, почему мой код продолжает бесконечно блокироваться.
import asyncio
import aiofiles
import aiocsv
import json
async def fetch(t: float) -> dict:
print(f"INFO: Sleeping for {t}s")
await asyncio.sleep(t)
return t
async def task(l: list, request_q: asyncio.Queue) -> None:
# Read tasks from source of data
for i in l:
await request_q.put(
asyncio.create_task(fetch(i))
)
# Sentinel value to signal we are done receiving from source
await request_q.put(None)
async def request_task(request_q: asyncio.Queue, write_q: asyncio.Queue) -> None:
while True:
req = await request_q.get()
# If we received sentinel for tasks, pass message to next queue
if not req:
print("INFO: received sentinel from request_q")
request_q.task_done()
await request_q.put(None) # put back into the queue to signal to other consumers we are done
break
# Make the request
resp = await req
await write_q.put(resp)
request_q.task_done()
async def write_task(write_q: asyncio.Queue) -> None:
headers: bool = True
async with aiofiles.open("file.csv", mode = "w+", newline='') as f:
w = aiocsv.AsyncWriter(f)
while True:
# Get data out of the queue to write it
data = await write_q.get()
print(data)
# if not data:
# print(f"INFO: Found sentinel in write_task, queue size was: {write_q.qsize()}")
# write_q.task_done()
# await f.flush()
# break
if headers:
await w.writerow([
"status",
"data",
])
headers = False
# Write the data from the response
await w.writerow([
"200",
json.dumps(data)
])
await f.flush()
write_q.task_done()
async def main() -> None:
# Create fake data to POST
items: list[str] = [.2, .5, 1]
# Queues for orchestrating
request_q = asyncio.Queue()
write_q = asyncio.Queue()
# one producer
producer = asyncio.create_task(
task(items, request_q)
)
# 5 request consumers
request_consumers = [
asyncio.create_task(
request_task(request_q, write_q)
)
for _ in range(2)
]
# 5 write consumers
write_consumer = asyncio.create_task(
write_task(write_q)
)
errors = await asyncio.gather(producer, return_exceptions=True)
print(f"INFO: Producer has completed! exceptions: {errors}")
await request_q.join()
for c in request_consumers:
c.cancel()
print("INFO: request consumer has completed! ")
print(f"INFO: write_q in main qsize: {write_q.qsize()}")
await write_q.join()
print("INFO: write queue has completed! ")
# await write_consumer
write_consumer.cancel()
print("INFO: Complete!")
if __name__ == "__main__":
# loop = asyncio.new_event_loop()
# loop.run_until_complete(main())
asyncio.run(main())
Да, я не могу воспроизвести ту же ошибку, когда разбираюсь с ограниченным пониманием вашего проекта - думаю, я подожду вашего обновленного кода.
@jupiterbjy Я свернул пример. Я удалил дозорный из write_task()
и изменил завершение задачи в main()
на write_consumer.cancel()
. Это дает мне предупреждение «задача уничтожена, но все еще находится в ожидании», если я не изменю loop.run_until_complete()
на asyncio.run()
. В целом много тонких изменений, которые, я не совсем понимаю, почему это решает проблему.
Мой первый комментарий заключается в том, что ваш код гораздо сложнее, чем мне нужно. Но настоящая проблема заключается в том, что вам нужно выполнить N выборок (где N — длина вашего списка items
) и M задач (где M в настоящее время равно 5), выполняющих эти N выборок одновременно, но нельзя предположить, что эти задачи будут выполнены за один раз. заказ, соответствующий вашему items
списку.
Я считаю, что самое простое решение — предварительно выделить список results
длиной N, и каждому запросу на выборку передается этот список и индекс, указывающий, куда должен идти результат в этом списке. Вы не можете начать запись файла CSV, пока не будут завершены все выборки, если вы хотите, чтобы строки файла соответствовали входному списку items
. Вам нужна только одна очередь!
Для демонстрационных целей я переименовал items
в более описательный data_list
и инициализировал его как:
data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]
Я также модифицировал fetch
, чтобы имитировать получение данных и простой возврат req[data]
. Следовательно, когда программа завершит работу, содержимое файла file.csv должно быть:
Я также переименовал некоторые ваши функции и переменные в более описательные:
import asyncio
import aiohttp
import aiofiles
import aiocsv
import json
N_REQUEST_TASKS = 5
async def fetch(req: dict, results: list, idx: int) -> dict:
# Make the request
# For demo purposes:
import random
await asyncio.sleep(random.random())
result = req['data']
print(f"INFO returning {result} at index {idx}")
results[idx] = result
return
async with aiohttp.ClientSession() as session:
try:
async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
result = await response.json()
response.raise_for_status()
print(f"INFO: response status was: {response.status}")
# Put response into queue to be written to file
except Exception as err:
print(f"ERROR: error making request: {err}")
result = err
finally:
print(f"INFO returning {result} at index {idx}")
results[idx] = result
async def create_requests(data_list: list, results: list, request_q: asyncio.Queue) -> None:
# Read tasks from source of data
for idx, data in enumerate(data_list):
# Put a request task into the queue
req: dict = {
"headers": {"Accept": "application/json"},
"url": "https://httpbin.org/post",
"data": data
}
await request_q.put(
(req, results, idx)
)
for _ in range(N_REQUEST_TASKS):
# One sentinel for each request task:
await request_q.put(None)
async def request_task(request_q: asyncio.Queue) -> None:
while True:
# Retrieve necessary data to make request
request = await request_q.get()
# Sentinel?
if not request:
print("INFO: received sentinel from request_q")
break
# Make the request which will put data into the response queue
# Unpack:
req, results, idx = request
print(f"INFO: request in request_task: {req['data']}")
await fetch(req, results, idx)
async def writer(results: list) -> None:
async with aiofiles.open("file.csv", mode = "w", newline='') as f:
w = aiocsv.AsyncWriter(f)
await w.writerow([
"status",
"data",
])
for result in results:
print(f"INFO: data in write_task: {result}")
if isinstance(result, Exception):
continue
# Write the data from the response
await w.writerow([
"200",
json.dumps(result)
])
await f.flush()
async def main() -> None:
# Create fake data to POST
data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]
# Preallocate results list so that results will be in correct order
results = [None] * len(data_list)
# Request queue
request_q = asyncio.Queue()
tasks = []
# one producer
tasks.append(
asyncio.create_task(
create_requests(data_list, results, request_q)
)
)
# N_REQUEST_TASKS consumers
for _ in range(N_REQUEST_TASKS):
tasks.append(
asyncio.create_task(
request_task(request_q)
)
)
await asyncio.gather(*tasks)
print(f"INFO: Results have been produced")
await writer(results)
print("INFO: writer has completed! ")
if __name__ == "__main__":
asyncio.run(main())
Распечатки:
INFO: request in request_task: ['hello0', 'world0']
INFO: request in request_task: ['hello1', 'world1']
INFO: request in request_task: ['hello2', 'world2']
INFO: request in request_task: ['hello3', 'world3']
INFO: request in request_task: ['hello4', 'world4']
INFO returning ['hello1', 'world1'] at index 1
INFO: request in request_task: ['hello5', 'world5']
INFO returning ['hello0', 'world0'] at index 0
INFO: request in request_task: ['hello6', 'world6']
INFO returning ['hello6', 'world6'] at index 6
INFO: request in request_task: ['hello7', 'world7']
INFO returning ['hello2', 'world2'] at index 2
INFO: request in request_task: ['hello8', 'world8']
INFO returning ['hello3', 'world3'] at index 3
INFO: request in request_task: ['hello9', 'world9']
INFO returning ['hello4', 'world4'] at index 4
INFO: received sentinel from request_q
INFO returning ['hello9', 'world9'] at index 9
INFO: received sentinel from request_q
INFO returning ['hello5', 'world5'] at index 5
INFO: received sentinel from request_q
INFO returning ['hello7', 'world7'] at index 7
INFO: received sentinel from request_q
INFO returning ['hello8', 'world8'] at index 8
INFO: received sentinel from request_q
INFO: Results have been produced
INFO: data in write_task: ['hello0', 'world0']
INFO: data in write_task: ['hello1', 'world1']
INFO: data in write_task: ['hello2', 'world2']
INFO: data in write_task: ['hello3', 'world3']
INFO: data in write_task: ['hello4', 'world4']
INFO: data in write_task: ['hello5', 'world5']
INFO: data in write_task: ['hello6', 'world6']
INFO: data in write_task: ['hello7', 'world7']
INFO: data in write_task: ['hello8', 'world8']
INFO: data in write_task: ['hello9', 'world9']
INFO: writer has completed!
Обновлять
Если вас не волнует порядок выполнения задач выборки и вы хотите добавлять строки сразу после получения данных, то использование двух очередей является самым простым подходом следующим образом:
import asyncio
import aiohttp
import aiofiles
import aiocsv
import json
N_REQUEST_TASKS = 5
async def fetch(req: dict) -> dict:
# Make the request
# For demo purposes:
import random
await asyncio.sleep(random.random())
result = req['data']
print(f"INFO returning {result}")
return result
async with aiohttp.ClientSession() as session:
try:
async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
result = await response.json()
response.raise_for_status()
print(f"INFO: response status was: {response.status}")
# Put response into queue to be written to file
except Exception as err:
print(f"ERROR: error making request: {err}")
result = err
finally:
print(f"INFO returning {result}")
return result
async def create_requests(data_list: list, request_q: asyncio.Queue) -> None:
# Read tasks from source of data
for data in data_list:
# Put a request task into the queue
req: dict = {
"headers": {"Accept": "application/json"},
"url": "https://httpbin.org/post",
"data": data
}
await request_q.put(
req
)
for _ in range(N_REQUEST_TASKS):
# One sentinel for each request task:
await request_q.put(None)
async def request_task(request_q: asyncio.Queue, writer_q: asyncio.Queue) -> None:
while True:
# Retrieve necessary data to make request
req = await request_q.get()
# Sentinel?
if not req:
print("INFO: received sentinel from request_q")
break
# Make the request which will put data into the response queue
print(f"INFO: request in request_task: {req['data']}")
result = await fetch(req)
await writer_q.put(result)
async def writer(writer_q) -> None:
async with aiofiles.open("file.csv", mode = "w", newline='') as f:
w = aiocsv.AsyncWriter(f)
await w.writerow([
"status",
"data",
])
while True:
result = await writer_q.get()
if result is None:
break
print(f"INFO: data in write_task: {result}")
if isinstance(result, Exception):
continue
# Write the data from the response
await w.writerow([
"200",
json.dumps(result)
])
await f.flush()
async def main() -> None:
# Create fake data to POST
data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]
# Request queue
request_q = asyncio.Queue()
# Writer queue
writer_q = asyncio.Queue()
tasks = []
# one producer
tasks.append(
asyncio.create_task(
create_requests(data_list, request_q)
)
)
# N_REQUEST_TASKS consumers
for _ in range(N_REQUEST_TASKS):
tasks.append(
asyncio.create_task(
request_task(request_q, writer_q)
)
)
writer_task = asyncio.create_task(
writer(writer_q)
)
await asyncio.gather(*tasks)
print(f"INFO: Results have been produced")
# Put sentinelto get writer to quit:
await writer_q.put(None)
await writer_task
print("INFO: writer has completed! ")
if __name__ == "__main__":
asyncio.run(main())
Спасибо за Ваш ответ. Я только что обновил исходный код из-за комментария другого пользователя. ваш ответ имеет смысл, но моя цель — записать запросы в файл после получения ответа. Меня не особо волнует порядок их записи в файл. Я просто хочу делать запросы API и писать в файл, когда у меня есть время простоя в ожидании ответа.
Я добавил обновление, которое, как мне кажется, является тем, что вы ищете,
Да, это работает очень хорошо. Поэтому я предполагаю, что ключом является знать, сколько запросов мы будем делать заранее (это возможно в моем случае использования), и сигнализировать от create_requests()
о том же количестве дозорных, что и запросы, и позволить всему этому течь вниз по течению. Этот шаблон проектирования избавляет от необходимости присоединяться к очередям и полагаться исключительно на дозорных для уничтожения этих потребительских задач?
Нет нет нет. Вопрос не в том, сколько запросов вы будете делать, а в количестве одновременных задач (подумайте о «размере пула»), которые будут отправлять эти запросы. В данном случае у нас одновременно выполняются 5 задач, в общей сложности выполняющих 10 запросов, поэтому необходимое количество дозорных — 5 (по одному на каждого инициатора запроса). Как только задача видит дозорного, она завершается, и вам нужно, чтобы каждая задача видела дозорного. Когда все эти задачи будут выполнены, вы знаете, что все результаты помечены знаком writer_q
, поэтому вам нужно добавить к writer_q
отметку, обозначающую «результатов больше нет».
Итак, количество N_REQUEST_TASKS можно выбрать произвольно. Чтобы убедиться, что я правильно понимаю, это мои работники. Каждый из них получит дозорного для остановки create_requests()
в конечном итоге из-за 1 к 1 put(None)
. а пока они будут ждать получения предметов из request_q
, когда у них будет время.
Клянусь Юпитером, вы это получили!
Можете ли вы сократить еще больше, или это абсолютный минимум для вашей проблемы? Если вы получаете Sentinel, прежде чем что-либо писать, нужны ли нам
aiocsv
иaiofiles
?