Asyncio Future ThreadExecutor

Я пытаюсь преобразовать этот преобразователь (файлы XML из S3 в JSON) в многопоточное приложение, чтобы ускорить выполнение нескольких файлов (985). Поскольку данный файл будет иметь размер около 1 ГБ, я хотел бы отправить, скажем, 8 из этих файлов для анализа за один раз.

Всякий раз, когда я запускаю это, я получаю: RuntimeWarning: coroutine 'process_object' was never awaited

Вот код на высоком уровне:

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3) # list of prefixes for S3

    loop = asyncio.get_event_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))

    loop.close()
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
42
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я изменил и упростил ваш код Я не знаю, почему вы комбинируете фьючерсы пула потоков с asyncio, если вы хотите ограничить количество рабочих, вы можете использовать семафоры в Asyncio.

Ниже приведен код без использования параллельных фьючерсов и упрощенного кода, который работает, поскольку я не могу воспроизвести вышеуказанную ошибку точно в моем локальном

Попробуй это:

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")


async def process_objects_bg(objects):
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp


if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))

Благодарю вас! Да, я хотел ограничить количество потоков, чтобы они не исчерпывали ОЗУ/дисковое пространство, использование семафоров идеально!

user1314147 04.05.2022 16:34

Я думаю, что основная причина, по которой я начал с ThreadPoolExecutor, заключается в том, что я хотел бы, чтобы эти задачи выполнялись одновременно. Вернуться к чертежной доске, так сказать

user1314147 04.05.2022 20:53

Другие вопросы по теме