PySpark создает связь между столбцами DataFrame

Я пытаюсь реализовать некоторую логику, чтобы установить связь между идентификатором и ссылкой на основе приведенной ниже логики.

Логика -

  • если id 1 имеет ссылку на 2, а 2 имеет ссылку на 3, то отношение 1 -> 2, 1 -> 3, 2 -> 1, 2 -> 3, 3 -> 1, 3 -> 2
  • Точно так же, если 1 с 4, 4 с 7 и 7 с 5, то соотношение 1 -> 4, 1 -> 5, 1 -> 7, 4 -> 1, 4 -> 5, 4 -> 7, 5 -> 1 , 5 -> 4, 5 -> 7

Входной фрейм данных -

+---+----+
| id|link|
+---+----+
|  1|   2|
|  3|   1|
|  4|   2|
|  6|   5|
|  9|   7|
|  9|  10|
+---+----+

Я пытаюсь добиться результата ниже

+---+----+
| Id|Link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  7|   9|
|  7|  10|
|  9|   7|
|  9|  10|
| 10|   7|
| 10|   9|
+---+----+

Я пробовал много способов, но это совсем не работает. Я также пробовал следующие коды

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
ids = df.select("Id").distinct().rdd.flatMap(lambda x: x).collect()
links = df.select("Link").distinct().rdd.flatMap(lambda x: x).collect()
combinations = [(id, link) for id in ids for link in links]
df_combinations = spark.createDataFrame(combinations, ["Id", "Link"])
result = df_combinations.join(df, ["Id", "Link"], "left_anti").union(df).dropDuplicates()
result = result.sort(asc("Id"), asc("Link"))

И

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])

combinations = df.alias("a").crossJoin(df.alias("b")) \
    .filter(F.col("a.id") != F.col("b.id"))\
    .select(col("a.id").alias("a_id"), col("b.id").alias("b_id"), col("a.link").alias("a_link"), col("b.link").alias("b_link"))

window = Window.partitionBy("a_id").orderBy("a_id", "b_link")
paths = combinations.groupBy("a_id", "b_link") \
    .agg(F.first("b_id").over(window).alias("id")) \
    .groupBy("id").agg(F.collect_list("b_link").alias("links"))

result = paths.select("id", F.explode("links").alias("link"))
result = result.union(df.selectExpr("id as id_", "link as link_"))

Любая помощь приветствуется.

Laravel с Turbo JS
Laravel с Turbo JS
Turbo - это библиотека JavaScript для упрощения создания быстрых и высокоинтерактивных веб-приложений. Она работает с помощью техники под названием...
Типы ввода HTML: Лучшие практики и советы
Типы ввода HTML: Лучшие практики и советы
HTML, или HyperText Markup Language , является стандартным языком разметки, используемым для создания веб-страниц. Типы ввода HTML - это различные...
Аутсорсинг разработки PHP для индивидуальных веб-решений
Аутсорсинг разработки PHP для индивидуальных веб-решений
Услуги PHP-разработки могут быть экономически эффективным решением для компаний, которые ищут высококачественные услуги веб-разработки по доступным...
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
Слишком много useState? Давайте useReducer!
Слишком много useState? Давайте useReducer!
Современный фронтенд похож на старую добрую веб-разработку, но с одной загвоздкой: страница в браузере так же сложна, как и бэкенд.
Узнайте, как использовать теги <ul> и <li> для создания неупорядоченных списков в HTML
Узнайте, как использовать теги <ul> и <li> для создания неупорядоченных списков в HTML
HTML предоставляет множество тегов для структурирования и организации содержимого веб-страницы. Одним из наиболее часто используемых тегов для...
3
0
72
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это не общий подход, но вы можете использовать пакет graphframes. Вам может быть сложно настроить его, но его можно использовать, результат прост.

import os
sc.addPyFile(os.path.expanduser('graphframes-0.8.1-spark3.0-s_2.12.jar'))

from graphframes import *

e = df.select('id', 'link').toDF('src', 'dst')
v = e.select('src').toDF('id') \
  .union(e.select('dst')) \
  .distinct()

g = GraphFrame(v, e)

sc.setCheckpointDir("/tmp/graphframes")
df = g.connectedComponents()

df.join(df.withColumnRenamed('id', 'link'), ['component'], 'inner') \
  .drop('component') \
  .filter('id != link') \
  .show()

+---+----+
| id|link|
+---+----+
|  7|  10|
|  7|   9|
|  3|   2|
|  3|   4|
|  3|   1|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
|  1|   2|
|  1|   4|
|  1|   3|
| 10|   9|
| 10|   7|
|  4|   2|
|  4|   1|
|  4|   3|
|  2|   4|
|  2|   1|
|  2|   3|
+---+----+

ConnectedComponents метод возвращает идентификатор компонента для каждой вершины, который уникален для каждой группы вершин (соединенных ребром и разделенных, если нет ребра с другим компонентом). Таким образом, вы можете сделать декартово произведение для каждого компонента без самой вершины.

Добавлен ответ

Вдохновленный описанным выше подходом, я поднял голову и нашел пакет networkx.

import networkx as nx

df = df.toPandas()
G = nx.from_pandas_edgelist(df, 'id', 'link')
components = [[list(c)] for c in nx.connected_components(G)]

df2 = spark.createDataFrame(components, ['array']) \
  .withColumn('component', f.monotonically_increasing_id()) \
  .select('component', f.explode('array').alias('id'))

df2.join(df2.withColumnRenamed('id', 'link'), ['component'], 'inner') \
  .drop('component') \
  .filter('id != link') \
  .show()

+---+----+
| id|link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
| 10|   9|
| 10|   7|
|  7|   9|
|  7|  10|
+---+----+

Ты прав. Пробовал настроить библиотеку. Добавил баночку. Но получаю ошибку компиляции на Pycharm для этой строки кода «из импорта графических фреймов *».

Avijit 14.02.2023 22:21

Знаете ли вы какой-либо процесс настройки библиотеки Python из Jar на Pycharm. На данный момент я запускаю весь проект на Pycharm. Как только он будет успешно протестирован на Local Pycharm, я запущу его на EMR.

Avijit 15.02.2023 00:06

Другие вопросы по теме