Я создаю столбец интервал времени и добавляю его в существующий столбец Фрейм данных в Pyspark. В идеале time_interval должен быть в формате "ЧЧмм" с округлением минут до ближайшей 15-минутной отметки (815, 830, 845, 900 и т. д.).
У меня есть искровой sql-код, который выполняет логику за меня, но как мне взять это значение, объединенное в виде строкового столбца, и вставить его в существующий фрейм данных?
time_interval = sqlContext.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
time_interval.show()
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(CAST(hour(current_timestamp()) AS STRING), CAST((FLOOR((CAST(minute(current_timestamp()) AS DOUBLE) / CAST(15 AS DOUBLE))) * CAST(15 AS BIGINT)) AS STRING))|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1045|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
baseDF = sqlContext.sql("select * from test_table")
newBase = baseDF.withColumn("time_interval", lit(str(time_interval)))
newBase.select("time_interval").show()
+--------------------+
| time_interval|
+--------------------+
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
+--------------------+
only showing top 20 rows
Таким образом, фактические ожидаемые результаты должны просто отображать фактическое строковое значение в новом столбце, который я создаю, а не это конкатенированное значение из фрейма данных. Что-то вроде ниже:
newBase.select("time_interval").show(1)
+-------------+
|time_interval|
+-------------+
| 1045 |
+-------------+
спасибо Паулт, "selectExpr" сработал как шарм!
Поскольку time_interval
— это тип фрейма данных, для этого случая нужно collect
и extract the required value out from dataframe
.
Попробуйте так:
newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
newBase.show()
(или)
Используя функцию select(expr())
:
newBase = baseDF.select("*",expr("string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval"))
Как упоминалось в комментариях вина, используя функцию selectExpr()
:
newBase = baseDF.selectExpr("*","string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval")
Пример:
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import IntegerType
>>> time_interval = spark.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
>>> baseDF=spark.createDataFrame([1,2,3,4],IntegerType())
>>> newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
>>> newBase.show()
+-----+-------------+
|value|time_interval|
+-----+-------------+
| 1| 1245|
| 2| 1245|
| 3| 1245|
| 4| 1245|
+-----+-------------+
попробуйте это:
newBase = baseDF.selectExpr("*, extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15 AS time_interval")