Скажем, у меня есть следующий фрейм данных:
val df = spark.sparkContext.parallelize(Seq(
("A", "12", 50),
("A", "13", 100),
("A", "14", 30),
("B", "15", 40),
("C", "16", 60),
("C", "17", 70)
)).toDF("Name", "Time", "Value")
и я поворачиваюсь по «Времени»:
val pivoted = df.groupBy($"Name").
pivot("Time").
agg(coalesce(sum($"Value"),lit(0)))
pivoted.show()
что приводит к:
+----+----+----+----+----+----+----+
|Name| 12| 13| 14| 15| 16| 17|
+----+----+----+----+----+----+----+
| B|null|null|null| 40|null|null|
| C|null|null|null|null| 60| 70|
| A| 50| 100| 30|null|null|null|
+----+----+----+----+----+----+----+
До тех пор все в порядке. Я хочу добавить столбец рядом с «столбцом 17», вычислив сумму каждой строки. Таким образом, ожидаемый результат должен быть:
+----+----+----+----+----+----+----+----+
|Name| 12| 13| 14| 15| 16| 17|sum |
+----+----+----+----+----+----+----+----+
| B|null|null|null| 40|null|null|40 |
| C|null|null|null|null| 60| 70|130 |
| A| 50| 100| 30|null|null|null|180 |
+----+----+----+----+----+----+----+----+
(Noobly) Я пытался добавить 'withColumn', но это не удалось:
val pivotedWithSummation = df.groupBy($"Name").
pivot("Time").
agg(coalesce(sum($"Value"),lit(0))).
withColumn("summation", sum($"Value"))
Я пришел с этим отвечать, но не смог его применить :/
Я использую Scala v.2.11.8 и Spark 2.3.1.
Заранее спасибо!
получить сумму значений из исходного входного фрейма данных и присоединиться к вашему сводному фрейму данных
scala> val pivoted = df.groupBy($"Name").pivot("Time").agg(coalesce(sum($"Value"),lit(0)))
pivoted: org.apache.spark.sql.DataFrame = [Name: string, 12: bigint ... 5 more fields]
scala> pivoted.show
+----+----+----+----+----+----+----+
|Name| 12| 13| 14| 15| 16| 17|
+----+----+----+----+----+----+----+
| B|null|null|null| 40|null|null|
| C|null|null|null|null| 60| 70|
| A| 50| 100| 30|null|null|null|
+----+----+----+----+----+----+----+
scala> val sumOfValuesDF = df.groupBy($"Name").sum("value")
sumOfValuesDF: org.apache.spark.sql.DataFrame = [Name: string, sum(value): bigint]
scala> sumOfValuesDF.show
+----+----------+
|Name|sum(value)|
+----+----------+
| B| 40|
| C| 130|
| A| 180|
+----+----------+
scala> val pivotedWithSummation = pivoted.join(sumOfValuesDF, "Name")
pivotedWithSummation: org.apache.spark.sql.DataFrame = [Name: string, 12: bigint ... 6 more fields]
scala> pivotedWithSummation.show
+----+----+----+----+----+----+----+----------+
|Name| 12| 13| 14| 15| 16| 17|sum(value)|
+----+----+----+----+----+----+----+----------+
| B|null|null|null| 40|null|null| 40|
| C|null|null|null|null| 60| 70| 130|
| A| 50| 100| 30|null|null|null| 180|
+----+----+----+----+----+----+----+----------+