Я пытаюсь реализовать некоторую логику, чтобы установить связь между идентификатором и ссылкой на основе приведенной ниже логики.
Логика -
Входной фрейм данных -
+---+----+
| id|link|
+---+----+
| 1| 2|
| 3| 1|
| 4| 2|
| 6| 5|
| 9| 7|
| 9| 10|
+---+----+
Я пытаюсь добиться результата ниже
+---+----+
| Id|Link|
+---+----+
| 1| 2|
| 1| 3|
| 1| 4|
| 2| 1|
| 2| 3|
| 2| 4|
| 3| 1|
| 3| 2|
| 3| 4|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 6|
| 6| 5|
| 7| 9|
| 7| 10|
| 9| 7|
| 9| 10|
| 10| 7|
| 10| 9|
+---+----+
Я пробовал много способов, но это совсем не работает. Я также пробовал следующие коды
df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
ids = df.select("Id").distinct().rdd.flatMap(lambda x: x).collect()
links = df.select("Link").distinct().rdd.flatMap(lambda x: x).collect()
combinations = [(id, link) for id in ids for link in links]
df_combinations = spark.createDataFrame(combinations, ["Id", "Link"])
result = df_combinations.join(df, ["Id", "Link"], "left_anti").union(df).dropDuplicates()
result = result.sort(asc("Id"), asc("Link"))
И
df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
combinations = df.alias("a").crossJoin(df.alias("b")) \
.filter(F.col("a.id") != F.col("b.id"))\
.select(col("a.id").alias("a_id"), col("b.id").alias("b_id"), col("a.link").alias("a_link"), col("b.link").alias("b_link"))
window = Window.partitionBy("a_id").orderBy("a_id", "b_link")
paths = combinations.groupBy("a_id", "b_link") \
.agg(F.first("b_id").over(window).alias("id")) \
.groupBy("id").agg(F.collect_list("b_link").alias("links"))
result = paths.select("id", F.explode("links").alias("link"))
result = result.union(df.selectExpr("id as id_", "link as link_"))
Любая помощь приветствуется.
Это не общий подход, но вы можете использовать пакет graphframes. Вам может быть сложно настроить его, но его можно использовать, результат прост.
import os
sc.addPyFile(os.path.expanduser('graphframes-0.8.1-spark3.0-s_2.12.jar'))
from graphframes import *
e = df.select('id', 'link').toDF('src', 'dst')
v = e.select('src').toDF('id') \
.union(e.select('dst')) \
.distinct()
g = GraphFrame(v, e)
sc.setCheckpointDir("/tmp/graphframes")
df = g.connectedComponents()
df.join(df.withColumnRenamed('id', 'link'), ['component'], 'inner') \
.drop('component') \
.filter('id != link') \
.show()
+---+----+
| id|link|
+---+----+
| 7| 10|
| 7| 9|
| 3| 2|
| 3| 4|
| 3| 1|
| 5| 6|
| 6| 5|
| 9| 10|
| 9| 7|
| 1| 2|
| 1| 4|
| 1| 3|
| 10| 9|
| 10| 7|
| 4| 2|
| 4| 1|
| 4| 3|
| 2| 4|
| 2| 1|
| 2| 3|
+---+----+
ConnectedComponents метод возвращает идентификатор компонента для каждой вершины, который уникален для каждой группы вершин (соединенных ребром и разделенных, если нет ребра с другим компонентом). Таким образом, вы можете сделать декартово произведение для каждого компонента без самой вершины.
Добавлен ответ
Вдохновленный описанным выше подходом, я поднял голову и нашел пакет networkx.
import networkx as nx
df = df.toPandas()
G = nx.from_pandas_edgelist(df, 'id', 'link')
components = [[list(c)] for c in nx.connected_components(G)]
df2 = spark.createDataFrame(components, ['array']) \
.withColumn('component', f.monotonically_increasing_id()) \
.select('component', f.explode('array').alias('id'))
df2.join(df2.withColumnRenamed('id', 'link'), ['component'], 'inner') \
.drop('component') \
.filter('id != link') \
.show()
+---+----+
| id|link|
+---+----+
| 1| 2|
| 1| 3|
| 1| 4|
| 2| 1|
| 2| 3|
| 2| 4|
| 3| 1|
| 3| 2|
| 3| 4|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 6|
| 6| 5|
| 9| 10|
| 9| 7|
| 10| 9|
| 10| 7|
| 7| 9|
| 7| 10|
+---+----+
Знаете ли вы какой-либо процесс настройки библиотеки Python из Jar на Pycharm. На данный момент я запускаю весь проект на Pycharm. Как только он будет успешно протестирован на Local Pycharm, я запущу его на EMR.
Ты прав. Пробовал настроить библиотеку. Добавил баночку. Но получаю ошибку компиляции на Pycharm для этой строки кода «из импорта графических фреймов *».