Найти иерархию дерева в группе и собрать в список — PySpark

В приведенных ниже данных для каждого id2 я хочу собрать список id1, который находится над ними в иерархии/уровне.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("group_id", StringType(), False),
    StructField("level", IntegerType(), False),
    StructField("id1", IntegerType(), False),
    StructField("id2", IntegerType(), False)
])

# Feature values
levels = [1, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]

id1_values = [0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867,  662867, 662867, 662867]

id2_values = [200001, 677555, 605026, 662867, 676423,  659933, 660206, 675767, 681116, 913248, 
    910758, 913773, 698738, 910387, 910758, 910387, 910113, 910657]

data = zip(['A'] * len(levels), levels, id1_values, id2_values)

# Create DataFrame
data = spark.createDataFrame(data, schema)

Это можно сделать следующим образом, используя оконную функцию и Collect_list.

window = Window.partitionBy('group_id').orderBy('level').rowsBetween(Window.unboundedPreceding, Window.currentRow)

data.withColumn("list_id1", F.collect_list("id1").over(window)).show(truncate=False)
Output: 
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
|group_id|level|id1   |id2   |list_id1                                                                                                                                   |
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
|A       |1    |0     |200001|[0]                                                                                                                                        |
|A       |2    |200001|677555|[0, 200001]                                                                                                                                |
|A       |3    |677555|605026|[0, 200001, 677555]                                                                                                                        |
|A       |3    |677555|662867|[0, 200001, 677555, 677555]                                                                                                                |
|A       |3    |677555|676423|[0, 200001, 677555, 677555, 677555]                                                                                                        |
|A       |3    |677555|659933|[0, 200001, 677555, 677555, 677555, 677555]                                                                                                |
|A       |3    |677555|660206|[0, 200001, 677555, 677555, 677555, 677555, 677555]                                                                                        |
|A       |4    |605026|675767|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026]                                                                                |
|A       |4    |605026|681116|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026]                                                                        |
|A       |4    |605026|913248|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026]                                                                |
|A       |4    |605026|910758|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026]                                                        |
|A       |4    |605026|913773|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026]                                                |
|A       |4    |605026|698738|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026]                                        |
|A       |4    |662867|910387|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867]                                |
|A       |4    |662867|910758|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867]                        |
|A       |4    |662867|910387|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867]                |
|A       |4    |662867|910113|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867, 662867]        |
|A       |4    |662867|910657|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867, 662867, 662867]|
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+

В некоторых случаях существует несколько id1 одного уровня. Я хочу, чтобы Collect_list принял это во внимание.

Например, на уровне 4 у нас есть два уникальных идентификатора 1: 605026 и 662867. Для идентификатора 2 910387 это соответствует идентификатору 1 662867 на уровне 4. Я не хочу включать 605026 в список.

Список, который я хочу собрать, должен включать только один идентификатор 1 на уровень, захватывая путь к дереву до уровня 1.

Для id2: 910657 этот список должен быть [662867,677555, 200001, 0]

Как этого можно добиться с помощью PySpark API?

Я не уверен, что понял это: «Например, на втором уровне у нас есть два уникальных идентификатора 1: 605026 и 662867. Для идентификатора 2 910387 это соответствует идентификатору 1 662867 на уровне 4. Я не хочу включать 605026 в список. .». На самом деле на втором уровне у вас есть только один уникальный id1, то есть 200001, верно? Вы ошиблись в том, что там описали? Будет полезно, если вы проясните, о какой именно строке вы говорите, как она должна выглядеть на самом деле и почему.

Sachin Hosmani 12.08.2024 14:28

@SachinHosmani - извините, я имел в виду уровень 4. Я обновил вопрос.

Henri 12.08.2024 14:43

Добавил ответ. Я понял, что не использовал столбец группы. Должно ли все это быть сделано для каждой группы?

Sachin Hosmani 12.08.2024 19:55

@SachinHosmani, да, в моем реальном случае использования я собираюсь применить функцию, собирающую коллекцию для каждой группы, и объединить ее обратно в новый фрейм данных.

Henri 12.08.2024 20:11
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
4
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Прежде всего, я бы предложил переименовать столбцы вашей таблицы, чтобы id1 было parent, а id2 — как node. Ваша таблица по существу фиксирует отношения родитель-потомок между узлами. Ваша цель — построить полные пути от каждого узла.

Это трудоемкий процесс, поскольку он требует повторных соединений вплоть до начала пути. В худшем случае потребуется столько же соединений, сколько и самый длинный путь.

Вот что я попробовал

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("HierarchicalPath").getOrCreate()

data = [
    ("A", 1, 0, 200001),
    ("A", 2, 200001, 677555),
    ("A", 3, 677555, 605026),
    ("A", 3, 677555, 662867),
    ("A", 3, 677555, 676423),
    ("A", 3, 677555, 659933),
    ("A", 3, 677555, 660206),
    ("A", 4, 605026, 675767),
    ("A", 4, 605026, 681116),
    ("A", 4, 605026, 913248),
    ("A", 4, 605026, 910758),
    ("A", 4, 605026, 913773),
    ("A", 4, 605026, 698738),
    ("A", 4, 662867, 910387),
    ("A", 4, 662867, 910758),
    ("A", 4, 662867, 910387),
    ("A", 4, 662867, 910113),
    ("A", 4, 662867, 910657)
]
columns = ["group_id", "level", "parent", "node"]
df = spark.createDataFrame(data, columns)

df = df.withColumn("path", F.array("parent"))
df.show(truncate=False)

# Iteratively build the full path
max_level = df.select(F.max("level")).collect()[0][0]
current_level = max_level

# Keep a copy of the original df to preserve child-parent relationships
original_df = df

while current_level > 1:
    # Repeatedly join to get the parent information from other original df
    # and overwrite the "growing" `df`
    joined_df = df.alias("child").join(
        original_df.alias("parent"),
        F.col("child.parent") == F.col("parent.node"),
        "left" # Left join because some paths are shorter than others
    ).select(
        F.col("child.group_id"),
        F.col("child.level"),
        F.col("parent.parent").alias("parent"),
        F.col("child.node"),
        # Append the latest parent to the path only if it's not null
        F.expr("CASE WHEN parent.parent IS NOT NULL THEN array_union(child.path, array(parent.parent)) ELSE child.path END").alias("path")
    )
    
    df = joined_df
    
    current_level -= 1

df.show(truncate=False)

# Some massaging in the end to produce accurate results: add the node itself
# to the path and reverse the list
result_df = df.select(
    "node",
    F.expr("array_union(reverse(path), array(node))").alias("full_path")
).orderBy("level")

result_df.show(truncate=False)

Дает мне это как результат


+------+-----------------------------------+
|node  |full_path                          |
+------+-----------------------------------+
|200001|[0, 200001]                        |
|677555|[0, 200001, 677555]                |
|605026|[0, 200001, 677555, 605026]        |
|662867|[0, 200001, 677555, 662867]        |
|676423|[0, 200001, 677555, 676423]        |
|659933|[0, 200001, 677555, 659933]        |
|660206|[0, 200001, 677555, 660206]        |
|675767|[0, 200001, 677555, 605026, 675767]|
|681116|[0, 200001, 677555, 605026, 681116]|
|910387|[0, 200001, 677555, 662867, 910387]|
|910758|[0, 200001, 677555, 662867, 910758]|
|910387|[0, 200001, 677555, 662867, 910387]|
|910113|[0, 200001, 677555, 662867, 910113]|
|910657|[0, 200001, 677555, 662867, 910657]|
|913248|[0, 200001, 677555, 605026, 913248]|
|910758|[0, 200001, 677555, 605026, 910758]|
|913773|[0, 200001, 677555, 605026, 913773]|
|698738|[0, 200001, 677555, 605026, 698738]|
+------+-----------------------------------+

Отличное решение. Единственное замечание: номер узла также фигурирует в списке full_path. Этого я не хочу. Но с этим можно легко справиться.

Henri 12.08.2024 21:02

Другие вопросы по теме