Применение функции ко всем ячейкам в Spark DataFrame

Я пытаюсь преобразовать некоторый код Pandas в Spark для масштабирования. myfunc — это оболочка сложного API, которая принимает строку и возвращает новую строку (это означает, что я не могу использовать векторизованные функции).

def myfunc(ds):
    for attribute, value in ds.items():
        value = api_function(attribute, value)
        ds[attribute] = value
    return ds

df = df.apply(myfunc, axis='columns')

myfunc берет DataSeries, разбивает ее на отдельные ячейки, вызывает API для каждой ячейки и создает новую DataSeries с теми же именами столбцов. Это эффективно изменяет все ячейки в DataFrame.

Я новичок в Spark и хочу перевести эту логику с помощью pyspark. Я преобразовал свои панды DataFrame в Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)

Здесь я теряюсь. Нужен ли мне UDF, pandas_udf? Как выполнить итерацию по всем ячейкам и вернуть новую строку для каждой с помощью myfunc? spark_df.foreach() ничего не возвращает и не имеет функции map().

Я могу изменить myfunc с DataSeries -> DataSeries на string -> string, если это необходимо.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
13 331
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вариант 1. Используйте UDF для одного столбца за раз

Самым простым подходом было бы переписать вашу функцию так, чтобы она принимала строку в качестве аргумента (чтобы это было string -> string) и использовала UDF. Есть хороший пример здесь. Это работает на одном столбце за раз. Итак, если ваш DataFrame имеет разумное количество столбцов, вы можете применить UDF к каждому столбцу по одному:

from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)

Пример

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   4|
|   2|   5|
|   3|   6|
+----+----+

def plus1_udf(x):
    return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)

new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
|          2|          5|
|          3|          6|
|          4|          7|
+-----------+-----------+

Вариант 2: Сопоставьте весь DataFrame сразу

map доступен для Scala DataFrame, но на данный момент недоступен в PySpark. API нижнего уровня СДР имеет функцию map в PySpark. Итак, если у вас слишком много столбцов для преобразования по одному, вы можете работать с каждой отдельной ячейкой в ​​DataFrame следующим образом:

def map_fn(row):
    return [api_function(x) for (column, x) in row.asDict().items()

column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)

Пример

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
   return [value + 1 for (_, value) in row.asDict().items()]

columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
|   2|   5|
|   3|   6|
|   4|   7|
+----+----+

Контекст

документацияforeach дает только пример печати, но мы можем убедиться, глядя на код, что он действительно ничего не возвращает.

Вы можете прочитать о pandas_udf в эта почта, но кажется, что он больше всего подходит для векторизованных функций, которые, как вы указали, вы не можете использовать из-за api_function.

Пример UDF следует обычному пути добавления своего вывода в качестве нового столбца. Как мне применить его ко всем существующим ячейкам?

Steven 02.02.2019 20:28

@Стивен - Спасибо за комментарий. Я немного расширил свой ответ, чтобы более конкретно ответить на ваш вопрос. Надеюсь, поможет!

Jason 09.02.2019 21:22

Спасибо, я все еще борюсь со Спарком на некоторых фронтах, поэтому я могу во всем этом разобраться.

Steven 09.02.2019 21:37

Я ценю ваш ответ. Чего мне не хватало, так это понимания, которое я описываю в своем ответе, в частности, возможности затенения существующих столбцов с помощью withColumn. Вариант 1 двигался в правильном направлении, но затенение столбцов и назначение цепочки преобразований по столбцам обратно исходной переменной на каждой итерации — это то, что дало мне полное решение редактирования, казалось бы, изменяемого DataFrame на месте, как df = df.apply(). делает в пандах. Еще раз спасибо.

Steven 12.02.2019 00:43
Ответ принят как подходящий

Решение:

udf_func = udf(func, StringType())
for col_name in spark_df.columns:
    spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))
return spark_df.toPandas()

Есть 3 ключевых момента, которые помогли мне понять это:

  1. Если вы используете withColumn с именем существующего столбца (col_name), Spark «перезаписывает»/затеняет исходный столбец. По сути, это создает видимость редактирования столбца напрямую, как если бы он был изменяемым.
  2. Создавая цикл для исходных столбцов и повторно используя одну и ту же переменную DataFrame spark_df, я использую тот же принцип для имитации изменяемого DataFrame, создавая цепочку преобразований по столбцам, каждый раз «перезаписывая» столбец (согласно # 1 - см. ниже )
  3. Spark UDFs ожидает, что все параметры будут типами Column, что означает, что он пытается разрешить значения столбца для каждого параметра. Поскольку первый параметр api_function — это буквальное значение, которое будет одинаковым для всех строк в векторе, вы должны использовать функцию lit(). Простая передача col_name в функцию попытается извлечь значения столбца для этого столбца. Насколько я мог судить, передача col_name эквивалентна передаче col(col_name).

Предполагая 3 столбца «a», «b» и «c», развертывание этой концепции будет выглядеть так:

spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')
                   .withColumn('b', udf_func(lit('b'), 'b')
                   .withColumn('c', udf_func(lit('c'), 'c')

Другие вопросы по теме