Оптимальный способ использования многопроцессорности для многих файлов

Итак, у меня есть большой список файлов, которые нужно преобразовать в CSV. Каждый файл сам по себе довольно большой, и каждая строка представляет собой строку. Каждая строка файлов может представлять один из трех типов данных, каждый из которых обрабатывается немного по-разному. Мое текущее решение выглядит следующим образом:

    type1_columns = [...]
    type2_columns = [...]
    type3_columns = [...]

    file_list = os.listdir(filelist)

    def process_type1_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type1_series = pd.Series(to_append, index=type1_columns)
       return type1_series


    def process_type2_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type2_series = pd.Series(to_append, index=type2_columns)
       return type2_series


    def process_type3_line(json_line):
       #processing logic
       to_append = [a, b, c, d, e]
       type3_series = pd.Series(to_append, index=type3_columns)
       return type3_series


    def process_file(file):
        type1_df = pd.DataFrame(columns=type1_columns)
        type2_df = pd.DataFrame(columns=type2_columns)
        type3_df = pd.DataFrame(columns=type3_columns)

        with open(filepath/file) as f:
             data=f.readlines()
             for line in data:
                  #some logic to get the record_type and convert line to json
                  record_type = ...
                  json_line = ...

                  if record_type == "type1":
                       type1_series = process_type1_line(json_line)
                       type1_df = type1_df.append(type1_series, ignore_index=True)
                  if record_type == "type2":
                       type2_series = process_type2_line(json_line)
                       type2_df = type2_df.append(type2_series, ignore_index=True)
                  if record_type == "type3":
                       type3_series = process_type3_line(json_line)
                       type3_df = type3_df.append(type3_series, ignore_index=True)

        type1_df.to_csv(type1_csv_path.csv)
        type2_df.to_csv(type2_csv_path.csv)
        type3_df.to_csv(type3_csv_path.csv)


     for file in file_list:
          process_file(file)

Я перебираю файлы и создаю кадры данных для каждого из трех разных типов записей. Я анализирую строки и вызываю соответствующую функцию обработки для каждой. Возвращенная серия добавляется к окончательному фрейму данных для этого типа записи для этого файла. После обработки файла три фрейма данных сохраняются в формате CSV, и мы начинаем со следующего файла.

Проблема в том, что этот подход занимает слишком много времени, мне потребовались бы недели, чтобы обработать все файлы.

Я попытался изменить свой подход, используя многопроцессорность (с которой у меня нет большого опыта) следующим образом:

     with ThreadPoolExecutor(max_workers=30) as executor:
          futures = [executor.submit(process_file, file) for file in file_list]

В некоторых операторах печати журнала я вижу, что это запустило обработку 30 файлов, но ни один из них не был завершен, поэтому я, по крайней мере, знаю, что мой подход ошибочен. Может ли кто-нибудь объяснить, каким будет лучший подход к этой проблеме? Возможно, какая-то комбинация многопроцессорности и асинхронности?

Формы c голосовым вводом в React с помощью Speechly
Формы c голосовым вводом в React с помощью Speechly
Пытались ли вы когда-нибудь заполнить веб-форму в области электронной коммерции, которая требует много кликов и выбора? Вас попросят заполнить дату,...
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Будучи разработчиком веб-приложений, легко впасть в заблуждение, считая, что приложение без JavaScript не имеет права на жизнь. Нам становится удобно...
Flatpickr: простой модуль календаря для вашего приложения на React
Flatpickr: простой модуль календаря для вашего приложения на React
Если вы ищете пакет для быстрой интеграции календаря с выбором даты в ваше приложения, то библиотека Flatpickr отлично справится с этой задачей....
В чем разница между Promise и Observable?
В чем разница между Promise и Observable?
Разберитесь в этом вопросе, и вы значительно повысите уровень своей компетенции.
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Клиент для URL-адресов, cURL, позволяет взаимодействовать с множеством различных серверов по множеству различных протоколов с синтаксисом URL.
Четыре эффективных способа центрирования блочных элементов в CSS
Четыре эффективных способа центрирования блочных элементов в CSS
У каждого из нас бывали случаи, когда нам нужно отцентрировать блочный элемент, но мы не знаем, как это сделать. Даже если мы реализуем какой-то...
0
0
18
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

У вас две большие проблемы:

  1. Вы загружаете весь входной файл в память, производите весь результат в памяти, а затем сразу записываете весь выходной файл. Это означает, что если у вас есть 30 рабочих процессов, работающих параллельно, вам потребуется память, пропорциональная 30 вашим (самоописанным) файлам большой. Вы также сохраняете все данные дважды: один раз как list из str строк, возвращаемых f.readlines(), а затем снова в одной из трех DataFrames; если вы использовали свой код, без исполнителей, как есть, и просто изменили:

          data=f.readlines()
          for line in data:
    

    к:

          for line in f:
    

    вы немедленно сократите использование памяти примерно наполовину, что (может быть) достаточно, чтобы остановить перегрузку страниц. Тем не менее, вы по-прежнему будете использовать память, пропорциональную размеру файла, для хранения DataFrames, поэтому вы возобновите переборку, если распараллелите свой код, и можете по-прежнему перебирать даже без параллелизма, если файлы достаточно велики.

  2. Вы используете .append для строки каждый, которая, IIRC, для DataFrames является формой алгоритма Шлемиля Художника: каждый append создает совершенно новый DataFrame, копируя все содержимое старого DataFrame плюс небольшое количество новых данных в новый DataFrame, работа которого занимает все больше и больше времени по мере увеличения существующих данных; то, что должно амортизироваться O(n) работа становится O(n**2) работой.

Между ними вы используете способ больше памяти, чем необходимо, и выполняет тонна ненужной работы при повторяющихся добавлениях. Параллелизм может помочь выполнять рутинную работу быстрее, но взамен он увеличивает требования к памяти в 30 раз; скорее всего, у вас не так много ОЗУ (если эти файлы действительно большие, у вас может не хватить ОЗУ даже для одного из файлов), и вы в конечном итоге перебиваете страницы (записываете память в файл подкачки/файл подкачки в освобождайте место для других вещей, считывая их обратно по запросу и часто удаляя память, которая выгружается до того, как вы закончите с ней, делая доступ к памяти привязанным к производительности диска, что на несколько порядков медленнее, чем доступ к ОЗУ).

Я недостаточно хорошо знаю Pandas, чтобы сказать, предлагает ли он какое-то лучшее дополнительное решение для того, что вы делаете, но на самом деле оно вам не нужно; просто работайте с вводом строка за строкой и используйте модуль csv для записи строк по ходу работы. Ваши требования к памяти упадут с «пропорционально размеру каждого входного файла» до «пропорционально данным из каждого линия входного файла».

Ваша функция process_file будет выглядеть примерно так:

def process_file(file):
    # Open input file and all output files (newline='' needed to play nice with csv module
    # which takes control of newline format to ensure dialect rules followed precisely,
    # regardless of OS line separator rules)
    with open(filepath/file) as f,\
         open(type1_csv_path, 'w', newline='') as type1f,\
         open(type2_csv_path, 'w', newline='') as type2f,\
         open(type3_csv_path, 'w', newline='') as type3f:
         csv1 = csv.writer(type1f)
         csv1.writerow(type1_columns)  # Omit if no useful column header
         csv2 = csv.writer(type2f)
         csv2.writerow(type2_columns)  # Omit if no useful column header
         csv3 = csv.writer(type3f)
         csv3.writerow(type3_columns)  # Omit if no useful column header
         for line in f:  # Directly iterating file object lazily fetches line at a time
                         # where .readlines() eagerly fetches whole file, requiring
                         # a ton of memory for no reason
              #some logic to get the record_type and convert line to json
              record_type = ...
              json_line = ...

              if record_type == "type1":
                   type1_series = process_type1_line(json_line)
                   csv1.writerow(type1_series)  # Might need to convert to plain list if Series
                                                # doesn't iterate the values you need directly
              elif record_type == "type2":
                   type2_series = process_type2_line(json_line)
                   csv2.writerow(type2_series)
              elif record_type == "type3":
                   type3_series = process_type3_line(json_line)
                   csv3.writerow(type3_series)

Если это работает как есть (без исполнителей), используйте его таким образом. Если вы пролистывали страницу даже без исполнителей или файлы были достаточно большими, чтобы повторяющиеся append сильно навредили вам, этого может быть достаточно, чтобы все заработало само по себе. Если это слишком медленно, исполнители мощь дают небольшое преимущество, если вы выполняете много работы для обработки каждой строки в выходной формат (поскольку, пока большинство рабочих обрабатывают, один или два рабочих могут адекватно разделить доступ к диску для чтения и записи) , но если объем работы по обработке на строку невелик, все, что превышает небольшую горстку рабочих процессов (я бы начал с двух или трех), только увеличит конкуренцию за диск (особенно если вы используете жесткий диск с вращающимся диском, а не SSD). ), а параллелизм либо не поможет, либо будет активно вредить.

Возможно, вам потребуется настроить точный используемый диалект CSV (передается в качестве аргументов в csv.writer) и, возможно, явно установить конкретный encoding для выходных файлов, а не локаль по умолчанию (например, передать encoding='utf-8' или encoding='utf-16' в opens для записи, поэтому всегда запись в кодировке, которую ожидает потребитель(и) файлов .csv), но это общая форма.

Другие вопросы по теме