Я пытаюсь преобразовать этот преобразователь (файлы XML из S3 в JSON) в многопоточное приложение, чтобы ускорить выполнение нескольких файлов (985). Поскольку данный файл будет иметь размер около 1 ГБ, я хотел бы отправить, скажем, 8 из этих файлов для анализа за один раз.
Всякий раз, когда я запускаю это, я получаю: RuntimeWarning: coroutine 'process_object' was never awaited
Вот код на высоком уровне:
async def process_object(filename, pid=None):
start = time.time()
s3 = S3Client(...)
opensearch_client = OpenSearchClient(...)
Parser.DEBUG = True
parser = Parser(s3, opensearch_client)
save_file = ...
s3.download_from_s3(filename, save_file)
parser.current_prefix = filename
await parser.parse(save_file)
return f"Processed {filename} in {time.time() - start} seconds"
if "__main__" == __name__:
objects = get_objects(top_n=3) # list of prefixes for S3
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
asyncio.wrap_future(future)
for future in [
loop.run_in_executor(executor, process_object, url) for url in objects
]
]
results = loop.run_until_complete(asyncio.gather(*futures))
loop.close()
Я изменил и упростил ваш код Я не знаю, почему вы комбинируете фьючерсы пула потоков с asyncio, если вы хотите ограничить количество рабочих, вы можете использовать семафоры в Asyncio.
Ниже приведен код без использования параллельных фьючерсов и упрощенного кода, который работает, поскольку я не могу воспроизвести вышеуказанную ошибку точно в моем локальном
Попробуй это:
async def process_object(filename, pid=None):
start = time.time()
s3 = S3Client(...)
opensearch_client = OpenSearchClient(...)
Parser.DEBUG = True
parser = Parser(s3, opensearch_client)
save_file = ...
s3.download_from_s3(filename, save_file)
parser.current_prefix = filename
await parser.parse(save_file)
print(f"Processed {filename} in {time.time() - start} seconds")
async def process_objects_bg(objects):
resp = await asyncio.gather(*[process_object(url) for url in objects])
return resp
if "__main__" == __name__:
objects = get_objects(top_n=3) # list of prefixes for S3
asyncio.run(process_objects_bg(objects))
Я думаю, что основная причина, по которой я начал с ThreadPoolExecutor, заключается в том, что я хотел бы, чтобы эти задачи выполнялись одновременно. Вернуться к чертежной доске, так сказать
Благодарю вас! Да, я хотел ограничить количество потоков, чтобы они не исчерпывали ОЗУ/дисковое пространство, использование семафоров идеально!