Как вставить конкатенированные значения из фрейма данных в другой фрейм данных в Pyspark?

Я создаю столбец интервал времени и добавляю его в существующий столбец Фрейм данных в Pyspark. В идеале time_interval должен быть в формате "ЧЧмм" с округлением минут до ближайшей 15-минутной отметки (815, 830, 845, 900 и т. д.).

У меня есть искровой sql-код, который выполняет логику за меня, но как мне взять это значение, объединенное в виде строкового столбца, и вставить его в существующий фрейм данных?

time_interval = sqlContext.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")

time_interval.show()

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(CAST(hour(current_timestamp()) AS STRING), CAST((FLOOR((CAST(minute(current_timestamp()) AS DOUBLE) / CAST(15 AS DOUBLE))) * CAST(15 AS BIGINT)) AS STRING))|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                               1045|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

baseDF = sqlContext.sql("select * from test_table")
newBase = baseDF.withColumn("time_interval", lit(str(time_interval)))

newBase.select("time_interval").show()

+--------------------+
|       time_interval|
+--------------------+
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
+--------------------+
only showing top 20 rows

Таким образом, фактические ожидаемые результаты должны просто отображать фактическое строковое значение в новом столбце, который я создаю, а не это конкатенированное значение из фрейма данных. Что-то вроде ниже:

newBase.select("time_interval").show(1)
+-------------+
|time_interval|
+-------------+
|    1045     |                                                                                                                                           
+-------------+

попробуйте это: newBase = baseDF.selectExpr("*, extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15 AS time_interval")

pault 30.05.2019 18:29

спасибо Паулт, "selectExpr" сработал как шарм!

Guy 06.06.2019 21:21
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
2
61
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку time_interval — это тип фрейма данных, для этого случая нужно collect и extract the required value out from dataframe.

Попробуйте так:

newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
newBase.show()

(или)

Используя функцию select(expr()):

newBase = baseDF.select("*",expr("string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval"))

Как упоминалось в комментариях вина, используя функцию selectExpr():

newBase = baseDF.selectExpr("*","string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval")

Пример:

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import IntegerType
>>> time_interval = spark.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
>>> baseDF=spark.createDataFrame([1,2,3,4],IntegerType())
>>> newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
>>> newBase.show()
+-----+-------------+
|value|time_interval|
+-----+-------------+
|    1|         1245|
|    2|         1245|
|    3|         1245|
|    4|         1245|
+-----+-------------+

Другие вопросы по теме