У меня есть большой Dataframe, над которым мне нужно выполнить множество операций сопоставления, и в прошлом я всегда использовал для этого приведенный ниже метод. Однако Dataframe, который я сейчас пытаюсь выполнить в многопроцессорном режиме, представляет собой CSV-файл размером 2 ГБ, с которым на моем компьютере возникают проблемы с многопроцессорной обработкой, даже при наличии только одного раздела. Я предполагаю, что это связано с тем, что когда Dataframe разбивается на фрагменты для многопроцессорной обработки, объем необходимой памяти удваивается, и поэтому мой компьютер не может с этим справиться. Это мой текущий код:
def parallelize_dataframe(df, func, additional_param, num_partitions):
df_split = np.array_split(df, num_partitions)
results = []
with ProcessPoolExecutor(max_workers=num_partitions) as executor:
futures = {executor.submit(func, chunk, additional_param): chunk for chunk in df_split}
for future in tqdm(futures, total=len(futures), desc = "Overall progress"):
results.append(future.result())
return pd.concat(results)
Любая помощь очень ценится.
Если ваша задача привязана к ЦП, многопроцессорность в Python сделает ее МЕДЛЕННЕЕ, поскольку GIL все еще действует. Используйте поляры или опубликуйте здесь подробную информацию о вашем совпадении, чтобы получить лучший ответ.
@CodeDifferent многопроцессорность (по сравнению с многопоточностью) — это, в частности, способ обойти GIL в Python (возможно, начиная с версии 3.13, начиная со сборок без GIL). Если вы можете создать рабочий процесс, который не передает слишком много данных между процессами и привязан к процессору; многопроцессорность абсолютно точно сможет дать прирост скорости.
для таких задач я бы предложил предварительно обработать файл csv, чтобы разделить его на примерно равные куски, которые считываются дочерним процессом, а не считываются основным процессом и отправляются дочернему процессу. Отправка этих данных от основного к дочернему требует немало накладных расходов (и памяти). Вот пример:
from multiprocessing import Pool
from io import BytesIO
import pandas as pd
csvfile = r"c:\some\example\data.csv"
chunksize = 2**20 # 1MiB chunk size (try different values depending on file size and processing speed)
#example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...
def sum_cols(args):
file_start, file_end, col_names = args # unpack tuple args as Pool.imap_unordered only supports a single arg
with open(csvfile, "rb") as f:
f.seek(file_start)
buf = BytesIO(f.read(file_end-file_start)) # store chunk of csv in a buffer to pass to pandas
df = pd.read_csv(buf, names=col_names) # col_names aren't in the chunk so pass them explicitly
return df.sum()
if __name__ == "__main__":
with open(csvfile, "rb") as f:
firstline = f.readline()
headers = [col_title.strip() for col_title in firstline.split(b",")]
startpoints = []
endpoints = []
while True: # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
startpoints.append(f.tell())
f.seek(chunksize,1) # skip ahead by chunksize bytes
line = f.readline() # find the end of the current line
endpoints.append(f.tell())
if not line: # empty line indicates end of the file
if startpoints[-1] == endpoints[-1]: #if the last chunk landed exactly on the last line of the file, there could be an empty chunk
startpoints.pop()
endpoints.pop()
break
arglist = [(start, end, headers) for start, end in zip(startpoints, endpoints)]
with Pool() as pool:
print(sum(pool.imap(sum_cols, arglist)))
Я не использовал polars
и не могу на основании этого ответить, но насколько я понимаю, это очень быстро.
Мне кажется, что когда вы выполняете f.seek(chunksize,1)
, если вам случится искать в пределах последней строки файла, тогда, когда вы выполняете следующую итерацию while True
; вы создадите новую запись в arglist
, что приведет к созданию sum_cols
фрейма данных без строк.
Я считаю, что ответ Аарона является разумным подходом. Но позвольте мне предложить несколько предложений:
При создании arglist
выполнение f.seek(chunksize,1)
позволяет позиционировать следующее чтение в последней строке файла CSV. Следовательно, на следующей итерации мы фактически находимся в конце файла, но в arglist
будет добавлена новая запись, в результате чего в sum_cols
будет создан пустой фрейм данных.
В общем, вы хотите разделить большой CSV-файл на некоторое количество фреймов данных, так что если исходное содержимое ячейки было строками, то содержимое ячеек фреймов данных будет строками.
Подходящий аргумент chunksize должен быть передан imap
в зависимости от количества задач, которые будут поставлены в очередь в пул, и размера пула. Это повысит производительность, когда в многопроцессорный пул отправляется большое количество задач.
Есть также несколько вещей, которые мы делаем по ночам, чтобы уменьшить использование памяти. Поскольку multiprocessing.pool.imap
используется, нет причин, по которым arglist
не должно быть выражением-генератором или заменено функцией-генератором, позволяющей сэкономить немного памяти. Дополнительную память можно сэкономить, если не нужно явно передавать имена постоянных столбцов для каждой отправленной задачи.
from multiprocessing import Pool, cpu_count
from io import StringIO
import pandas as pd
from functools import partial
csvfile = r"c:\some\example\data.csv"
chunk_bytes = 2**20 # 1MiB chunk byte size (try different values depending on file size and processing speed)
n_tasks = 2000 # approximation of: file size / chunk_bytes
#example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...
def sum_cols(col_names, args):
file_start, file_end = args # Unpack args
with open(csvfile, "rb") as f:
f.seek(file_start)
buf = StringIO(f.read(file_end-file_start).decode("utf-8")) # store chunk of csv in a buffer to pass to pandas
df = pd.read_csv(buf, names=col_names) # col_names aren't in the chunk so pass them explicitly
return df.sum()
if __name__ == "__main__":
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, 4 * pool_size)
if remainder:
chunksize += 1
return chunksize
def generate_args(file_size, f):
file_start = f.tell()
while file_start < file_size: # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
f.seek(chunk_bytes, 1) # skip ahead by chunksize bytes
line = f.readline() # find the end of the current line
file_end = f.tell()
yield file_start, file_end
file_start = file_end
pool_size = cpu_count()
chunksize = compute_chunksize(n_tasks, pool_size)
with open(csvfile, "rb") as f:
file_size = f.seek(0, 2)
f.seek(0, 0) # Seek back to the beginning:
firstline = f.readline()
headers = [col_title for col_title in firstline.decode("utf-8").strip().split(",")]
with Pool(pool_size) as pool:
worker = partial(sum_cols, headers)
print(sum(pool.imap(worker, generate_args(file_size, f), chunksize=chunksize)))
Рассматривали ли вы полярные животные вместо панд?