Почему мой столбец row_number в PySpark испорчен при применении схемы?

Я хочу применить схему к конкретным нетехническим столбцам кадра данных Spark. Предварительно я добавляю искусственный идентификатор, используя Window и row_number, чтобы позже можно было присоединиться к новому DataFrame некоторых других технических столбцов из исходного DataFrame. Однако после применения схемы сгенерированный идентификатор искажается. Ниже приведен пример кода. Может кто-нибудь объяснить, почему это происходит и как решить проблему?

from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])


# Schema to apply
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
])

# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))

# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])

# Loop because sometimes it works and sometimes it does't work
for i in range(11):
    
    df_filtered = df.select("id", "name", "_special_surrogate_id")   
    df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)

    combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on = "_special_surrogate_id")

    print("Diffs in Iteration " + str(i) + ":")
    print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])

Я попробовал запустить ваш код, и сгенерированные столбцы идентификаторов, похоже, соответствуют исходным данным в df

Derek O 24.06.2024 15:42

Какой у вас тип машины? Вы выполняете его на одном узле? При работе в моем кластере столбцы идентификаторов иногда совпадают, а иногда нет. Я не уверен, почему. Может ли это быть вызвано ленивой оценкой искры при вызове createDataFrame?

stats_guy 25.06.2024 10:08

Дело в том, что я использую Synapse Analytics. Когда я настраиваю свой сеанс на использование только одного исполнителя, строки все время совпадают. Как только я использую более одного, возникают различия.

stats_guy 25.06.2024 14:13
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
3
90
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема:

orderBy(lit('A')) используется для определения порядка row_number(). Это не детерминированный способ определения значения номера строки.

За кулисами Spark:

Spark использует несколько исполнителей для завершения процесса нумерации строк. Для этого Spark выполняет первый проход, во время которого каждый исполнитель упорядочивает имеющиеся у него данные. Затем делаем еще один проход, чтобы упорядочить все данные.

Пример сценария —

Когда строка данных Alice и строка данных Bob обрабатываются разными исполнителями, их можно считать «первой строкой» для своего исполнителя.

Затем Spark должен на втором проходе решить, какой строке данных в конечном итоге будет присвоено значение row_number 1.

Однако ваша логика не подсказывает Спарку, как принять это решение.

Перетасовка данных, среди других механизмов Spark, может привести к тому, что Spark не будет принимать одно и то же решение каждый раз, когда присваивает значение номера строки.

Рекомендация:

Если у вас есть комбинация столбцов, которая может привести к уникальному набору значений для каждой строки, ее следует использовать в операторе orderBy, чтобы предоставить Spark информацию, необходимую для последовательного и детерминированного присвоения номеров строк.

например orderBy(F.col('id'),F.col('name'))

Другие вопросы по теме