Pyspark добавляет в фрейм данных последовательный и детерминированный индекс

Мне нужно добавить столбец индекса в фрейм данных с тремя очень простыми ограничениями:

  • начать с 0

  • быть последовательным

  • быть детерминированным

Я уверен, что мне не хватает чего-то очевидного, потому что примеры, которые я нахожу, выглядят очень запутанными для такой простой задачи или используют непоследовательные, недетерминированные, все более монотонные идентификаторы. Я не хочу заархивировать с индексом, а затем должен разделять ранее разделенные столбцы, которые теперь находятся в одном столбце, потому что мои фреймы данных находятся в терабайтах, и это просто кажется ненужным. Мне не нужно ни разбивать, ни упорядочивать по чему-либо, и примеры, которые я нахожу, делают это (с использованием оконных функций и row_number). Все, что мне нужно, это простая последовательность целых чисел от 0 до df.count. Что мне здесь не хватает?

1, 2, 3, 4, 5

DataFrames по своей сути неупорядочены. Это одна из основных причин, по которой они работают для параллельной обработки - любой исполнитель может взять любую часть данных и выполнить свою работу. Вы жестяная банка вводите заказ (как вы показали), но как он может быть детерминированным, если вы ничего не заказываете?

pault 13.09.2018 19:01

Кстати, я считаю, что monotonically_increasing_id будет детерминированным до тех пор, пока вы не измените количество разделов.

pault 13.09.2018 19:03

Справедливо, может быть, я использую слово index вне контекста. Я имею в виду: как добавить столбец с упорядоченной, монотонно увеличивающейся на 1 последовательностью 0: df.count?

xv70 13.09.2018 19:05
4
3
6 984
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? (from comments)

Здесь вы можете использовать row_number(), но для этого вам нужно указать orderBy(). Поскольку у вас нет столбца для заказа, просто используйте monotonically_increasing_id().

from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id()))-1
)

Кроме того, row_number() начинается с 1, поэтому вам придется вычесть 1, чтобы он начинался с 0. Последним значением будет df.count - 1.


I don't want to zip with index and then have to separate the previously separated columns that are now in a single column

Вы жестяная банка используете zipWithIndex, если вы следуете за ним с вызовом map, чтобы избежать превращения всех разделенных столбцов в один столбец:

cols = df.columns
df = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0])).toDF(["index"] + cols

Не уверен в производительности, но вот трюк.

Note - toPandas will collect all the data to driver

from pyspark.sql import SparkSession

# speed up toPandas using arrow
spark = SparkSession.builder.appName('seq-no') \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .getOrCreate()

df = spark.createDataFrame([
    ('id1', "a"),
    ('id2', "b"),
    ('id2', "c"),
], ["ID", "Text"])

df1 = spark.createDataFrame(df.toPandas().reset_index()).withColumnRenamed("index","seq_no")

df1.show()

+------+---+----+
|seq_no| ID|Text|
+------+---+----+
|     0|id1|   a|
|     1|id2|   b|
|     2|id2|   c|
+------+---+----+

Другие вопросы по теме