Я хочу применить схему к конкретным нетехническим столбцам кадра данных Spark. Предварительно я добавляю искусственный идентификатор, используя Window
и row_number
, чтобы позже можно было присоединиться к новому DataFrame некоторых других технических столбцов из исходного DataFrame. Однако после применения схемы сгенерированный идентификатор искажается. Ниже приведен пример кода. Может кто-нибудь объяснить, почему это происходит и как решить проблему?
from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])
# Schema to apply
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
])
# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))
# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])
# Loop because sometimes it works and sometimes it does't work
for i in range(11):
df_filtered = df.select("id", "name", "_special_surrogate_id")
df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)
combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on = "_special_surrogate_id")
print("Diffs in Iteration " + str(i) + ":")
print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])
Какой у вас тип машины? Вы выполняете его на одном узле? При работе в моем кластере столбцы идентификаторов иногда совпадают, а иногда нет. Я не уверен, почему. Может ли это быть вызвано ленивой оценкой искры при вызове createDataFrame?
Дело в том, что я использую Synapse Analytics. Когда я настраиваю свой сеанс на использование только одного исполнителя, строки все время совпадают. Как только я использую более одного, возникают различия.
Проблема:
orderBy(lit('A'))
используется для определения порядка row_number(). Это не детерминированный способ определения значения номера строки.
За кулисами Spark:
Spark использует несколько исполнителей для завершения процесса нумерации строк. Для этого Spark выполняет первый проход, во время которого каждый исполнитель упорядочивает имеющиеся у него данные. Затем делаем еще один проход, чтобы упорядочить все данные.
Пример сценария —
Когда строка данных Alice
и строка данных Bob
обрабатываются разными исполнителями, их можно считать «первой строкой» для своего исполнителя.
Затем Spark должен на втором проходе решить, какой строке данных в конечном итоге будет присвоено значение row_number 1
.
Однако ваша логика не подсказывает Спарку, как принять это решение.
Перетасовка данных, среди других механизмов Spark, может привести к тому, что Spark не будет принимать одно и то же решение каждый раз, когда присваивает значение номера строки.
Рекомендация:
Если у вас есть комбинация столбцов, которая может привести к уникальному набору значений для каждой строки, ее следует использовать в операторе orderBy
, чтобы предоставить Spark информацию, необходимую для последовательного и детерминированного присвоения номеров строк.
например orderBy(F.col('id'),F.col('name'))
Я попробовал запустить ваш код, и сгенерированные столбцы идентификаторов, похоже, соответствуют исходным данным в
df