У меня есть сотни или тысячи файлов, которые я пытаюсь распараллелить во время обработки, но у меня проблемы с логикой.
Моя цель состоит в том, чтобы либо 8 потоков работали над одним процессом (не уверен, если это возможно из-за обработки на фреймах данных pandas), либо иметь 8 потоков, каждый из которых работает с 1 файлом независимо.
Ниже приведен упрощенный код моего приложения. Честно говоря, я действительно не знаю, что он делает. Когда я запускаю приложение, оно начинает перелетать файлы, но когда я печатаю имя файла, они все не в порядке. Через пару минут он действительно замедляется, но все еще печатает случайные имена полей, которые не соответствуют порядку. Похоже, что что-то добавляется к выходному файлу, но я не могу сказать, откуда они.
Нужно ли мне разбивать список файлов на 8 файлов за раз и обрабатывать их таким образом? Или мой код просто неверен в том, как я это делаю? Я пробовал классы Pool и Process, но ни один из них не работает для этого варианта использования.
import os.path
from os import path
import pandas as pd
import numpy as np
import math
from multiprocessing import Pool, Manager, Process
import multiprocessing as mp
from concurrent import futures as cf
from multiprocessing.pool import ThreadPool
def apply_ref_to_ind(input_df, temp_ref_df):
final_df = pd.merge(input_df, temp_ref_df, how='outer', on='CODE')
final_df['CALC2'] = final_df['CALC1'] - temp_ref_df['CALC1']
return final_df
def worker(input_file):
"""Worker process for operating on partitioned reference data"""
temp_input = pd.read_csv(input_file, dtype=str)
code = temp_input['CODE'].unique()[0]
temp_ref_df = ref_df.loc[ref_df['CODE'] == code]
print(input_file)
return_df = apply_ref_to_ind(temp_input, temp_ref_df)
if path.exists("output_file.csv"):
return_df[final_layout].to_csv('output_file.csv', index=None)
else:
return_df[final_layout].to_csv('output_file.csv', mode='a', header=False, index=None)
if __name__ == '__main__':
file_list = ['file.csv', 'file2.csv', 'file3.csv'] # etc...
ref_df = pd.read_csv('reference.csv')
final_layout = ['ID', 'CODE', 'CALC1', 'CALC2']
pool = Pool(8) # Create a multiprocessing Pool
pool.map(worker, [file for file in file_list]) # process data_inputs iterable with pool
@ juanpa.arrivillaga Я подумал, что они будут идти в порядке файлов в списке файлов, из-за чего мне трудно понять, действительно ли это работает или нет. Он работает уже пару часов, но похоже, что выходной файл по какой-то причине был перезаписан и запущен заново.
Нет, если вы ожидаете, что все будет работать параллельно, как вы можете ожидать, что они будут работать по порядку? Список будет иметь порядок потребляется, но после того, как каждый элемент будет передан следующему работнику пула процессов, вы не можете гарантировать, что рабочий, который получил что-то первым, закончит работу первым и т. д. И т. Д.
Попался. Да, я знаю, что они не закончатся по порядку, поэтому я думаю, что меня это смутило. Они читают файлы по порядку, просто заканчивая в разное время. Теперь это имеет смысл. Спасибо!
Да, вы не можете полагаться на процессы, происходящие в определенном порядке