Python многопроцессорная обработка сотен файлов

У меня есть сотни или тысячи файлов, которые я пытаюсь распараллелить во время обработки, но у меня проблемы с логикой.

Моя цель состоит в том, чтобы либо 8 потоков работали над одним процессом (не уверен, если это возможно из-за обработки на фреймах данных pandas), либо иметь 8 потоков, каждый из которых работает с 1 файлом независимо.

Ниже приведен упрощенный код моего приложения. Честно говоря, я действительно не знаю, что он делает. Когда я запускаю приложение, оно начинает перелетать файлы, но когда я печатаю имя файла, они все не в порядке. Через пару минут он действительно замедляется, но все еще печатает случайные имена полей, которые не соответствуют порядку. Похоже, что что-то добавляется к выходному файлу, но я не могу сказать, откуда они.

Нужно ли мне разбивать список файлов на 8 файлов за раз и обрабатывать их таким образом? Или мой код просто неверен в том, как я это делаю? Я пробовал классы Pool и Process, но ни один из них не работает для этого варианта использования.

import os.path
from os import path
import pandas as pd
import numpy as np
import math
from multiprocessing import Pool, Manager, Process
import multiprocessing as mp
from concurrent import futures as cf
from multiprocessing.pool import ThreadPool


def apply_ref_to_ind(input_df, temp_ref_df):

    final_df = pd.merge(input_df, temp_ref_df, how='outer', on='CODE')
    final_df['CALC2'] = final_df['CALC1'] - temp_ref_df['CALC1']

    return final_df


def worker(input_file):
    """Worker process for operating on partitioned reference data"""

    temp_input = pd.read_csv(input_file, dtype=str)
    code = temp_input['CODE'].unique()[0]
    temp_ref_df = ref_df.loc[ref_df['CODE'] == code]

    print(input_file)
    return_df = apply_ref_to_ind(temp_input, temp_ref_df)

    if path.exists("output_file.csv"):
        return_df[final_layout].to_csv('output_file.csv', index=None)
    else:
        return_df[final_layout].to_csv('output_file.csv', mode='a', header=False, index=None)


if __name__ == '__main__':
    file_list = ['file.csv', 'file2.csv', 'file3.csv']  # etc...
    ref_df = pd.read_csv('reference.csv')
    final_layout = ['ID', 'CODE', 'CALC1', 'CALC2']

    pool = Pool(8)  # Create a multiprocessing Pool
    pool.map(worker, [file for file in file_list])  # process data_inputs iterable with pool

Да, вы не можете полагаться на процессы, происходящие в определенном порядке

juanpa.arrivillaga 10.08.2018 18:22

@ juanpa.arrivillaga Я подумал, что они будут идти в порядке файлов в списке файлов, из-за чего мне трудно понять, действительно ли это работает или нет. Он работает уже пару часов, но похоже, что выходной файл по какой-то причине был перезаписан и запущен заново.

bbennett36 10.08.2018 19:28

Нет, если вы ожидаете, что все будет работать параллельно, как вы можете ожидать, что они будут работать по порядку? Список будет иметь порядок потребляется, но после того, как каждый элемент будет передан следующему работнику пула процессов, вы не можете гарантировать, что рабочий, который получил что-то первым, закончит работу первым и т. д. И т. Д.

juanpa.arrivillaga 10.08.2018 20:15

Попался. Да, я знаю, что они не закончатся по порядку, поэтому я думаю, что меня это смутило. Они читают файлы по порядку, просто заканчивая в разное время. Теперь это имеет смысл. Спасибо!

bbennett36 10.08.2018 20:53
1
4
213
0

Другие вопросы по теме