Дубликаты, даже если дубликатов нет

У меня есть фрейм данных в результате нескольких соединений. Когда я проверяю, мне говорят, что у меня есть дубликат, хотя с моей точки зрения это невозможно. Вот абстрактный пример:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as f
from pyspark.sql.functions import lit

# Create a Spark session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# User input for number of rows
n_a = 10
n_a_c = 5
n_a_c_d = 3
n_a_c_e = 4

# Define the schema for the DataFrame
schema_a = StructType([StructField("id1", StringType(), True)])
schema_a_b = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id2", StringType(), True),
        StructField("extra", StringType(), True),
    ]
)
schema_a_c = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id3", StringType(), True),
    ]
)
schema_a_c_d = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id4", StringType(), True),
    ]
)
schema_a_c_e = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id5", StringType(), True),
    ]
)

# Create a list of rows with increasing integer values for "id1" and a constant value of "1" for "id2"
rows_a = [(str(i),) for i in range(1, n_a + 1)]
rows_a_integers = [str(i) for i in range(1, n_a + 1)]
rows_a_b = [(str(i), str(1), "A") for i in range(1, n_a + 1)]


def get_2d_list(ids_part_1: list, n_new_ids: int):
    rows = [
        [
            (str(i), str(i) + "_" + str(j))
            for i in ids_part_1
            for j in range(1, n_new_ids + 1)
        ]
    ]
    return [item for sublist in rows for item in sublist]


rows_a_c = get_2d_list(ids_part_1=rows_a_integers, n_new_ids=n_a_c)
rows_a_c_d = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_d)
rows_a_c_e = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_e)

# Create the DataFrame
df_a = spark.createDataFrame(rows_a, schema_a)
df_a_b = spark.createDataFrame(rows_a_b, schema_a_b)
df_a_c = spark.createDataFrame(rows_a_c, schema_a_c)
df_a_c_d = spark.createDataFrame(rows_a_c_d, schema_a_c_d)
df_a_c_e = spark.createDataFrame(rows_a_c_e, schema_a_c_e)

# Join everything
df_join = (
    df_a.join(df_a_b, on = "id1")
    .join(df_a_c, on = "id1")
    .join(df_a_c_d, on = "id3")
    .join(df_a_c_e, on = "id3")
)

# Nested structure
# show
df_nested = df_join.withColumn("id3", f.struct(f.col("id3"))).orderBy("id3")

for i, index in enumerate([(5, 3), (4, 3), (3, None)]):
    remaining_columns = list(set(df_nested.columns).difference(set([f"id{index[0]}"])))
    df_nested = (
        df_nested.groupby(*remaining_columns)
        .agg(f.collect_list(f.col(f"id{index[0]}")).alias(f"id{index[0]}_tmp"))
        .drop(f"id{index[0]}")
        .withColumnRenamed(
            f"id{index[0]}_tmp",
            f"id{index[0]}",
        )
    )

    if index[1]:
        df_nested = df_nested.withColumn(
            f"id{index[1]}",
            f.struct(
                f.col(f"id{index[1]}.*"),
                f.col(f"id{index[0]}"),
            ).alias(f"id{index[1]}"),
        ).drop(f"id{index[0]}")

Я проверяю дубликаты на основе id3, которые должны быть уникальными для всего фрейма данных на втором уровне:

# Investigate for duplicates
df_test = df_nested.select("id2", "extra", f.explode(f.col("id3")["id3"]).alias("id3"))
df_test.groupby("id3").count().filter(f.col("count") > 1).show()

Которые говорят мне, что ID3 == 8_3 существует дважды:

+---+-----+
|id3|count|
+---+-----+
|8_3|    2|
+---+-----+

Тем не менее, в фрейме данных явно уникальный для ID3. Что можно показать (ID4 и ID5 находятся на следующем уровне)

df_join.groupby("id3", "id4", "id5").count().filter(f.col("count") > 1).show()

ведущий к

+---+---+---+-----+
|id3|id4|id5|count|
+---+---+---+-----+
+---+---+---+-----+

Если это поможет, я использую Databricks Runtime Version 11.3 LTS (включает Apache Spark 3.3.0, Scala 2.12)

Что вы увидите, если просто сделаете print(df_test[df_test['id3'] == "8_3"])? Это должно показать вам повторяющиеся строки.

Barmar 05.05.2023 17:08
df_test[df_test['id3'] == "8_3"] показывает мне дважды |id2|extra|id3| := | 1| A|8_3|
Lazloo Xp 08.05.2023 08:54
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
125
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы группируете фреймы данных и объединяете сгруппированные элементы в массив. Все это в цикле. Группировка также выполняется по ранее собранным массивам.

Упущенное предположение состоит в том, что когда вы используете одни и те же элементы, вы получите один и тот же массив. Но это не так, так как искра не гарантирует порядок элементов в результате collect_list. В документах конкретно сказано:

Функция недетерминирована, потому что порядок собранных результатов зависит от порядка строк, который может быть недетерминированным после перетасовки.

По какой-то причине значения collect_list собираются не по порядку (на моей машине было то же самое), и вы получаете два разных массива, которые будут собраны в две строки в следующих 8_3.

Чтобы решить «проблему», вы должны явно указать, что хотите, чтобы массивы были отсортированы (используйте groupby после array_sort).

.agg(f.array_sort(f.collect_list(f.col(f"id{index[0]}"))).alias(f"id{index[0]}_tmp"))

или собрать значения в набор с помощью collect_list, так как наборы не имеют порядка:

.agg(f.collect_set(f.col(f"id{index[0]}")).alias(f"id{index[0]}_tmp"))

Проблему было нелегко найти, потому что у вас много общего кода с циклами, переименованиями и т. д.

Надеюсь, это поможет.

Другие вопросы по теме