Связь между очередями в Asyncio не работает должным образом

Я работаю над шаблоном, в котором я общаюсь между несколькими очередями для обработки элементов в конвейере. Я использую датчики для связи между очередями, когда следует прекратить работу, однако в следующем коде я вижу результаты, которые меня смущают.

При чтении write_q в write_task() я вижу, что первое значение входит в качестве дозорного None вместо задач в том порядке, в котором они были расположены response_task(). Если я правильно понял, write_task() должен получать предметы по порядку и обрабатывать их по мере создания задач.

Кроме того, при печати qsize() в write_task() после того, как я нашел дозорного, он говорит, что здесь 0 элементов, однако при обратной печати в основном кажется, что qsize() из write_q все еще содержит 2 элемента. Я где-то читал, что aiofiles использует run_in_executor(), что означает, что могут быть различия в том, где находится очередь.

Большая часть приведенного ниже кода представляет собой шаблон, иллюстрирующий реальный сценарий того, почему мой код продолжает бесконечно блокироваться.

import asyncio
import aiofiles
import aiocsv
import json

async def fetch(t: float) -> dict:
    print(f"INFO: Sleeping for {t}s")
    await asyncio.sleep(t)
    return t

async def task(l: list,  request_q: asyncio.Queue) -> None:

    # Read tasks from source of data
    for i in l:
        await request_q.put(
            asyncio.create_task(fetch(i))
        )

    # Sentinel value to signal we are done receiving from source
    await request_q.put(None)

async def request_task(request_q: asyncio.Queue, write_q: asyncio.Queue) -> None:
    while True:
        req = await request_q.get()

        # If we received sentinel for tasks, pass message to next queue
        if not req:
            print("INFO: received sentinel from request_q")
            request_q.task_done()
            await request_q.put(None) # put back into the queue to signal to other consumers we are done
            break
        
        # Make the request
        resp = await req
        await write_q.put(resp)
        request_q.task_done()

async def write_task(write_q: asyncio.Queue) -> None:

    headers: bool = True
    async with aiofiles.open("file.csv", mode = "w+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            # Get data out of the queue to write it
            data = await write_q.get()
            print(data)

            # if not data:
            #     print(f"INFO: Found sentinel in write_task, queue size was: {write_q.qsize()}")
            #     write_q.task_done()
            #     await f.flush()
            #     break

            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            await f.flush()
            write_q.task_done()

async def main() -> None:

    # Create fake data to POST
    items: list[str] = [.2, .5, 1] 

    # Queues for orchestrating 
    request_q = asyncio.Queue()
    write_q = asyncio.Queue()

    # one producer
    producer = asyncio.create_task(
        task(items, request_q)
    )

    # 5 request consumers
    request_consumers = [
        asyncio.create_task(
            request_task(request_q, write_q)
        )
        for _ in range(2)
    ]

    # 5 write consumers
    write_consumer = asyncio.create_task(
        write_task(write_q)
    )

    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")

    await request_q.join()
    for c in request_consumers:
        c.cancel()
    print("INFO: request consumer has completed! ")
    print(f"INFO: write_q in main qsize: {write_q.qsize()}")
    
    await write_q.join()
    print("INFO: write queue has completed! ")
    # await write_consumer
    write_consumer.cancel()
    print("INFO: Complete!")

if __name__ == "__main__":
    # loop = asyncio.new_event_loop()
    # loop.run_until_complete(main())
    asyncio.run(main())

Можете ли вы сократить еще больше, или это абсолютный минимум для вашей проблемы? Если вы получаете Sentinel, прежде чем что-либо писать, нужны ли нам aiocsv и aiofiles?

jupiterbjy 27.06.2024 01:56

Да, я не могу воспроизвести ту же ошибку, когда разбираюсь с ограниченным пониманием вашего проекта - думаю, я подожду вашего обновленного кода.

jupiterbjy 27.06.2024 03:25

@jupiterbjy Я свернул пример. Я удалил дозорный из write_task() и изменил завершение задачи в main() на write_consumer.cancel(). Это дает мне предупреждение «задача уничтожена, но все еще находится в ожидании», если я не изменю loop.run_until_complete() на asyncio.run(). В целом много тонких изменений, которые, я не совсем понимаю, почему это решает проблему.

Coldchain9 27.06.2024 14:06
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
3
70
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мой первый комментарий заключается в том, что ваш код гораздо сложнее, чем мне нужно. Но настоящая проблема заключается в том, что вам нужно выполнить N выборок (где N — длина вашего списка items) и M задач (где M в настоящее время равно 5), выполняющих эти N выборок одновременно, но нельзя предположить, что эти задачи будут выполнены за один раз. заказ, соответствующий вашему items списку.

Я считаю, что самое простое решение — предварительно выделить список results длиной N, и каждому запросу на выборку передается этот список и индекс, указывающий, куда должен идти результат в этом списке. Вы не можете начать запись файла CSV, пока не будут завершены все выборки, если вы хотите, чтобы строки файла соответствовали входному списку items. Вам нужна только одна очередь!

Для демонстрационных целей я переименовал items в более описательный data_list и инициализировал его как:

data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]

Я также модифицировал fetch, чтобы имитировать получение данных и простой возврат req[data]. Следовательно, когда программа завершит работу, содержимое файла file.csv должно быть:

Я также переименовал некоторые ваши функции и переменные в более описательные:

import asyncio
import aiohttp
import aiofiles
import aiocsv
import json

N_REQUEST_TASKS = 5

async def fetch(req: dict, results: list, idx: int) -> dict:
    # Make the request

    # For demo purposes:
    import random

    await asyncio.sleep(random.random())
    result = req['data']
    print(f"INFO returning {result} at index {idx}")
    results[idx] = result
    return

    async with aiohttp.ClientSession() as session:
        try:
            async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                result = await response.json()
                response.raise_for_status()
                print(f"INFO: response status was: {response.status}")

                # Put response into queue to be written to file
        except Exception as err:
            print(f"ERROR: error making request: {err}")
            result = err
        finally:
            print(f"INFO returning {result} at index {idx}")
            results[idx] = result


async def create_requests(data_list: list,  results: list, request_q: asyncio.Queue) -> None:
    # Read tasks from source of data
    for idx, data in enumerate(data_list):
        # Put a request task into the queue
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": data
        }
        await request_q.put(
            (req, results, idx)
        )

    for _ in range(N_REQUEST_TASKS):
        # One sentinel for each request task:
        await request_q.put(None)


async def request_task(request_q: asyncio.Queue) -> None:
    while True:
        # Retrieve necessary data to make request
        request = await request_q.get()

        # Sentinel?
        if not request:
            print("INFO: received sentinel from request_q")
            break

        # Make the request which will put data into the response queue
        # Unpack:
        req, results, idx = request
        print(f"INFO: request in request_task: {req['data']}")
        await fetch(req, results, idx)


async def writer(results: list) -> None:
    async with aiofiles.open("file.csv", mode = "w", newline='') as f:
        w = aiocsv.AsyncWriter(f)

        await w.writerow([
            "status",
            "data",
        ])

        for result in results:
            print(f"INFO: data in write_task: {result}")
            if isinstance(result, Exception):
                continue

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(result)
            ])
            await f.flush()

async def main() -> None:

    # Create fake data to POST
    data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]

    # Preallocate results list so that results will be in correct order
    results = [None] * len(data_list)

    # Request queue
    request_q = asyncio.Queue()

    tasks = []

    # one producer
    tasks.append(
        asyncio.create_task(
            create_requests(data_list, results, request_q)
        )
    )

    # N_REQUEST_TASKS consumers
    for _ in  range(N_REQUEST_TASKS):
        tasks.append(
            asyncio.create_task(
                request_task(request_q)
            )
        )

    await asyncio.gather(*tasks)
    print(f"INFO: Results have been produced")

    await writer(results)
    print("INFO: writer has completed! ")

if __name__ == "__main__":
    asyncio.run(main())

Распечатки:

INFO: request in request_task: ['hello0', 'world0']
INFO: request in request_task: ['hello1', 'world1']
INFO: request in request_task: ['hello2', 'world2']
INFO: request in request_task: ['hello3', 'world3']
INFO: request in request_task: ['hello4', 'world4']
INFO returning ['hello1', 'world1'] at index 1
INFO: request in request_task: ['hello5', 'world5']
INFO returning ['hello0', 'world0'] at index 0
INFO: request in request_task: ['hello6', 'world6']
INFO returning ['hello6', 'world6'] at index 6
INFO: request in request_task: ['hello7', 'world7']
INFO returning ['hello2', 'world2'] at index 2
INFO: request in request_task: ['hello8', 'world8']
INFO returning ['hello3', 'world3'] at index 3
INFO: request in request_task: ['hello9', 'world9']
INFO returning ['hello4', 'world4'] at index 4
INFO: received sentinel from request_q
INFO returning ['hello9', 'world9'] at index 9
INFO: received sentinel from request_q
INFO returning ['hello5', 'world5'] at index 5
INFO: received sentinel from request_q
INFO returning ['hello7', 'world7'] at index 7
INFO: received sentinel from request_q
INFO returning ['hello8', 'world8'] at index 8
INFO: received sentinel from request_q
INFO: Results have been produced
INFO: data in write_task: ['hello0', 'world0']
INFO: data in write_task: ['hello1', 'world1']
INFO: data in write_task: ['hello2', 'world2']
INFO: data in write_task: ['hello3', 'world3']
INFO: data in write_task: ['hello4', 'world4']
INFO: data in write_task: ['hello5', 'world5']
INFO: data in write_task: ['hello6', 'world6']
INFO: data in write_task: ['hello7', 'world7']
INFO: data in write_task: ['hello8', 'world8']
INFO: data in write_task: ['hello9', 'world9']
INFO: writer has completed!

Обновлять

Если вас не волнует порядок выполнения задач выборки и вы хотите добавлять строки сразу после получения данных, то использование двух очередей является самым простым подходом следующим образом:

import asyncio
import aiohttp
import aiofiles
import aiocsv
import json

N_REQUEST_TASKS = 5

async def fetch(req: dict) -> dict:
    # Make the request

    # For demo purposes:
    import random

    await asyncio.sleep(random.random())
    result = req['data']
    print(f"INFO returning {result}")
    return result

    async with aiohttp.ClientSession() as session:
        try:
            async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                result = await response.json()
                response.raise_for_status()
                print(f"INFO: response status was: {response.status}")

                # Put response into queue to be written to file
        except Exception as err:
            print(f"ERROR: error making request: {err}")
            result = err
        finally:
            print(f"INFO returning {result}")
            return result


async def create_requests(data_list: list, request_q: asyncio.Queue) -> None:
    # Read tasks from source of data
    for data in data_list:
        # Put a request task into the queue
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": data
        }
        await request_q.put(
            req
        )

    for _ in range(N_REQUEST_TASKS):
        # One sentinel for each request task:
        await request_q.put(None)


async def request_task(request_q: asyncio.Queue, writer_q: asyncio.Queue) -> None:
    while True:
        # Retrieve necessary data to make request
        req = await request_q.get()

        # Sentinel?
        if not req:
            print("INFO: received sentinel from request_q")
            break

        # Make the request which will put data into the response queue
        print(f"INFO: request in request_task: {req['data']}")
        result = await fetch(req)
        await writer_q.put(result)


async def writer(writer_q) -> None:
    async with aiofiles.open("file.csv", mode = "w", newline='') as f:
        w = aiocsv.AsyncWriter(f)

        await w.writerow([
            "status",
            "data",
        ])

        while True:
            result = await writer_q.get()
            if result is None:
                break

            print(f"INFO: data in write_task: {result}")
            if isinstance(result, Exception):
                continue

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(result)
            ])
            await f.flush()

async def main() -> None:

    # Create fake data to POST
    data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]

    # Request queue
    request_q = asyncio.Queue()
    # Writer queue
    writer_q = asyncio.Queue()

    tasks = []

    # one producer
    tasks.append(
        asyncio.create_task(
            create_requests(data_list, request_q)
        )
    )

    # N_REQUEST_TASKS consumers
    for _ in  range(N_REQUEST_TASKS):
        tasks.append(
            asyncio.create_task(
                request_task(request_q, writer_q)
            )
        )

    writer_task = asyncio.create_task(
        writer(writer_q)
    )

    await asyncio.gather(*tasks)
    print(f"INFO: Results have been produced")

    # Put sentinelto get writer to quit:
    await writer_q.put(None)
    await writer_task
    print("INFO: writer has completed! ")

if __name__ == "__main__":
    asyncio.run(main())

Спасибо за Ваш ответ. Я только что обновил исходный код из-за комментария другого пользователя. ваш ответ имеет смысл, но моя цель — записать запросы в файл после получения ответа. Меня не особо волнует порядок их записи в файл. Я просто хочу делать запросы API и писать в файл, когда у меня есть время простоя в ожидании ответа.

Coldchain9 27.06.2024 14:10

Я добавил обновление, которое, как мне кажется, является тем, что вы ищете,

Booboo 27.06.2024 15:16

Да, это работает очень хорошо. Поэтому я предполагаю, что ключом является знать, сколько запросов мы будем делать заранее (это возможно в моем случае использования), и сигнализировать от create_requests() о том же количестве дозорных, что и запросы, и позволить всему этому течь вниз по течению. Этот шаблон проектирования избавляет от необходимости присоединяться к очередям и полагаться исключительно на дозорных для уничтожения этих потребительских задач?

Coldchain9 27.06.2024 15:43

Нет нет нет. Вопрос не в том, сколько запросов вы будете делать, а в количестве одновременных задач (подумайте о «размере пула»), которые будут отправлять эти запросы. В данном случае у нас одновременно выполняются 5 задач, в общей сложности выполняющих 10 запросов, поэтому необходимое количество дозорных — 5 (по одному на каждого инициатора запроса). Как только задача видит дозорного, она завершается, и вам нужно, чтобы каждая задача видела дозорного. Когда все эти задачи будут выполнены, вы знаете, что все результаты помечены знаком writer_q, поэтому вам нужно добавить к writer_q отметку, обозначающую «результатов больше нет».

Booboo 27.06.2024 16:50

Итак, количество N_REQUEST_TASKS можно выбрать произвольно. Чтобы убедиться, что я правильно понимаю, это мои работники. Каждый из них получит дозорного для остановки create_requests() в конечном итоге из-за 1 к 1 put(None). а пока они будут ждать получения предметов из request_q, когда у них будет время.

Coldchain9 27.06.2024 17:02

Клянусь Юпитером, вы это получили!

Booboo 27.06.2024 17:14

Другие вопросы по теме