Выражение искры переименовать список столбцов после агрегирования

Я написал ниже код для группировки и агрегирования столбцов

 val gmList = List("gc1","gc2","gc3")
 val aList = List("val1","val2","val3","val4","val5")

 val cype = "first"

 val exprs = aList.map((_ -> cype )).toMap

 dfgroupBy(gmList.map (col): _*).agg (exprs).show

но это создает столбцы с добавлением имени агрегации во всех столбцах, как показано ниже

поэтому я хочу сначала использовать псевдоним этого имени (val1) -> val1, я хочу сделать этот код универсальным как часть exprs

  +----------+----------+-------------+-------------------------+------------------+---------------------------+------------------------+-------------------+
 |    gc1   |  gc2     | gc3         |        first(val1)      |      first(val2)|       first(val3)          |       first(val4)      |       first(val5) |
 +----------+----------+-------------+-------------------------+------------------+---------------------------+------------------------+-------------------+
2
0
902
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Вы можете немного изменить способ генерации выражения и использовать там функцию alias:

import org.apache.spark.sql.functions.col
val aList = List("val1","val2","val3","val4","val5")
val exprs = aList.map(c => first(col(c)).alias(c) )
dfgroupBy( gmList.map(col) : _*).agg(exprs.head , exprs.tail: _*).show

здесь проблема "первая" также исправлена, я хочу сделать это также genric, например, exprs должны принимать любые агрегации, такие как "first" "mean" "min" и т. д.

kcoder 26.10.2018 09:18

Как бы вы соотносили, какая функция должна применяться к какому столбцу?

philantrovert 26.10.2018 11:51

в моем требовании я хочу применить одну и ту же функцию для всех столбцов

kcoder 26.10.2018 12:36
Ответ принят как подходящий

Один из подходов - присвоить агрегированным столбцам псевдонимы исходных имен столбцов в последующем select. Я также предлагаю обобщить единую агрегатную функцию (например, first) до списка функций, как показано ниже:

import org.apache.spark.sql.functions._

val df = Seq(
  (1, 10, "a1", "a2", "a3"),
  (1, 10, "b1", "b2", "b3"),
  (2, 20, "c1", "c2", "c3"),
  (2, 30, "d1", "d2", "d3"),
  (2, 30, "e1", "e2", "e3")
).toDF("gc1", "gc2", "val1", "val2", "val3")

val gmList = List("gc1", "gc2")
val aList = List("val1", "val2", "val3")

// Populate with different aggregate methods for individual columns if necessary
val fList = List.fill(aList.size)("first")

val afPairs = aList.zip(fList)
// afPairs: List[(String, String)] = List((val1,first), (val2,first), (val3,first))

df.
  groupBy(gmList.map(col): _*).agg(afPairs.toMap).
  select(gmList.map(col) ::: afPairs.map{ case (v, f) => col(s"$f($v)").as(v) }: _*).
  show
// +---+---+----+----+----+
// |gc1|gc2|val1|val2|val3|
// +---+---+----+----+----+
// |  2| 20|  c1|  c2|  c3|
// |  1| 10|  a1|  a2|  a3|
// |  2| 30|  d1|  d2|  d3|
// +---+---+----+----+----+

Вот более общая версия, которая будет работать с любыми агрегатными функциями и не требует предварительного именования агрегатных столбцов. Создайте свой сгруппированный df, как обычно, затем используйте:

val colRegex = raw"^.+\((.*?)\)".r
val newCols = df.columns.map(c => col(c).as(colRegex.replaceAllIn(c, m => m.group(1))))
df.select(newCols: _*)

При этом будет извлечено только то, что указано в скобках, независимо от того, какая агрегатная функция вызывается (например, first(val) -> val, sum(val) -> val, count(val) -> val и т. д.).

Другие вопросы по теме