Как перезаписать файл паркета в том же месте с помощью PySpark

Я работаю с PySpark в записных книжках Synapse, и мне нужно загрузить файл Parquet в DataFrame, применить некоторые преобразования (например, переименовать столбцы), а затем сохранить измененный DataFrame обратно в то же место, перезаписав исходный файл. Однако когда я пытаюсь сохранить DataFrame, он создает каталог с именем Test.parquet, содержащий два файла: один с именем SUCCESS, а другой со строкой случайных букв и цифр.

Вот код, который я использую:

%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))

column_mapping = {
    "FullName": "Full Name",
}

for old_col, new_col in column_mapping.items():
    df = df.withColumnRenamed(old_col, new_col)
    display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')

Вот как происходит перезапись файла:

Как я могу правильно перезаписать исходный файл Parquet, не создавая дополнительных файлов или каталогов? Любая помощь приветствуется!

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
100
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема с сохранением только одного паркета в Spark заключается в том, что он разделяет таблицы данных в целях параллелизма. Также файлы сжимаются быстрым алгоритмом. Я попытался дать ответ на основе файловой системы блоков данных. Возможно, вам нужно адаптировать пути к файлам под свои нужды.

Что вам нужно сделать, это:

  1. Объедините DataFrame с помощью coalsce(1), чтобы получить один файл в папке, созданной командой df.write.
  2. Распознайте отдельный файл паркета с помощью операции os.listdir в папке, созданной искрой.
  3. Замените его на каталог, из которого вы изначально читали, os.replace
  4. Удалите папку, созданную из искры, со всеми содержащимися подфайлами, используя shutil.rmtree.

Я дал вам ответ, как вы можете достичь того, чего хотите.


from pyspark.sql import DataFrame
import os
import shutil
from typing import Union
from pathlib import Path


def get_local_path(path: Union[str, Path]) -> str:
    """
    Transforms a potential dbfs path to a
    path accessible by standard file system operations

    :param path: Path to transform to local path
    :return: The local path
    """
    return str(path).replace("dbfs:", "/dbfs")

def save_as_one_parquet_file(
    df: DataFrame,
    output_file_path: str,
):
    """
    Saves a spark dataframe as a single parquet file.

    :param df: The spark dataframe to write.
    :param output_file_path: The output filepath for writing the parquet file.
    """

    localpath = get_local_path(output_file_path)
    tmp_file_path = localpath + "_temp"
    (
        df.coalesce(1)
        .write.mode("overwrite")
        .format("parquet")
        .save(output_file_path + "_temp")
    )

    file = [file for file in os.listdir(localpath + "_temp") if file.endswith(".parquet")][0]
    os.replace(os.path.join(localpath + "_temp", file), localpath)
    shutil.rmtree(tmp_file_path)

# Reading in
path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
df = spark.read.parquet(path)

# Transformations
df_new = df.withColumn("blub", f.lit(2))

# Saving as one parquet file
save_as_one_parquet_file(df_new, path)

Другие вопросы по теме

Похожие вопросы

Создать столбец подмножества массива структур без разбивки
Потоковая передача Spark + интеграция с Kafka, чтение данных из Kafka каждые 15 минут и сохранение смещения последнего чтения с помощью PySpark
Не удалось загрузить предварительный просмотр: размер записной книжки превысил ограничение в байтах
Как сделать левое соединение, чтобы ключи могли иметь множественную степень детализации с помощью Spark?
Ошибки токена (доступа) при подключении к MS SQL Server из блокнотов Python DataBricks через драйвер JDBC PySPark с использованием субъекта службы Azure и MSAL
Как создать отдельные диапазоны дат из набора диапазонов в sql
Java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics при запуске pyspark
Вычисление количества скользящих чисел из двух разных столбцов временных рядов в pyspark
Pyspark: динамическое выравнивание таблицы иерархии
Преобразуйте каждую пару значений ключа в столбцы кадра данных в pyspark