У меня есть фрейм данных Spark, состоящий из этих двух столбцов. Я хочу присвоить идентификатор каждой строке на основе простого условия в столбце метки времени.
Для каждого пользователя, если разница между следующей и предыдущей отметкой времени составляет менее 10 секунд, я продолжаю добавлять один и тот же идентификатор, в противном случае я обновляю идентификатор и продолжаю, пока не назначу идентификатор каждой строке.
Все должно быть закодировано на питоне, так как я использую PySpark.
Чтобы упростить понимание, вот пример:
Запуск ДФ
+------------------+
| User| timestamp|
+------------------+
| user0| 100 |
| user1| 102 |
| user0| 109 |
| user2| 103 |
| user1| 108 |
| user0| 119 |
| user0| 140 |
| user0| 142 |
+------------------+
Желаемый DF примерно такой
+----------------------+
| User| timestamp| ID|
+----------------------+
| user0| 100 | 1|
| user1| 102 | 2|
| user0| 109 | 1|
| user2| 103 | 3|
| user1| 108 | 2|
| user0| 119 | 1|
| user0| 140 | 4|
| user0| 142 | 4|
+----------------------+
или, может быть, что-то вроде этого, если алгоритм назначает перед идентификатором для данного пользователя. Мне все равно, тоже нормально.
+----------------------+
| User| timestamp| ID|
+----------------------+
| user0| 100 | 1|
| user1| 102 | 3|
| user0| 109 | 1|
| user2| 103 | 4|
| user1| 108 | 3|
| user0| 119 | 1|
| user0| 140 | 2|
| user0| 142 | 2|
+----------------------+
Как видите, user0 с меткой времени 140 имеет другой идентификатор (2), потому что разница с предыдущей меткой времени больше 10.
Это было бы достаточно просто, если бы я мог зацикливать и динамически назначать каждую ячейку, но это противоречит цели использования искровых фреймов данных, а также я думаю, что это невозможно сделать, поскольку они неизменяемы.
Каков наиболее эффективный способ сделать это в Spark?
Вы можете сначала создать идентификатор для каждого пользователя, а затем объединить их для разных пользователей, как показано ниже.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('User').orderBy('timestamp')
df2 = df.withColumn(
'begin',
F.coalesce(
F.col('timestamp') - F.lag('timestamp').over(w) > 10,
F.lit(True)
).cast('int')
).withColumn(
'userid',
F.sum('begin').over(w.rowsBetween(Window.unboundedPreceding, 0))
).withColumn(
'ID',
F.dense_rank().over(Window.orderBy('userid', 'User'))
)
# If you just want to keep your columns, do:
# df2 = df2.select('User', 'timestamp', 'ID')
df2.show()
+-----+---------+-----+------+---+
| User|timestamp|begin|userid| ID|
+-----+---------+-----+------+---+
|user0| 100| 1| 1| 1|
|user0| 109| 0| 1| 1|
|user0| 119| 0| 1| 1|
|user1| 102| 1| 1| 2|
|user1| 108| 0| 1| 2|
|user2| 103| 1| 1| 3|
|user0| 140| 1| 2| 4|
|user0| 142| 0| 2| 4|
+-----+---------+-----+------+---+
@nonoDa извините, я забыл включить эту часть кода - см. отредактированный ответ
Привет, мне интересно, есть ли у вас какие-либо идеи, почему, если я загружаю паркет с помощью spark.read.load(filepath)
после того, как я использую код, который вы мне дали, для объединения временных меток, требуется много времени для выполнения любого действия, такого как простой count()
. Однако count()
действительно быстро, если я делаю это сразу после загрузки паркета. Я предполагаю, что это как-то связано, возможно, с paritionBy()
в коде, указанном в ответе, и, возможно, с coalesce()
. Я пробовал и repartition(1)
, и coalesce(1)
, но это все равно занимает много времени
Если вам нужна более высокая производительность, вы можете вызвать df.cache()
для кэширования фрейма данных, чтобы вычисление выполнялось только один раз.
нет необходимости менять разделы. Объединение в один раздел только сделает его медленнее.
но почему, если я позвоню count()
сразу после spark.read.load(filepath)
, это займет несколько мс, однако, если объединить временные метки (таким образом, уменьшив количество строк df), если я снова позвоню count()
, это займет несколько минут? это из-за этой строки F.coalesce(F.col('timestamp') - F.lag('timestamp').over(w) > 10,F.lit(True)? делает df.cache()
единственное, что я могу делать? заранее спасибо!
Вычисление идентификаторов занимает время, и из-за ленивой оценки это вычисление откладывается до тех пор, пока вы не вызовете count
. Таким образом, вы испытываете длительное время при звонке count
.
Прежде всего, большое спасибо за быстрый ответ, я пытаюсь понять код, так как я новичок в искрах, и это чрезвычайно полезно. где инициализируется переменная
w
? Я полагаю, это объект окна? а что должно быть?