У меня есть фрейм данных следующим образом:
---------------
id | name |
---------------
1 | joe |
1 | john |
2 | jane |
3 | jo |
---------------
Цель состоит в том, чтобы, если столбец «id» дублируется, добавить к нему возрастающее число, начиная с 1.
В Пандах я могу сделать это так:
count_id = df.groupby(['id']).cumcount()
count_num = count_id.replace(0, '').astype(str)
df['id'] += count_num
Я безуспешно пытался использовать ту же логику в PySpark.
Результат должен быть:
id | name |
---------------
1 | joe |
11 | john |
2 | jane |
3 | jo |
---------------
Как мне добиться того же в PySpark? Любая помощь приветствуется.
Чтобы воспроизвести этот вывод, вы можете использовать Window
, чтобы получить row_number
для каждого id
, а затем concat
, чтобы добавить его к id
.
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy("id").orderBy("name")
df.withColumn("row_number", f.row_number().over(w)-1)\
.withColumn(
"id",
f.when(
f.col("row_number") > 0,
f.concat(f.col("id"), f.col("row_number"))
).otherwise(f.col("id"))
)\
.drop("row_number")\
.show()
#+---+----+
#| id|name|
#+---+----+
#| 1| joe|
#| 11|john|
#| 3| jo|
#| 2|jane|
#+---+----+
Примечание: это преобразует столбец id
в столбец StringType
, если это еще не сделано.
Чтобы получить вывод, который вы изначально указали в вопросе как желаемый результат, вам нужно будет добавить столбец подсчета групп в дополнение к вычислению номера строки. Объединяйте номер строки только в том случае, если количество больше единицы.
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy("id")
df.withColumn("count", f.count("*").over(w))\
.withColumn("row_number", f.row_number().over(w.orderBy("name")))\
.withColumn(
"id",
f.when(
f.col("count") > 1,
f.concat(f.col("id"), f.col("row_number"))
).otherwise(f.col("id"))
)\
.drop("count", "row_number")\
.show()
#+---+----+
#| id|name|
#+---+----+
#| 11| joe|
#| 12|john|
#| 3| jo|
#| 2|jane|
#+---+----+