Выберите имя столбца для каждой строки для максимального значения в PySpark

У меня есть такой фрейм данных, показаны только два столбца, однако в исходном кадре данных много столбцов.

data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()

+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1|   3|   5|
|ID2|   4|  12|
|ID3|   8|   3|
+---+----+----+

Я хочу извлечь имя столбца для каждой строки, которая имеет максимальное значение. Следовательно, ожидаемый результат такой

+---+----+----+-------+
| ID|colA|colB|Max_col|
+---+----+----+-------+
|ID1|   3|   5|   colB|
|ID2|   4|  12|   colB|
|ID3|   8|   3|   colA|
+---+----+----+-------+

В случае равенства, когда colA и colB имеют одинаковое значение, выберите первый столбец.

Как я могу добиться этого в pyspark

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
6
2
7 662
5
Перейти к ответу Данный вопрос помечен как решенный

Ответы 5

попробуйте следующее:

from  pyspark.sql import functions as F
data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.withColumn('max_col',
   F.when(F.col('colA') > F.col('colB'), 'colA').
     otherwise('colB')).show()

Урожайность:

+---+----+----+-------+
| ID|colA|colB|max_col|
+---+----+----+-------+
|ID1|   3|   5|   colB|
|ID2|   4|  12|   colB|
|ID3|   8|   3|   colA|
+---+----+----+-------+

Привет Элиор, решение будет работать, если у меня всего два столбца, однако у меня много столбцов

Hardik Gupta 31.05.2019 08:39

Привет, Хадрид, извини, что я пропустил это.

Elior Malul 31.05.2019 08:52

Привет, Хадрид, извини, что я пропустил это. Схема моего предлагаемого решения такова: (извините, я не смог ее закодировать, для этого вам нужна версия 2.4, которой у меня нет): 1. Добавьте столбец следующим образом: df.withColumn('arr', F.array('col1', 'col2', ... , 'coln')) 2. Добавьте столбец maxval: withColumn('max_val', F.array_max('arr')) 3. Наконец, используя функцию map (из СДР), выберите столбец со значением, равным значению в столбце max_val.

Elior Malul 31.05.2019 09:02

Есть несколько вариантов достижения этого. Я являюсь примером для одного и могу дать подсказку для остальных.

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql import types as T

data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()

+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1|   3|   5|
|ID2|   4|  12|
|ID3|   8|   3|
+---+----+----+

#Below F.array creates an array of column name and value pair like [['colA', 3], ['colB', 5]] then F.explode break this array into rows like different column and value pair should be in different rows

df = df.withColumn(
    "max_val",
    F.explode(
        F.array([
            F.array([F.lit(cl), F.col(cl)]) for cl in df.columns[1:]
        ])
    )
)
df.show()
+---+----+----+----------+
| ID|colA|colB|   max_val|
+---+----+----+----------+
|ID1|   3|   5| [colA, 3]|
|ID1|   3|   5| [colB, 5]|
|ID2|   4|  12| [colA, 4]|
|ID2|   4|  12|[colB, 12]|
|ID3|   8|   3| [colA, 8]|
|ID3|   8|   3| [colB, 3]|
+---+----+----+----------+

#Then select columns so that column name and value should be in different columns
df = df.select(
    "ID", 
    "colA", 
    "colB", 
    F.col("max_val").getItem(0).alias("col_name"),
    F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col_value"),
)
df.show()
+---+----+----+--------+---------+
| ID|colA|colB|col_name|col_value|
+---+----+----+--------+---------+
|ID1|   3|   5|    colA|        3|
|ID1|   3|   5|    colB|        5|
|ID2|   4|  12|    colA|        4|
|ID2|   4|  12|    colB|       12|
|ID3|   8|   3|    colA|        8|
|ID3|   8|   3|    colB|        3|
+---+----+----+--------+---------+

# Rank column values based on ID in desc order
df = df.withColumn(
    "rank",
    F.rank().over(W.partitionBy("ID").orderBy(F.col("col_value").desc()))
)
df.show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2|   4|  12|    colB|       12|   1|
|ID2|   4|  12|    colA|        4|   2|
|ID3|   8|   3|    colA|        8|   1|
|ID3|   8|   3|    colB|        3|   2|
|ID1|   3|   5|    colB|        5|   1|
|ID1|   3|   5|    colA|        3|   2|
+---+----+----+--------+---------+----+

#Finally Filter rank = 1 as max value have rank 1 because we ranked desc value
df.where("rank=1").show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2|   4|  12|    colB|       12|   1|
|ID3|   8|   3|    colA|        8|   1|
|ID1|   3|   5|    colB|        5|   1|
+---+----+----+--------+---------+----+

Другие варианты -

  • Используйте UDF на своей базе df и верните имя столбца с максимальным значением
  • В том же примере после создания столбца имени и значения столбца вместо ранга используйте группу ID, возьмите max col_value. Затем присоединитесь к предыдущему df.

Нам нужно получить максимальное значение, поэтому ранг = 1 не будет работать, верно?

Hardik Gupta 31.05.2019 09:12

@Hardikgupta rank=1 работает, так как я указал ранг в порядке убывания. ex 5 имеют ранг 1, 3 имеют ранг 2

Rakesh Kumar 31.05.2019 09:53

для окончательного вывода в первой строке будет холодно, верно?

Hardik Gupta 31.05.2019 11:02

@Hardikgupta да, верно, это должно быть colB, только что проверил, это проблема с типом данных col_value имеет строковый тип. Обновление ответа, чтобы дать правильный результат.

Rakesh Kumar 31.05.2019 11:07

@Hardikgupta Обновил ответ. Проблема возникла из-за несоответствия типов данных. преобразовал это в целочисленный тип F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col‌​_value") и добавил импорт для этого вверху.

Rakesh Kumar 31.05.2019 11:11

@Hardikgupta, если это решит вашу проблему, примите ответ.

Rakesh Kumar 03.06.2019 11:33

эй, спасибо за ваш подробный ответ, очень ценю это

Hardik Gupta 03.06.2019 17:27

Вы можете использовать RDD API, чтобы добавить новый столбец:

df.rdd.map(lambda r: r.asDict())\
       .map(lambda r: Row(Max_col=max([i for i in r.items() if i[0] != 'ID'], 
                                      key=lambda kv: kv[1])[0], **r) )\
       .toDF()

В результате чего:

+---+-------+----+----+
| ID|Max_col|colA|colB|
+---+-------+----+----+
|ID1|   colB|   3|   5|
|ID2|   colB|   4|  12|
|ID3|   colA|   8|   3|
+---+-------+----+----+
Ответ принят как подходящий

Вы можете использовать UDF в каждой строке для вычислений по строкам и использовать struct для передачи нескольких столбцов в udf. Надеюсь это поможет.

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from operator import itemgetter

data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 70, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()

+---+----+----+----+
| ID|colA|colB|colC|
+---+----+----+----+
|ID1|   3|   5|  78|
|ID2|   4|  12|  45|
|ID3|  70|   3|  70|
+---+----+----+----+
cols = df.columns

# to get max of values in a row
maxcol = F.udf(lambda row: max(row), IntegerType())
maxDF = df.withColumn("maxval", maxcol(F.struct([df[x] for x in df.columns[1:]])))
maxDF.show()

+---+----+----+----+-------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+-------+
|ID1|3   |5   |78  |78     |
|ID2|4   |12  |45  |45     |
|ID3|70  |3   |67  |70     |
+---+----+----+----+-------+

# to get max of value & corresponding column name

schema=StructType([StructField('maxval',IntegerType()),StructField('maxval_colname',StringType())])

maxcol = F.udf(lambda row: max(row,key=itemgetter(0)), schema)
maxDF = df.withColumn('maxfield', maxcol(F.struct([F.struct(df[x],F.lit(x)) for x in df.columns[1:]]))).\
select(df.columns+['maxfield.maxval','maxfield.maxval_colname'])

+---+----+----+----+------+--------------+
| ID|colA|colB|colC|maxval|maxval_colname|
+---+----+----+----+------+--------------+
|ID1| 3  | 5  | 78 | 78   | colC         |
|ID2| 4  | 12 | 45 | 45   | colC         |
|ID3| 70 | 3  | 67 | 68   | colA         |
+---+----+----+----+------+--------------+

Но что, если вам нужно имя столбца?

thebluephantom 25.12.2019 23:14

Значит, вам нужно имя столбца с максимальным значением.?

Suresh 26.12.2019 07:20

Действительно, как утверждают другие ответы.

thebluephantom 26.12.2019 09:52

Вы можете создать кортеж значений, столбцов (df[x],x) внутри структуры и получить максимум на нем.

Suresh 26.12.2019 10:37

Расширение того, что сделал Суреш.... возвращение соответствующего имени столбца

from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType

import numpy as np

data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 68, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()

cols = df.columns
maxcol = f.udf(lambda row: cols[row.index(max(row)) +1], StringType())

maxDF = df.withColumn("Max_col", maxcol(f.struct([df[x] for x in df.columns[1:]])))
maxDF.show(truncate=False)

+---+----+----+----+------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+------+
|ID1|3   |5   |78  |colC  |
|ID2|4   |12  |45  |colC  |
|ID3|68  |3   |67  |colA  |
+---+----+----+----+------+

Другие вопросы по теме