Как решить проблемы с памятью при многопроцессорной обработке с помощью Pool.map ()?

Я написал программу (ниже) для:

  • прочитать огромный текстовый файл как pandas dataframe
  • затем groupby с использованием определенного значения столбца для разделения данных и сохранения в виде списка фреймов данных.
  • затем передайте данные в multiprocess Pool.map() для параллельной обработки каждого фрейма данных.

Все нормально, программа хорошо работает на моем небольшом тестовом наборе данных. Но когда я загружаю свои большие данные (около 14 ГБ), потребление памяти экспоненциально увеличивается, а затем компьютер зависает или прекращается (в кластере HPC).

Я добавил коды для очистки памяти, как только данные / переменные не пригодятся. Я также закрываю бассейн, как только это будет сделано. По-прежнему с входом 14 ГБ я ожидал только 2 * 14 ГБ памяти, но похоже, что многое происходит. Я также пытался настроить с помощью chunkSize and maxTaskPerChild, etc, но я не вижу никакой разницы в оптимизации как в тесте, так и в большом файле.

Я думаю, что в этой позиции кода, когда я запускаю multiprocessing, требуются улучшения.

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) но я публикую весь код.

Пример теста: Я создал тестовый файл ("genome_matrix_final-chr1234-1mb.txt") размером до 250 МБ и запустил программу. Когда я смотрю на системный монитор, я вижу, что потребление памяти увеличилось примерно на 6 ГБ. Мне не совсем понятно, почему так много места в памяти занимает файл 250 МБ плюс некоторые выходы. Я поделился этим файлом через dropbox, если он помогает увидеть реальную проблему. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

Может кто-нибудь подсказать, как я могу избавиться от проблемы?

Мой скрипт на Python:

#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

Обновление для охотников за головами:

Я добился многопроцессорной обработки с помощью Pool.map(), но код вызывает большую нагрузку на память (входной тестовый файл ~ 300 МБ, но нагрузка на память составляет около 6 ГБ). Я ожидал только 3 * 300 мб нагрузки на память на макс.

  • Может кто-нибудь объяснить, что вызывает такие огромные потребности в памяти для такого маленького файла и для таких небольших вычислений.
  • Кроме того, я пытаюсь взять ответ и использовать его для улучшения многопроцессорности в моей большой программе. Таким образом, добавление любого метода, модуля, который не слишком сильно меняет структуру вычислительной части (процесс, связанный с процессором), должно быть в порядке.
  • Я включил два тестовых файла для тестирования, чтобы поиграть с кодом.
  • Прилагаемый код представляет собой полный код, поэтому он должен работать так, как задумано, при копировании-вставке. Любые изменения следует использовать только для улучшения оптимизации на этапах многопроцессорной обработки.

Я предлагаю поработать с pyspark, если вам нужно обработать тяжелый файл.

Dinusha Dilinka 22.03.2018 17:37

@DinushaDilanka: Я только что бегло просмотрел pyspark. Выглядит неплохо, но разве это замена пандам. Кроме того, еще одна проблема заключается в том, что мне придется выучить новый пакет и переписать всю свою программу. Эта программа, приведенная выше, является всего лишь пробным запуском моей программы и данных, чтобы избавиться от проблемы с памятью при многопроцессорной обработке. Любые примеры по вашему предложению были бы хороши. Спасибо,

everestial007 22.03.2018 21:22

Python не имеет хорошей производительности в многопроцессорной обработке, потому что это был язык сценариев. Поэтому лучше получить помощь от другой библиотеки, которая поддерживает API Python, например pyspark.

Dinusha Dilinka 23.03.2018 04:16

Если вы используете pyspark, вы можете читать все файлы одновременно, также можно делать groupby без использования какой-либо многопроцессорной обработки.

Dinusha Dilinka 23.03.2018 04:24

Приведите мне пример. Спасибо!

everestial007 23.03.2018 04:47

Пожалуйста, обратитесь к этому ссылка на сайт

Dinusha Dilinka 23.03.2018 04:52

@DinushaDilanka Я просмотрел несколько документов по pyspark, когда вы упомянули об этом сегодня утром. Но, честно говоря, вы меньше помогаете. В этой ссылке посмотрите на 7. Pandas vs PySpark DataFrame, который предполагает наличие нескольких несовместимостей - таким образом, это вызовет небольшую проблему. Я пытаюсь закончить свой проект, и это мало чем помогает. Если вы можете взять приведенный выше код и общий файл в Dropbox, как вы предлагаете это сделать, я могу применить эту идею к моей более крупной программе. В остальном, я сомневаюсь, что переход на pyspark через день возможен. Спасибо хоть !

everestial007 23.03.2018 05:13

Позвольте нам продолжить обсуждение в чате.

Dinusha Dilinka 23.03.2018 05:17

Можете ли вы свести это к более простому примеру без какого-либо нерелевантного кода, который имеет ту же проблему, и где решение вашего примера позволило бы вам построить решение для вашего реального кода? Это значительно упростило бы решение. См. минимальный воспроизводимый пример в справке по указателям. (Это определенно вопрос, на который можно дать ответ как есть, это может быть вопрос, на который легче ответить.)

abarnert 25.03.2018 21:30

@abarnert: Спасибо, что посмотрели. Я всегда получаю обратную связь, что мои вопросы недостаточно полны. Итак, я разместил этот вопрос, который является минимальным из того, что я пытаюсь сделать. Однако я задал другой вопрос, который заключался в том, чтобы спросить людей, в чем у меня проблема. Но, думаю, этот менее технический: stackoverflow.com/questions/49475489/…

everestial007 25.03.2018 21:33

Выяснить, как сделать вопрос одновременно полным и минимальным, обычно непросто - вычеркните слишком много неактуальных, и люди просто спросят: «Зачем вам это нужно?» Но если вы дадите нам код, который мы можем запускать и использовать, не разбираясь в формате вашего файла, а также о том, как вы его обрабатываете в Pandas и т. д., Может быть проще найти (и протестировать) решение.

abarnert 25.03.2018 21:38

@abarnert: Извините, если я не слежу за вами. Вы, кажется, довольно лаконично выражаетесь (будучи опытным программистом). Приведенный выше код и данные работоспособны (просто скопируйте и вставьте), единственная проблема, с которой я столкнулся, - это повышение производительности. Я хотел бы улучшить свой вопрос и выражение, но мне нужны конкретные подробности о том, что нужно добавить.

everestial007 25.03.2018 21:44

Вы не включили файл genome_matrix_header.txt в Dropbox, поэтому он не будет работать как есть. Не могли бы вы включить его? Спасибо.

Brian 29.03.2018 02:27

@Brian: Я только что добавил этот файл в общую ссылку Dropbox. В сценарий я также добавил метод для вычисления и отображения текущего использования памяти процессом. Надеюсь, это поможет, и дайте мне знать, если у вас возникнут вопросы.

everestial007 29.03.2018 04:42

Привет, @ everestial007, мой ответ не сработал?

Jeff Ellen 30.03.2018 19:34
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
25
15
15 049
3

Ответы 3

Я была такая же проблема. Мне нужно было обработать огромный текстовый корпус, сохранив базу знаний из нескольких DataFrames из миллионов строк, загруженных в память. Я думаю, что это обычная проблема, поэтому я буду ориентироваться на ответы в общих целях.

комбинация настроек решил проблему для меня (только 1 и 3 и 5 могут сделать это за вас):

  1. Используйте Pool.imap (или imap_unordered) вместо Pool.map. Это будет лениво перебирать данные, чем загружать их все в память перед началом обработки.

  2. Установите значение параметра chunksize. Это также сделает imap быстрее.

  3. Установите значение параметра maxtasksperchild.

  4. Добавить вывод на диск, чем в память. Мгновенно или время от времени, когда он достигает определенного размера.

  5. Запускайте код разными партиями. Вы можете использовать itertools.islice, если у вас есть итератор. Идея состоит в том, чтобы разделить ваш list(gen_matrix_df_list.values()) на три или более списков, затем вы передаете первую треть только map или imap, затем вторую третью в другом прогоне и т. д. Поскольку у вас есть список, вы можете просто нарезать его в той же строке код.

Спасибо за ответ. Можете ли вы использовать свой стиль кода (с использованием ваших собственных данных или моих данных), чтобы я мог передать идею по этому вопросу и моей большой программе.

everestial007 25.03.2018 00:21

Я думаю, что для меня нет никакой выгоды, используя # 5, поскольку данные будут в очереди (как входные, так и выходные) независимо. Только 4, кажется, дает разумный выигрыш в оптимизации памяти, но не вызывает ли это узких мест ввода-вывода и неупорядоченного вывода. Кроме того, я только что попробовал imap и не вижу никакого прироста (как по скорости, так и по потреблению памяти).

everestial007 25.03.2018 00:27

Это будет зависеть от специфики вашей обработки. Вы должны попробовать, но возникают узкие места. (4) также замедлит обработку. Вот один мой модуль files.fm/u/uqrq4zje

Abdulrahman Bres 25.03.2018 00:41

есть модули settings и read_data. Это ваш локальный модуль?

everestial007 25.03.2018 01:03

Да, несколько, в настройках есть пути к файлам, а в read-data есть итератор для чтения из огромного файла json, элемент за элементом. Модуль аннотатора принимает элемент и возвращает обработанный текст. Я не против показать весь проект, но он еще не готов и не все части нужны или работают.

Abdulrahman Bres 25.03.2018 01:09

Когда вы используете multiprocessing.Pool, с помощью системного вызова fork() будет создано несколько дочерних процессов. Каждый из этих процессов запускается с точной копии памяти родительского процесса на тот момент. Поскольку вы загружаете csv до создания Pool размера 3, каждый из этих 3 процессов в пуле без необходимости будет иметь копию фрейма данных. (gen_matrix_df, а также gen_matrix_df_list будут существовать в текущем процессе, а также в каждом из 3 дочерних процессов, поэтому в памяти будут находиться 4 копии каждой из этих структур)

Попробуйте создать Pool перед загрузкой файла (на самом деле в самом начале). Это должно уменьшить использование памяти.

Если он все еще слишком высок, вы можете:

  1. Выгрузить gen_matrix_df_list в файл, по 1 элементу в строке, например:

    import os
    import cPickle
    
    with open('tempfile.txt', 'w') as f:
        for item in gen_matrix_df_list.items():
            cPickle.dump(item, f)
            f.write(os.linesep)
    
  2. Используйте Pool.imap() на итераторе над строками, которые вы сбросили в этот файл, например:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (Обратите внимание, что matrix_to_vcf принимает кортеж (key, value) в приведенном выше примере, а не только значение)

Надеюсь, это поможет.

NB: я не тестировал приведенный выше код. Это сделано только для демонстрации идеи.

Спасибо за ответ. Я попробую этот ответ примерно через день и дам вам знать. Я надеюсь, что это сработает.

everestial007 27.03.2018 04:08

Возможно, вам не придется страдать от дискового ввода-вывода, если вы можете дважды уместить свои данные в памяти. У меня была именно эта проблема с большим DataFrame (хранящимся в self.big_df), но я смог обойтись более простым решением: просто разбейте DataFrame на части. У меня был быстрый цикл создания списка параметров с фрагментами df (так что теперь память составляет 2x self.big_df - один для оригинала и один для фрагментов), а затем я явно назначил self.big_df = {}. Впоследствии я создал пул, и у меня больше не было проблем с памятью, каждый поток требовал только памяти, равной небольшому проценту от исходного df.

Jeff Ellen 27.03.2018 09:18

Хорошо, я не видел, что уже делал @ everestial007, и прошло слишком много времени, чтобы редактировать мой комментарий. Я думаю, что сборщика мусора просто не происходит. Этот ответ лучше, если ваши данные могут поместиться в памяти только один раз, но вы потенциально можете долго ждать диск, если вы запишете его обратно, а затем снова прочитаете, если вам не нужно.

Jeff Ellen 27.03.2018 09:35

Предложение сбрасывать данные на диск и передавать оттуда только в том случае, если создание пула в верхней части функции недостаточно снижает потребление памяти. Я думаю, что запуск пула перед загрузкой будет иметь наибольшее влияние, потому что сейчас все хранится в памяти в 4 разных процессах.

tomas 27.03.2018 14:06

@tomas Я не пробовал, но я не вижу, как создание экземпляра пула раньше уменьшает память. Я не думаю, что в пуле есть только одна главная копия структур данных, я думаю, что у каждой есть своя собственная, на основе таких утверждений, как это: «Это может быть проблематично для больших аргументов, поскольку они будут перераспределены n_jobs раз рабочими» (из pythonhosted.org/joblib/parallel.html)

Jeff Ellen 28.03.2018 20:00

Каждый рабочий процесс имеет свою собственную копию аргумента, то есть истинного (в данном случае list(gen_matrix_df_list.values()). В дополнение к этому, каждый рабочий процесс имеет свою собственную копию каждой переменной, которая была создана до того, как рабочий процесс был порожден. (В этом случае, Самый большой из них - gen_matrix_df_list). Что касается gen_matrix_df, я не уверен, что с ним произойдет, потому что его ссылка удалена, но он все еще может быть выделен. Вызов gc.collect() после del gen_matrix_df также является хорошей идеей.

tomas 29.03.2018 10:39

@tomas Единственное, что улучшило использование моей памяти, - это переместить p=Pool(3) в начало функции main. Спасибо. Все остальное действительно ничего не улучшило. Даже переназначение переменной, а не удаление не имело никакого значения. Думаю, я собираюсь использовать такой подход: stackoverflow.com/questions/34143397/…, разделив мой файл по chr_. Я получил неполный ответ, но все же хотел бы предложить вознаграждение. @jeff ellen тоже предложил переместить Pool() вперед.

everestial007 01.04.2018 23:03

Я не понимаю, почему вы ссылаетесь на вопрос о чтении тысяч маленьких файлов, когда у вас всего один большой файл. Итак, вы говорите, что ваше решение состоит в том, чтобы предварительно обработать один большой файл в кучу более мелких (с отдельным потоком), а затем запустить код, как в том, который вы связали, чтобы прочитать эти небольшие файлы обратно? Это сложнее, потому что для отладки требуется больше строк кода, и теперь вам нужно сохранить две копии ваших данных, а также добавить много дополнительных накладных расходов на диск, поэтому вы не получите свой общий ответ так быстро. Кроме того, я не предлагал перемещать Pool () вперед, не уверен, что это помогает.

Jeff Ellen 02.04.2018 08:37

ОБЩИЙ ОТВЕТ ПО ПАМЯТИ С МНОГОПРОЦЕССИРОВКОЙ

Вы спросили: «Почему выделяется столько памяти?» Ответ основан на двух частях.

Первый, как вы уже заметили, каждый работник multiprocessing получает свою копию данных (цитируется отсюда), поэтому вы должны разбивать большие аргументы на части. Или, если возможно, для больших файлов читайте их понемногу.

By default the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1. The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated n_jobs times by the workers.

Второй, если вы пытаетесь освободить память, вам нужно понимать, что python работает иначе, чем другие языки, и вы полагаетесь на del, чтобы освободить память, когда это не так. Я не знаю, лучше ли это, но в моем собственном коде я преодолел это, переназначив переменную на None или пустой объект.

ДЛЯ ВАШЕГО КОНКРЕТНОГО ПРИМЕРА - РЕДАКТИРОВАНИЕ МИНИМАЛЬНОГО КОДА

Если вы можете разместить свои большие данные в памяти дважды, я думаю, вы можете делать то, что пытаетесь сделать, просто изменяя одну строку. Я написал очень похожий код, и он сработал у меня, когда я переназначил переменную (вице-вызов del или любой другой сборщик мусора). Если это не сработает, возможно, вам придется следовать приведенным выше советам и использовать дисковый ввод-вывод:

    #### earlier code all the same
    # clear memory by reassignment (not del or gc)
    gen_matrix_df = {}

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed

    p.close()
    p.join()
    #### later code all the same

ДЛЯ ВАШЕГО КОНКРЕТНОГО ПРИМЕРА - ОПТИМАЛЬНОЕ ИСПОЛЬЗОВАНИЕ ПАМЯТИ

Если вы можете разместить свои большие данные в памяти однажды, и у вас есть некоторое представление о том, насколько велик ваш файл, вы можете использовать Pandas read_csv частичное чтение файла для чтения в только строчки за раз, если вы действительно хотите микроуправлять тем, сколько данных читается, или [фиксированный объем памяти за раз с использованием chunksize], который возвращает итератор5. Под этим я подразумеваю, что параметр nrows - это всего лишь одно чтение: вы можете использовать его, чтобы просто взглянуть на файл, или если по какой-то причине вы хотите, чтобы каждая часть имела точно такое же количество строк (потому что, например, если какие-либо из ваших данных являются строками переменной длины, каждая строка не будет занимать одинаковый объем памяти). Но я думаю, что для целей подготовки файла для многопроцессорной обработки будет намного проще использовать фрагменты, потому что это напрямую связано с памятью, что является вашей заботой. Будет проще использовать метод проб и ошибок для размещения в памяти на основе фрагментов определенного размера, чем количества строк, что изменит объем использования памяти в зависимости от того, сколько данных находится в строках. Единственная другая сложная часть заключается в том, что по какой-то причине, связанной с конкретным приложением, вы группируете некоторые строки, поэтому это просто немного усложняет задачу. Используя ваш код в качестве примера:

   '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    #not sure why you need the ordered dict here, might add memory overhead
    #gen_matrix_df_list = collections.OrderedDict()  
    #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
    gen_matrix_df_list = collections.defaultdict(list)   
    chunksize = 10 ** 6

    for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize)
        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = chunk.groupby('CHROM')
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_].append(data)

    '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
    #The dict contains a list of small df objects, so now concatenate them
    #by reassigning to the same dict, the memory footprint is not increasing 
    for chr_ in gen_matrix_df_list.keys():
        gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()

Ваш ответ и ответ Томаша выглядят многообещающими. И у меня не было времени проверить это. Я сделаю это завтра. Мне нравится идея переназначения. А пока про As long as you can fit .... in memory twice - почему не 3, 4 раза? Я также думал, есть ли способ создать список как интегратор, генератор или выход и передать его процессу Pool.map(). Какие-либо предложения?

everestial007 31.03.2018 03:46

@ everestial007 Потому что вам нужно только уместить его дважды: полную исходную копию и каждый фрагмент по мере создания фрагментов, то есть дважды. 3 или 4 раза это просто перебор. Когда вы создаете генератор, вы экономите память только в том случае, если сначала у вас нет всего элемента в памяти (или если вы делаете что-то новое, например, генератор, являющийся результатом zip двух существующих списков). И на самом деле, я не знал этого раньше, но, посмотрев, у pandas есть метод частичного чтения файлов, который, я уверен, лучше работал бы в вашем случае. Отредактирую свой ответ.

Jeff Ellen 31.03.2018 16:49

Единственное, что улучшило использование моей памяти, - это переместить p=Pool(3) в начало функции main. Назначение chunksize мне не поможет, потому что мне нужно читать все данные сразу с одной хромосомы - немного сложная причина. Я также подумал, поможет ли чтение данных как итератор, генератор. Скорее, этот метод stackoverflow.com/questions/34143397/… работал лучше всего. Но будет некоторое сопротивление из-за перезаписи ввода-вывода.

everestial007 01.04.2018 22:55

Кроме того, переназначение действительно не уменьшило использование памяти. Я не уверен, по какой причине.

everestial007 01.04.2018 22:57

@ everestial007 Ваш ответ не имеет смысла, вы пробовали мой код? Вы говорите, что мое решение не сработает, потому что «вам нужно прочитать все данные с одной хромосомы сразу». Но ваш исходный код этого не делает. Читается весь CSV, ничего особенного. Затем ваш код использует «группировать по» для подготовки некоторой группы хромосом для каждого члена пула. Мой код делает почти то же самое: он считывает фрагмент файла, а затем использует «группировать по» для подготовки группы хромосом. Единственный вопрос в том, выбрал ли я хороший размер фрагмента для вашей системы, возможно, вам придется его настроить.

Jeff Ellen 02.04.2018 08:27

Другие вопросы по теме