Итак, у меня есть большой список файлов, которые нужно преобразовать в CSV. Каждый файл сам по себе довольно большой, и каждая строка представляет собой строку. Каждая строка файлов может представлять один из трех типов данных, каждый из которых обрабатывается немного по-разному. Мое текущее решение выглядит следующим образом:
type1_columns = [...]
type2_columns = [...]
type3_columns = [...]
file_list = os.listdir(filelist)
def process_type1_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type1_series = pd.Series(to_append, index=type1_columns)
return type1_series
def process_type2_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type2_series = pd.Series(to_append, index=type2_columns)
return type2_series
def process_type3_line(json_line):
#processing logic
to_append = [a, b, c, d, e]
type3_series = pd.Series(to_append, index=type3_columns)
return type3_series
def process_file(file):
type1_df = pd.DataFrame(columns=type1_columns)
type2_df = pd.DataFrame(columns=type2_columns)
type3_df = pd.DataFrame(columns=type3_columns)
with open(filepath/file) as f:
data=f.readlines()
for line in data:
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
type1_df = type1_df.append(type1_series, ignore_index=True)
if record_type == "type2":
type2_series = process_type2_line(json_line)
type2_df = type2_df.append(type2_series, ignore_index=True)
if record_type == "type3":
type3_series = process_type3_line(json_line)
type3_df = type3_df.append(type3_series, ignore_index=True)
type1_df.to_csv(type1_csv_path.csv)
type2_df.to_csv(type2_csv_path.csv)
type3_df.to_csv(type3_csv_path.csv)
for file in file_list:
process_file(file)
Я перебираю файлы и создаю кадры данных для каждого из трех разных типов записей. Я анализирую строки и вызываю соответствующую функцию обработки для каждой. Возвращенная серия добавляется к окончательному фрейму данных для этого типа записи для этого файла. После обработки файла три фрейма данных сохраняются в формате CSV, и мы начинаем со следующего файла.
Проблема в том, что этот подход занимает слишком много времени, мне потребовались бы недели, чтобы обработать все файлы.
Я попытался изменить свой подход, используя многопроцессорность (с которой у меня нет большого опыта) следующим образом:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(process_file, file) for file in file_list]
В некоторых операторах печати журнала я вижу, что это запустило обработку 30 файлов, но ни один из них не был завершен, поэтому я, по крайней мере, знаю, что мой подход ошибочен. Может ли кто-нибудь объяснить, каким будет лучший подход к этой проблеме? Возможно, какая-то комбинация многопроцессорности и асинхронности?
У вас две большие проблемы:
Вы загружаете весь входной файл в память, производите весь результат в памяти, а затем сразу записываете весь выходной файл. Это означает, что если у вас есть 30 рабочих процессов, работающих параллельно, вам потребуется память, пропорциональная 30 вашим (самоописанным) файлам большой. Вы также сохраняете все данные дважды: один раз как list
из str
строк, возвращаемых f.readlines()
, а затем снова в одной из трех DataFrame
s; если вы использовали свой код, без исполнителей, как есть, и просто изменили:
data=f.readlines()
for line in data:
к:
for line in f:
вы немедленно сократите использование памяти примерно наполовину, что (может быть) достаточно, чтобы остановить перегрузку страниц. Тем не менее, вы по-прежнему будете использовать память, пропорциональную размеру файла, для хранения DataFrame
s, поэтому вы возобновите переборку, если распараллелите свой код, и можете по-прежнему перебирать даже без параллелизма, если файлы достаточно велики.
Вы используете .append
для строки каждый, которая, IIRC, для DataFrame
s является формой алгоритма Шлемиля Художника: каждый append
создает совершенно новый DataFrame
, копируя все содержимое старого DataFrame
плюс небольшое количество новых данных в новый DataFrame
, работа которого занимает все больше и больше времени по мере увеличения существующих данных; то, что должно амортизироваться O(n)
работа становится O(n**2)
работой.
Между ними вы используете способ больше памяти, чем необходимо, и выполняет тонна ненужной работы при повторяющихся добавлениях. Параллелизм может помочь выполнять рутинную работу быстрее, но взамен он увеличивает требования к памяти в 30 раз; скорее всего, у вас не так много ОЗУ (если эти файлы действительно большие, у вас может не хватить ОЗУ даже для одного из файлов), и вы в конечном итоге перебиваете страницы (записываете память в файл подкачки/файл подкачки в освобождайте место для других вещей, считывая их обратно по запросу и часто удаляя память, которая выгружается до того, как вы закончите с ней, делая доступ к памяти привязанным к производительности диска, что на несколько порядков медленнее, чем доступ к ОЗУ).
Я недостаточно хорошо знаю Pandas, чтобы сказать, предлагает ли он какое-то лучшее дополнительное решение для того, что вы делаете, но на самом деле оно вам не нужно; просто работайте с вводом строка за строкой и используйте модуль csv
для записи строк по ходу работы. Ваши требования к памяти упадут с «пропорционально размеру каждого входного файла» до «пропорционально данным из каждого линия входного файла».
Ваша функция process_file
будет выглядеть примерно так:
def process_file(file):
# Open input file and all output files (newline='' needed to play nice with csv module
# which takes control of newline format to ensure dialect rules followed precisely,
# regardless of OS line separator rules)
with open(filepath/file) as f,\
open(type1_csv_path, 'w', newline='') as type1f,\
open(type2_csv_path, 'w', newline='') as type2f,\
open(type3_csv_path, 'w', newline='') as type3f:
csv1 = csv.writer(type1f)
csv1.writerow(type1_columns) # Omit if no useful column header
csv2 = csv.writer(type2f)
csv2.writerow(type2_columns) # Omit if no useful column header
csv3 = csv.writer(type3f)
csv3.writerow(type3_columns) # Omit if no useful column header
for line in f: # Directly iterating file object lazily fetches line at a time
# where .readlines() eagerly fetches whole file, requiring
# a ton of memory for no reason
#some logic to get the record_type and convert line to json
record_type = ...
json_line = ...
if record_type == "type1":
type1_series = process_type1_line(json_line)
csv1.writerow(type1_series) # Might need to convert to plain list if Series
# doesn't iterate the values you need directly
elif record_type == "type2":
type2_series = process_type2_line(json_line)
csv2.writerow(type2_series)
elif record_type == "type3":
type3_series = process_type3_line(json_line)
csv3.writerow(type3_series)
Если это работает как есть (без исполнителей), используйте его таким образом. Если вы пролистывали страницу даже без исполнителей или файлы были достаточно большими, чтобы повторяющиеся append
сильно навредили вам, этого может быть достаточно, чтобы все заработало само по себе. Если это слишком медленно, исполнители мощь дают небольшое преимущество, если вы выполняете много работы для обработки каждой строки в выходной формат (поскольку, пока большинство рабочих обрабатывают, один или два рабочих могут адекватно разделить доступ к диску для чтения и записи) , но если объем работы по обработке на строку невелик, все, что превышает небольшую горстку рабочих процессов (я бы начал с двух или трех), только увеличит конкуренцию за диск (особенно если вы используете жесткий диск с вращающимся диском, а не SSD). ), а параллелизм либо не поможет, либо будет активно вредить.
Возможно, вам потребуется настроить точный используемый диалект CSV (передается в качестве аргументов в csv.writer
) и, возможно, явно установить конкретный encoding
для выходных файлов, а не локаль по умолчанию (например, передать encoding='utf-8'
или encoding='utf-16'
в open
s для записи, поэтому всегда запись в кодировке, которую ожидает потребитель(и) файлов .csv
), но это общая форма.