Моя функция get_data
возвращает кортеж: два целых значения.
get_data_udf = udf(lambda id: get_data(spark, id), (IntegerType(), IntegerType()))
Мне нужно разделить их на две колонки val1
и val2
. Как я могу это сделать?
dfnew = df \
.withColumn("val", get_data_udf(col("id")))
Следует ли мне сохранить кортеж в столбце, например val
, а потом как-то разбить на две колонки. Или есть более короткий путь?
в scala вы можете использовать .withColumn("val1", col("val._1")).withColumn("val2", col("val._2"))
, не уверен, работает ли это в pyspark
Кортежи могут быть проиндексированы так же, как списки, поэтому вы можете добавить значение для первого столбца как get_data()[0]
, а для второго значения во втором столбце вы сделаете get_data()[1]
также вы можете выполнить v1, v2 = get_data()
и таким образом присвоить возвращаемые значения кортежа переменным v1
и v2
.
Взгляните на вопрос это здесь для дальнейших разъяснений.
Если я сделаю withColumn("val1", get_data_udf(col("id"))[0]).withColumn("val2", get_data_udf(col("id"))[1])
, то дважды позвоню в get_data_udf
. Разве это не так?
Кроме того, как я могу запустить v1, v2 = get_data()
, если я запускаю эту функцию по строкам в DataFrame?
поместите первый в цикл и добавляйте v1 и v2 к вашему df строка за строкой - вот как это может работать!
Не могли бы вы добавить какой-нибудь пример? Но в распределенном программировании циклы обычно не используются. Может я неправильно понял твою идею. Поэтому пример будет вам полезен.
@Markus: Если вы не хотите запускать udf дважды, вам нужно временно сохранить результат в отдельном столбце.
Например, у вас есть образец фрейма данных из одного столбца, как показано ниже.
val df = sc.parallelize(Seq(3)).toDF()
df.show()
// Ниже UDF, который вернет кортеж
def tupleFunction(): (Int,Int) = (1,2)
// мы создадим два новых столбца из вышеуказанного UDF
df.withColumn("newCol",typedLit(tupleFunction.toString.replace("(","").replace(")","")
.split(","))).select((0 to 1)
.map(i => col("newCol").getItem(i).alias(s"newColFromTuple$i")):_*).show
Это Scala, не так ли? Мне понадобится Python.
это правильно. Отличается только функция тюля. Кроме того, фактический код - это spark api. Он должен работать
Вы можете создать structFields в udf, чтобы получить доступ позже.
from pyspark.sql.types import *
get_data_udf = udf(lambda id: get_data(spark, id),
StructType([StructField('first', IntegerType()), StructField('second', IntegerType())]))
dfnew = df \
.withColumn("val", get_data_udf(col("id"))) \
.select('*', 'val.`first`'.alias('first'), 'val.`second`'.alias('second'))
В чем смысл .select('*'
?
это означает все столбцы.
Ах хорошо. Надо drop("val")
делать, да?
Не уверен в этом.
см. также stackoverflow.com/a/40962714/1138523