Я пытаюсь преобразовать некоторый код Pandas в Spark для масштабирования. myfunc — это оболочка сложного API, которая принимает строку и возвращает новую строку (это означает, что я не могу использовать векторизованные функции).
def myfunc(ds):
for attribute, value in ds.items():
value = api_function(attribute, value)
ds[attribute] = value
return ds
df = df.apply(myfunc, axis='columns')
myfunc берет DataSeries, разбивает ее на отдельные ячейки, вызывает API для каждой ячейки и создает новую DataSeries с теми же именами столбцов. Это эффективно изменяет все ячейки в DataFrame.
Я новичок в Spark и хочу перевести эту логику с помощью pyspark. Я преобразовал свои панды DataFrame в Spark:
spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)
Здесь я теряюсь. Нужен ли мне UDF, pandas_udf? Как выполнить итерацию по всем ячейкам и вернуть новую строку для каждой с помощью myfunc? spark_df.foreach() ничего не возвращает и не имеет функции map().
Я могу изменить myfunc с DataSeries -> DataSeries на string -> string, если это необходимо.






Самым простым подходом было бы переписать вашу функцию так, чтобы она принимала строку в качестве аргумента (чтобы это было string -> string) и использовала UDF. Есть хороший пример здесь. Это работает на одном столбце за раз. Итак, если ваш DataFrame имеет разумное количество столбцов, вы можете применить UDF к каждому столбцу по одному:
from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
| 1| 4|
| 2| 5|
| 3| 6|
+----+----+
def plus1_udf(x):
return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)
new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
| 2| 5|
| 3| 6|
| 4| 7|
+-----------+-----------+
map доступен для Scala DataFrame, но на данный момент недоступен в PySpark.
API нижнего уровня СДР имеет функцию map в PySpark. Итак, если у вас слишком много столбцов для преобразования по одному, вы можете работать с каждой отдельной ячейкой в DataFrame следующим образом:
def map_fn(row):
return [api_function(x) for (column, x) in row.asDict().items()
column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)
df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
return [value + 1 for (_, value) in row.asDict().items()]
columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
| 2| 5|
| 3| 6|
| 4| 7|
+----+----+
документацияforeach дает только пример печати, но мы можем убедиться, глядя на код, что он действительно ничего не возвращает.
Вы можете прочитать о pandas_udf в эта почта, но кажется, что он больше всего подходит для векторизованных функций, которые, как вы указали, вы не можете использовать из-за api_function.
@Стивен - Спасибо за комментарий. Я немного расширил свой ответ, чтобы более конкретно ответить на ваш вопрос. Надеюсь, поможет!
Спасибо, я все еще борюсь со Спарком на некоторых фронтах, поэтому я могу во всем этом разобраться.
Я ценю ваш ответ. Чего мне не хватало, так это понимания, которое я описываю в своем ответе, в частности, возможности затенения существующих столбцов с помощью withColumn. Вариант 1 двигался в правильном направлении, но затенение столбцов и назначение цепочки преобразований по столбцам обратно исходной переменной на каждой итерации — это то, что дало мне полное решение редактирования, казалось бы, изменяемого DataFrame на месте, как df = df.apply(). делает в пандах. Еще раз спасибо.
Решение:
udf_func = udf(func, StringType())
for col_name in spark_df.columns:
spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))
return spark_df.toPandas()
Есть 3 ключевых момента, которые помогли мне понять это:
withColumn с именем существующего столбца (col_name), Spark «перезаписывает»/затеняет исходный столбец. По сути, это создает видимость редактирования столбца напрямую, как если бы он был изменяемым.spark_df, я использую тот же принцип для имитации изменяемого DataFrame, создавая цепочку преобразований по столбцам, каждый раз «перезаписывая» столбец (согласно # 1 - см. ниже )UDFs ожидает, что все параметры будут типами Column, что означает, что он пытается разрешить значения столбца для каждого параметра. Поскольку первый параметр api_function — это буквальное значение, которое будет одинаковым для всех строк в векторе, вы должны использовать функцию lit(). Простая передача col_name в функцию попытается извлечь значения столбца для этого столбца. Насколько я мог судить, передача col_name эквивалентна передаче col(col_name).Предполагая 3 столбца «a», «b» и «c», развертывание этой концепции будет выглядеть так:
spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')
.withColumn('b', udf_func(lit('b'), 'b')
.withColumn('c', udf_func(lit('c'), 'c')
Пример UDF следует обычному пути добавления своего вывода в качестве нового столбца. Как мне применить его ко всем существующим ячейкам?