В pyspark я пытаюсь заменить несколько текстовых значений в столбце значением, присутствующим в столбцах, имена которых присутствуют в столбце calc (формула).
Итак, чтобы было понятно, вот пример:
Вход:
|param_1|param_2|calc
|-------|-------|--------
|Cell 1 |Cell 2 |param_1-param_2
|Cell 3 |Cell 4 |param_2/param_1
Необходим вывод:
|param_1|param_2|calc
|-------|-------|--------
|Cell 1 |Cell 2 |Cell 1-Cell 2
|Cell 3 |Cell 4 |Cell 4/Cell 3
В столбце calc значением по умолчанию является формула. Это может быть что-то такое же простое, как приведенное выше, или что-то вроде «2*(param_8-param_4)/param_2-(param_3/param_7)». Я ищу что-то, чтобы заменить все param_x значениями в связанных столбцах, касающихся имен.
Я пробовал много вещей, но ничего не работает, и большую часть времени, когда я использую replace или regex_replace со столбцом для значения замены, возникает ошибка, что столбец не повторяется.
Кроме того, столбцы param_1, param_2, ..., param_x генерируются динамически, и значения столбца calc могут быть в некоторых из этих столбцов, но не обязательно во всех.
Не могли бы вы помочь мне по этому вопросу с динамическим решением?
Большое спасибо. С наилучшими пожеланиями
Какая связь между моей необходимостью замены переменных в формуле и функциями concat ? И я только что прочитал всю документацию, которую вы предоставили, на случай, если будет указано новое назначение, но нет, они просто объединяют данные.
Так где же формула? Это было неясно, из того, что вы предоставили, мне кажется, что calc - это объединение param1 и 2
В столбце calc значением по умолчанию является формула. Это может быть что-то такое же простое, как приведенное выше, или что-то вроде «2*(param_8-param_4)/param_2-(param_3/param_7)». Я ищу что-то, чтобы заменить все param_x значениями в связанных столбцах, касающихся имен.
Обновление: оказалось, я неправильно понял требование. Это будет работать:
for exp in ["regexp_replace(calc, '"+col+"', "+col+")" for col in df.schema.names]:
df=df.withColumn("calc", F.expr(exp))
Еще одно обновление: для обработки нулевых значений добавьте объединение:
for exp in ["coalesce(regexp_replace(calc, '"+col+"', "+col+"), calc)" for col in df.schema.names]:
df=df.withColumn("calc", F.expr(exp))
Ввод, вывод:
------- Некоторое время сохраняем раздел ниже только для справки -------
Вы не можете сделать это напрямую, так как вы не сможете напрямую использовать значение столбца, если вы не соберете объект python (что, очевидно, не рекомендуется).
Это будет работать с тем же:
df = spark.createDataFrame([["1","2", "param_1 - param_2"],["3","4", "2*param_1 + param_2"]]).toDF("param_1", "param_2", "calc");
df.show()
df=df.withColumn("row_num", F.row_number().over(Window.orderBy(F.lit("dummy"))))
as_dict = {row.asDict()["row_num"]:row.asDict()["calc"] for row in df.select("row_num", "calc").collect()}
expression = f"""CASE {' '.join([f"WHEN row_num ='{k}' THEN ({v})" for k,v in as_dict.items()])} \
ELSE NULL END""";
df.withColumn("Result", F.expr(expression)).show();
Ввод, вывод:
Привет спасибо. Это отлично подходит для следующего шага другого процесса. Он отлично работает с числами, но фактическим шагом здесь является замена метки другой меткой, мой пример вывода действительно то, что мне нужно сейчас. Если у вас есть хитрость, чтобы сделать это :-)
@Cazau А, хорошо, понял. Только что понял, позвольте мне проверить, я обновлю свой ответ, как только закончу.
@Cazau Соответствующим образом обновил ответ, дайте мне знать, если вам нужна помощь.
Большое спасибо @Ronak Jain, оба решения будут использованы для нашего проекта. Это почти нормально, просто небольшое неудобство, если, например, столбец calc = param_1 и столбец param_2 равен нулю, тогда столбец calc также равен нулю.
@Cazau Это можно исправить с помощью простого колы. дайте мне знать, если вам нужна помощь с тем же. Кроме того, рассмотрите возможность голосования и примите ответ, если он поможет.
@Cazau Исправлена проблема с Null для вас, обновленный ответ :)
Отлично, большая помощь, спасибо @Ronak Jain!
Как насчет того, чтобы взять учебник по
concat
,concat_ws
. Попробуйте использовать их для решения этой проблемы, и если у вас возникнут проблемы, сообщите нам, что это за ошибка? документацию по pyspark можно найти здесь spark.apache.org/docs/latest/api/python/reference/index.html