У меня есть два фрейма данных df1
+---+---+----------+
| n|val| distances|
+---+---+----------+
| 1| 1|0.27308652|
| 2| 1|0.24969208|
| 3| 1|0.21314497|
+---+---+----------+
и df2
+---+---+----------+
| x1| x2| w|
+---+---+----------+
| 1| 2|0.03103427|
| 1| 4|0.19012526|
| 1| 10|0.26805446|
| 1| 8|0.26825935|
+---+---+----------+
Я хочу добавить новый столбец в df1
под названием gamma
, который будет содержать сумму значений w
из df2
, когда df1.n == df2.x1 OR df1.n == df2.x2
Я попытался использовать udf, но, очевидно, выбор из другого фрейма данных не сработает, потому что значения должны быть определены перед расчетом
gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d"%(n,n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))
Есть ли способ сделать это с join
или groupby
без использования циклов?
Вы не можете ссылаться на DataFrame внутри udf
. Как вы уже упоминали, эту проблему лучше всего решить с помощью join
.
IIUC, вы ищете что-то вроде:
from pyspark.sql import Window
import pyspark.sql.functions as F
df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how = "left")\
.select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
.distinct()\
.show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
Или, если вам удобнее синтаксис pyspark-sql
, вы можете зарегистрировать временные таблицы и сделать:
df1.registerTempTable("df1")
df2.registerTempTable("df2")
sqlCtx.sql(
"SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
"FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#| n|val| distances| gamma|
#+---+---+----------+----------+
#| 1| 1|0.27308652|0.75747334|
#| 3| 1|0.21314497| null|
#| 2| 1|0.24969208|0.03103427|
#+---+---+----------+----------+
Объяснение
В обоих случаях мы выполняем оставил присоединиться от df1
к df2
. Это сохранит все строки в df1
независимо от того, есть ли совпадения.
Предложение соединения - это условие, которое вы указали в своем вопросе. Таким образом, будут объединены все строки в df2
, где либо x1
, либо x2
равно n
.
Затем выберите все строки из левых таблиц, плюс мы группируем (разбиваем) по n
и суммируем значения w
. Будет получена сумма по всем строкам, которые соответствуют условию соединения, для каждого значения n
.
Наконец, мы возвращаем только отдельные строки, чтобы исключить дубликаты.
Что, если размер df2
намного больше, чем размер df1
, будет ли сумма взята через все значения в df2
?
Сумма будет по всем значениям в df2, которые соответствуют условию соединения по разделу на. У тебя это не работает? Если да, то не могли бы вы привести пример?
Я не уверен, немного новичок в pyspark. Я просто пытаюсь понять, как работает твой ответ.
df1.join(df2, (df1.n == df2.x1) | (df1.n == df2.x2)).groupBy(df1.n).sum("w")
?