Pyspark - выберите отдельные значения из каждого столбца

Я пытаюсь найти все отдельные значения в каждом столбце фрейма данных и показать в одной таблице.

Пример данных:

|-----------|-----------|-----------|
|   COL_1   |   COL_2   |   COL_3   | 
|-----------|-----------|-----------|
|     A     |     C     |     D     |
|     A     |     C     |     D     |
|     A     |     C     |     E     |
|     B     |     C     |     E     |
|     B     |     C     |     F     |
|     B     |     C     |     F     |
|-----------|-----------|-----------|

Пример вывода:

|-----------|-----------|-----------|
|   COL_1   |   COL_2   |   COL_3   | 
|-----------|-----------|-----------|
|     A     |     C     |     D     |
|     B     |           |     E     |
|           |           |     F     |
|-----------|-----------|-----------|

Это вообще возможно? Мне удалось сделать это в отдельных таблицах, но было бы намного лучше все в одной таблице.

Любые идеи?

1
0
5 041
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Самым простым здесь было бы использовать pyspark.sql.functions.collect_set для всех столбцов:

import pyspark.sql.functions as f
df.select(*[f.collect_set(c).alias(c) for c in df.columns]).show()
#+------+-----+---------+
#| COL_1|COL_2|    COL_3|
#+------+-----+---------+
#|[B, A]|  [C]|[F, E, D]|
#+------+-----+---------+

Очевидно, это возвращает данные в виде одной строки.

Если вместо этого вы хотите, чтобы результат был таким, как вы написали в своем вопросе (одна строка на уникальное значение для каждого столбца), это выполнимо, но требует довольно много гимнастики pyspark (и любое решение, вероятно, будет гораздо менее эффективным).

Тем не менее, я представляю вам несколько вариантов:

Вариант 1. Расчленить и соединить

Вы можете использовать pyspark.sql.functions.posexplode для разделения элементов в наборе значений для каждого столбца вместе с индексом в массиве. Сделайте это для каждого столбца отдельно, а затем соедините полученный список DataFrames с помощью functools.reduce:

from functools import reduce 

unique_row = df.select(*[f.collect_set(c).alias(c) for c in df.columns])

final_df = reduce(
    lambda a, b: a.join(b, how="outer", on="pos"),
    (unique_row.select(f.posexplode(c).alias("pos", c)) for c in unique_row.columns)
).drop("pos")

final_df.show()
#+-----+-----+-----+
#|COL_1|COL_2|COL_3|
#+-----+-----+-----+
#|    A| null|    E|
#| null| null|    D|
#|    B|    C|    F|
#+-----+-----+-----+

Вариант 2. Выбрать по позиции

Сначала вычислите размер максимального массива и сохраните его в новом столбце max_length. Затем выберите элементы из каждого массива, если значение существует в этом индексе.

Мы снова используем pyspark.sql.functions.posexplode, но на этот раз просто для создания столбца, представляющего индекс в каждом массиве для извлечения.

Наконец, мы используем этот трюк, который позволяет вам использовать значение столбца в качестве параметра.

final_df= df.select(*[f.collect_set(c).alias(c) for c in df.columns])\
    .withColumn("max_length", f.greatest(*[f.size(c) for c in df.columns]))\
    .select("*", f.expr("posexplode(split(repeat(',', max_length-1), ','))"))\
    .select(
        *[
            f.expr(
                "case when size({c}) > pos then {c}[pos] else null end AS {c}".format(c=c))
            for c in df.columns
        ]
    )

final_df.show()
#+-----+-----+-----+
#|COL_1|COL_2|COL_3|
#+-----+-----+-----+
#|    B|    C|    F|
#|    A| null|    E|
#| null| null|    D|
#+-----+-----+-----+

Другие вопросы по теме