Я работаю с PySpark в записных книжках Synapse, и мне нужно загрузить файл Parquet в DataFrame, применить некоторые преобразования (например, переименовать столбцы), а затем сохранить измененный DataFrame обратно в то же место, перезаписав исходный файл. Однако когда я пытаюсь сохранить DataFrame, он создает каталог с именем Test.parquet, содержащий два файла: один с именем SUCCESS, а другой со строкой случайных букв и цифр.
Вот код, который я использую:
%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))
column_mapping = {
"FullName": "Full Name",
}
for old_col, new_col in column_mapping.items():
df = df.withColumnRenamed(old_col, new_col)
display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')
Вот как происходит перезапись файла:
Как я могу правильно перезаписать исходный файл Parquet, не создавая дополнительных файлов или каталогов? Любая помощь приветствуется!





Проблема с сохранением только одного паркета в Spark заключается в том, что он разделяет таблицы данных в целях параллелизма. Также файлы сжимаются быстрым алгоритмом. Я попытался дать ответ на основе файловой системы блоков данных. Возможно, вам нужно адаптировать пути к файлам под свои нужды.
Что вам нужно сделать, это:
coalsce(1), чтобы получить один файл в папке, созданной командой df.write.os.listdir в папке, созданной искрой.os.replaceshutil.rmtree.Я дал вам ответ, как вы можете достичь того, чего хотите.
from pyspark.sql import DataFrame
import os
import shutil
from typing import Union
from pathlib import Path
def get_local_path(path: Union[str, Path]) -> str:
"""
Transforms a potential dbfs path to a
path accessible by standard file system operations
:param path: Path to transform to local path
:return: The local path
"""
return str(path).replace("dbfs:", "/dbfs")
def save_as_one_parquet_file(
df: DataFrame,
output_file_path: str,
):
"""
Saves a spark dataframe as a single parquet file.
:param df: The spark dataframe to write.
:param output_file_path: The output filepath for writing the parquet file.
"""
localpath = get_local_path(output_file_path)
tmp_file_path = localpath + "_temp"
(
df.coalesce(1)
.write.mode("overwrite")
.format("parquet")
.save(output_file_path + "_temp")
)
file = [file for file in os.listdir(localpath + "_temp") if file.endswith(".parquet")][0]
os.replace(os.path.join(localpath + "_temp", file), localpath)
shutil.rmtree(tmp_file_path)
# Reading in
path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
df = spark.read.parquet(path)
# Transformations
df_new = df.withColumn("blub", f.lit(2))
# Saving as one parquet file
save_as_one_parquet_file(df_new, path)