Каков наиболее эффективный способ многопроцессорной обработки очень большого кадра данных?

У меня есть большой Dataframe, над которым мне нужно выполнить множество операций сопоставления, и в прошлом я всегда использовал для этого приведенный ниже метод. Однако Dataframe, который я сейчас пытаюсь выполнить в многопроцессорном режиме, представляет собой CSV-файл размером 2 ГБ, с которым на моем компьютере возникают проблемы с многопроцессорной обработкой, даже при наличии только одного раздела. Я предполагаю, что это связано с тем, что когда Dataframe разбивается на фрагменты для многопроцессорной обработки, объем необходимой памяти удваивается, и поэтому мой компьютер не может с этим справиться. Это мой текущий код:

def parallelize_dataframe(df, func, additional_param, num_partitions):
    df_split = np.array_split(df, num_partitions)
    results = []
    with ProcessPoolExecutor(max_workers=num_partitions) as executor:
        futures = {executor.submit(func, chunk, additional_param): chunk for chunk in df_split}
        for future in tqdm(futures, total=len(futures), desc = "Overall progress"):
            results.append(future.result())
    return pd.concat(results)

Любая помощь очень ценится.

Рассматривали ли вы полярные животные вместо панд?

user19077881 26.07.2024 17:41

Если ваша задача привязана к ЦП, многопроцессорность в Python сделает ее МЕДЛЕННЕЕ, поскольку GIL все еще действует. Используйте поляры или опубликуйте здесь подробную информацию о вашем совпадении, чтобы получить лучший ответ.

Code Different 26.07.2024 19:17

@CodeDifferent многопроцессорность (по сравнению с многопоточностью) — это, в частности, способ обойти GIL в Python (возможно, начиная с версии 3.13, начиная со сборок без GIL). Если вы можете создать рабочий процесс, который не передает слишком много данных между процессами и привязан к процессору; многопроцессорность абсолютно точно сможет дать прирост скорости.

Aaron 29.07.2024 17:25
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
3
77
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

для таких задач я бы предложил предварительно обработать файл csv, чтобы разделить его на примерно равные куски, которые считываются дочерним процессом, а не считываются основным процессом и отправляются дочернему процессу. Отправка этих данных от основного к дочернему требует немало накладных расходов (и памяти). Вот пример:

from multiprocessing import Pool
from io import BytesIO
import pandas as pd

csvfile = r"c:\some\example\data.csv"

chunksize = 2**20  # 1MiB chunk size (try different values depending on file size and processing speed)

#example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...

def sum_cols(args):
    file_start, file_end, col_names = args  # unpack tuple args as Pool.imap_unordered only supports a single arg
    with open(csvfile, "rb") as f:
        f.seek(file_start)
        buf = BytesIO(f.read(file_end-file_start))  # store chunk of csv in a buffer to pass to pandas
    df = pd.read_csv(buf, names=col_names)  # col_names aren't in the chunk so pass them explicitly
    return df.sum()
        
if __name__ == "__main__":
    with open(csvfile, "rb") as f:
        firstline = f.readline()
        headers = [col_title.strip() for col_title in firstline.split(b",")]
        startpoints = []
        endpoints = []
        while True:  # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
            startpoints.append(f.tell())
            f.seek(chunksize,1)  # skip ahead by chunksize bytes
            line = f.readline()  # find the end of the current line
            endpoints.append(f.tell())
            if not line:  # empty line indicates end of the file
                if startpoints[-1] == endpoints[-1]: #if the last chunk landed exactly on the last line of the file, there could be an empty chunk
                    startpoints.pop()
                    endpoints.pop()
                break
    
    arglist = [(start, end, headers) for start, end in zip(startpoints, endpoints)]
    with Pool() as pool:
        print(sum(pool.imap(sum_cols, arglist)))

Я не использовал polars и не могу на основании этого ответить, но насколько я понимаю, это очень быстро.

Мне кажется, что когда вы выполняете f.seek(chunksize,1), если вам случится искать в пределах последней строки файла, тогда, когда вы выполняете следующую итерацию while True; вы создадите новую запись в arglist, что приведет к созданию sum_cols фрейма данных без строк.

Booboo 27.07.2024 15:27

Я считаю, что ответ Аарона является разумным подходом. Но позвольте мне предложить несколько предложений:

При создании arglist выполнение f.seek(chunksize,1) позволяет позиционировать следующее чтение в последней строке файла CSV. Следовательно, на следующей итерации мы фактически находимся в конце файла, но в arglist будет добавлена ​​новая запись, в результате чего в sum_cols будет создан пустой фрейм данных.

В общем, вы хотите разделить большой CSV-файл на некоторое количество фреймов данных, так что если исходное содержимое ячейки было строками, то содержимое ячеек фреймов данных будет строками.

Подходящий аргумент chunksize должен быть передан imap в зависимости от количества задач, которые будут поставлены в очередь в пул, и размера пула. Это повысит производительность, когда в многопроцессорный пул отправляется большое количество задач.

Есть также несколько вещей, которые мы делаем по ночам, чтобы уменьшить использование памяти. Поскольку multiprocessing.pool.imap используется, нет причин, по которым arglist не должно быть выражением-генератором или заменено функцией-генератором, позволяющей сэкономить немного памяти. Дополнительную память можно сэкономить, если не нужно явно передавать имена постоянных столбцов для каждой отправленной задачи.

from multiprocessing import Pool, cpu_count
from io import StringIO
import pandas as pd
from functools import partial

csvfile = r"c:\some\example\data.csv"

chunk_bytes = 2**20  # 1MiB chunk byte size (try different values depending on file size and processing speed)
n_tasks = 2000  # approximation of: file size / chunk_bytes

#example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...

def sum_cols(col_names, args):
    file_start, file_end = args  # Unpack args
    with open(csvfile, "rb") as f:
        f.seek(file_start)
        buf = StringIO(f.read(file_end-file_start).decode("utf-8"))  # store chunk of csv in a buffer to pass to pandas
    df = pd.read_csv(buf, names=col_names)  # col_names aren't in the chunk so pass them explicitly
    return df.sum()

if __name__ == "__main__":

    def compute_chunksize(iterable_size, pool_size):
        chunksize, remainder = divmod(iterable_size, 4 * pool_size)
        if remainder:
            chunksize += 1
        return chunksize

    def generate_args(file_size, f):
        file_start = f.tell()
        while file_start < file_size:  # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
            f.seek(chunk_bytes, 1)  # skip ahead by chunksize bytes
            line = f.readline()  # find the end of the current line
            file_end = f.tell()
            yield file_start, file_end
            file_start = file_end

    pool_size = cpu_count()
    chunksize = compute_chunksize(n_tasks, pool_size)

    with open(csvfile, "rb") as f:
        file_size = f.seek(0, 2)
        f.seek(0, 0) # Seek back to the beginning:
        firstline = f.readline()
        headers = [col_title for col_title in firstline.decode("utf-8").strip().split(",")]

        with Pool(pool_size) as pool:
            worker = partial(sum_cols, headers)
            print(sum(pool.imap(worker, generate_args(file_size, f), chunksize=chunksize)))

Другие вопросы по теме