Добавление уникального последовательного номера строки в фрейм данных в pyspark

Я хочу добавить уникальный номер строки в свой фрейм данных в pyspark и не хочу использовать методы monotonicallyIncreasingId и partitionBy. Я думаю, что этот вопрос может быть копией аналогичных вопросов, заданных ранее, но я все еще ищу совета, правильно ли я делаю это или нет. Ниже приведен фрагмент моего кода: У меня есть файл csv со следующим набором входных записей:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

и я загрузил этот CSV-файл в фреймворк.

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD=empRDD.map(lambda x: (list(x[0]) + [x[1]]))
 newRDD.take(2);
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

когда я включил значение int в свой список, я потерял схему фрейма данных.

newdf=newRDD.toDF(['emp_id','emp_name','emp_city','emp_salary','row_id'])
newdf.show();

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

Правильно ли я делаю? или есть ли лучший способ добавить или сохранить схему фрейма данных в pyspark?

Возможно ли использовать метод zipWithIndex для добавления уникального последовательного номера строки также для фрейма данных большого размера? Можем ли мы использовать этот row_id для повторного разделения фрейма данных для равномерного распределения данных по разделам?

Что именно вы имеете в виду под потерянной схемой? Это началось как не целочисленные столбцы, а затем перешло в строки? Также почему вы не хотите использовать монотонно увеличивающиеся идентификаторы?

Willy 31.10.2018 14:33

Монотонное увеличение - это не добавление последовательного приращения. Я просто добавляю случайный уникальный номер в мой фрейм данных .. и partitionby с оконной функцией переносит данные n разделов в один раздел

vikrant rana 31.10.2018 15:01

Утерянная схема. Я имел в виду, что при преобразовании rdd в dataframe я также должен указать имя столбца. Есть ли способ сохранить схему rdd при преобразовании ее в rdd, а затем из rdd в фрейм данных.

vikrant rana 31.10.2018 15:04

Сделайте row_number поверх набора разделов, а затем закажите по вашему выбору. Почему вы хотите использовать RDD? После того, как все DataFrames / DataSets разбиты на RDD, это не означает, что вам нужно его использовать. Избегайте RDD любой ценой. И вы теряете порядок или схему? Я уверен, что это порядок, а не схема.

pvy4917 31.10.2018 15:26

Я думаю, что могу использовать предложение partitionby с оконной функцией вместо того, чтобы использовать только order by .. таким образом данные не будут перемещаться в один раздел .. Я попробую это.

vikrant rana 31.10.2018 16:47

Привет, вы можете проверить все способы сделать это вместе с опасностями здесь из этого ответа здесь

mkaran 13.05.2021 10:02
1
6
10 920
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я нашел решение, и оно очень простое. поскольку у меня нет столбца в моем фрейме данных, который имеет одинаковое значение во всех строках, поэтому использование row_number не создает уникальных номеров строк при использовании его с предложением partitionBy.

Давайте добавим новый столбец в существующий фрейм данных с некоторым значением по умолчанию в нем.

emp_df= emp_df.withColumn("new_column",lit("ABC"))

и создайте оконную функцию с paritionBy, используя столбец "new_column"

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

вы получите желаемый результат:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+

более простой способ: withColumn ("index", F.row_number (). over (Window.orderBy (monoto‌ nically_increasing_i‌ d ())) - 1)

zhao yufei 16.10.2019 04:47

@zhaoyufei зачем нужно добавлять -1 в последней части?

Piko Monde 29.05.2020 10:32

@PikoMonde для моего использования - это создание индекса в диапазоне от 0 до некоторого числа, поэтому я добавляю -1. если ваш вариант использования от 1, вы можете просто удалить часть -1, это нормально.

zhao yufei 30.05.2020 07:47

Использование Spark SQL:

df = spark.sql("""
SELECT 
    row_number() OVER (
        PARTITION BY '' 
        ORDER BY '' 
    ) as id,
    *
FROM 
    VALUES 
    ('Bob  ', 20),
    ('Alice', 21),
    ('Gary ', 21),
    ('Kent ', 25),
    ('Gary ', 35)
""")

Выход:

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)

>>> df.show()
+---+-----+----+
| id| col1|col2|
+---+-----+----+
|  1|Bob  |  20|
|  2|Alice|  21|
|  3|Gary |  21|
|  4|Kent |  25|
|  5|Gary |  35|
+---+-----+----+

Другие вопросы по теме