В приведенных ниже данных для каждого id2 я хочу собрать список id1, который находится над ними в иерархии/уровне.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("group_id", StringType(), False),
StructField("level", IntegerType(), False),
StructField("id1", IntegerType(), False),
StructField("id2", IntegerType(), False)
])
# Feature values
levels = [1, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
id1_values = [0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867, 662867, 662867]
id2_values = [200001, 677555, 605026, 662867, 676423, 659933, 660206, 675767, 681116, 913248,
910758, 913773, 698738, 910387, 910758, 910387, 910113, 910657]
data = zip(['A'] * len(levels), levels, id1_values, id2_values)
# Create DataFrame
data = spark.createDataFrame(data, schema)
Это можно сделать следующим образом, используя оконную функцию и Collect_list.
window = Window.partitionBy('group_id').orderBy('level').rowsBetween(Window.unboundedPreceding, Window.currentRow)
data.withColumn("list_id1", F.collect_list("id1").over(window)).show(truncate=False)
Output:
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
|group_id|level|id1 |id2 |list_id1 |
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
|A |1 |0 |200001|[0] |
|A |2 |200001|677555|[0, 200001] |
|A |3 |677555|605026|[0, 200001, 677555] |
|A |3 |677555|662867|[0, 200001, 677555, 677555] |
|A |3 |677555|676423|[0, 200001, 677555, 677555, 677555] |
|A |3 |677555|659933|[0, 200001, 677555, 677555, 677555, 677555] |
|A |3 |677555|660206|[0, 200001, 677555, 677555, 677555, 677555, 677555] |
|A |4 |605026|675767|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026] |
|A |4 |605026|681116|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026] |
|A |4 |605026|913248|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026] |
|A |4 |605026|910758|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026] |
|A |4 |605026|913773|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026] |
|A |4 |605026|698738|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026] |
|A |4 |662867|910387|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867] |
|A |4 |662867|910758|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867] |
|A |4 |662867|910387|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867] |
|A |4 |662867|910113|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867, 662867] |
|A |4 |662867|910657|[0, 200001, 677555, 677555, 677555, 677555, 677555, 605026, 605026, 605026, 605026, 605026, 605026, 662867, 662867, 662867, 662867, 662867]|
+--------+-----+------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
В некоторых случаях существует несколько id1 одного уровня. Я хочу, чтобы Collect_list принял это во внимание.
Например, на уровне 4 у нас есть два уникальных идентификатора 1: 605026 и 662867. Для идентификатора 2 910387 это соответствует идентификатору 1 662867 на уровне 4. Я не хочу включать 605026 в список.
Список, который я хочу собрать, должен включать только один идентификатор 1 на уровень, захватывая путь к дереву до уровня 1.
Для id2: 910657 этот список должен быть [662867,677555, 200001, 0]
Как этого можно добиться с помощью PySpark API?
@SachinHosmani - извините, я имел в виду уровень 4. Я обновил вопрос.
Добавил ответ. Я понял, что не использовал столбец группы. Должно ли все это быть сделано для каждой группы?
@SachinHosmani, да, в моем реальном случае использования я собираюсь применить функцию, собирающую коллекцию для каждой группы, и объединить ее обратно в новый фрейм данных.
Прежде всего, я бы предложил переименовать столбцы вашей таблицы, чтобы id1
было parent
, а id2
— как node
. Ваша таблица по существу фиксирует отношения родитель-потомок между узлами. Ваша цель — построить полные пути от каждого узла.
Это трудоемкий процесс, поскольку он требует повторных соединений вплоть до начала пути. В худшем случае потребуется столько же соединений, сколько и самый длинный путь.
Вот что я попробовал
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("HierarchicalPath").getOrCreate()
data = [
("A", 1, 0, 200001),
("A", 2, 200001, 677555),
("A", 3, 677555, 605026),
("A", 3, 677555, 662867),
("A", 3, 677555, 676423),
("A", 3, 677555, 659933),
("A", 3, 677555, 660206),
("A", 4, 605026, 675767),
("A", 4, 605026, 681116),
("A", 4, 605026, 913248),
("A", 4, 605026, 910758),
("A", 4, 605026, 913773),
("A", 4, 605026, 698738),
("A", 4, 662867, 910387),
("A", 4, 662867, 910758),
("A", 4, 662867, 910387),
("A", 4, 662867, 910113),
("A", 4, 662867, 910657)
]
columns = ["group_id", "level", "parent", "node"]
df = spark.createDataFrame(data, columns)
df = df.withColumn("path", F.array("parent"))
df.show(truncate=False)
# Iteratively build the full path
max_level = df.select(F.max("level")).collect()[0][0]
current_level = max_level
# Keep a copy of the original df to preserve child-parent relationships
original_df = df
while current_level > 1:
# Repeatedly join to get the parent information from other original df
# and overwrite the "growing" `df`
joined_df = df.alias("child").join(
original_df.alias("parent"),
F.col("child.parent") == F.col("parent.node"),
"left" # Left join because some paths are shorter than others
).select(
F.col("child.group_id"),
F.col("child.level"),
F.col("parent.parent").alias("parent"),
F.col("child.node"),
# Append the latest parent to the path only if it's not null
F.expr("CASE WHEN parent.parent IS NOT NULL THEN array_union(child.path, array(parent.parent)) ELSE child.path END").alias("path")
)
df = joined_df
current_level -= 1
df.show(truncate=False)
# Some massaging in the end to produce accurate results: add the node itself
# to the path and reverse the list
result_df = df.select(
"node",
F.expr("array_union(reverse(path), array(node))").alias("full_path")
).orderBy("level")
result_df.show(truncate=False)
Дает мне это как результат
+------+-----------------------------------+
|node |full_path |
+------+-----------------------------------+
|200001|[0, 200001] |
|677555|[0, 200001, 677555] |
|605026|[0, 200001, 677555, 605026] |
|662867|[0, 200001, 677555, 662867] |
|676423|[0, 200001, 677555, 676423] |
|659933|[0, 200001, 677555, 659933] |
|660206|[0, 200001, 677555, 660206] |
|675767|[0, 200001, 677555, 605026, 675767]|
|681116|[0, 200001, 677555, 605026, 681116]|
|910387|[0, 200001, 677555, 662867, 910387]|
|910758|[0, 200001, 677555, 662867, 910758]|
|910387|[0, 200001, 677555, 662867, 910387]|
|910113|[0, 200001, 677555, 662867, 910113]|
|910657|[0, 200001, 677555, 662867, 910657]|
|913248|[0, 200001, 677555, 605026, 913248]|
|910758|[0, 200001, 677555, 605026, 910758]|
|913773|[0, 200001, 677555, 605026, 913773]|
|698738|[0, 200001, 677555, 605026, 698738]|
+------+-----------------------------------+
Отличное решение. Единственное замечание: номер узла также фигурирует в списке full_path. Этого я не хочу. Но с этим можно легко справиться.
Я не уверен, что понял это: «Например, на втором уровне у нас есть два уникальных идентификатора 1: 605026 и 662867. Для идентификатора 2 910387 это соответствует идентификатору 1 662867 на уровне 4. Я не хочу включать 605026 в список. .». На самом деле на втором уровне у вас есть только один уникальный
id1
, то есть200001
, верно? Вы ошиблись в том, что там описали? Будет полезно, если вы проясните, о какой именно строке вы говорите, как она должна выглядеть на самом деле и почему.