Spark: агрегация с динамическим фильтром на кадре данных в scala

У меня есть фрейм данных, например

scala> testDf.show()
+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    12|    3210|        1|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    15|    3210|        1|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    17|    5440|        0|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    20|    5440|        0|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    24|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
|    25|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

scala> testDf.printSchema
root
 |-- id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- value: string (nullable = true)
 |-- value_name: string (nullable = true)
 |-- condition: string (nullable = true)

Я хочу удалить несколько строк со столбцом «условие». Но я в беде.

Я попытался с приведенным ниже тестовым кодом. Но, похоже, это не работает должным образом.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable

val encoder = RowEncoder(testDf.schema);

testDf.flatMap(row => {
  val result = new mutable.MutableList[Row];
  val setting_value = row.getAs[String]("setting_value").toInt
  val condition = row.getAs[String]("condition").toBoolean
  if (condition){
      result+=row;
  };
  result;
})(encoder).show();

И это ошибка.

19/05/30 02:04:31 ERROR TaskSetManager: Task 0 in stage 267.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 3763, .compute.internal, executor 1): java.lang.IllegalArgumentException: For input string: "setting_value==0"
        at scala.collection.immutable.StringLike$class.parseBoolean(StringLike.scala:291)
        at scala.collection.immutable.StringLike$class.toBoolean(StringLike.scala:261)
        at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:29)
        at $anonfun$1.apply(<console>:40)
        at $anonfun$1.apply(<console>:37)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Я хочу сохранить строки, соответствующие значению столбца условия. Это желаемый результат.

+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

Пожалуйста, помогите мне, если у вас есть хорошая идея. Спасибо.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
529
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

В приведенном выше случае Spark пытается преобразовать значение String в логическое значение. Это не оценка самого выражения.
И оценка выражения должна выполняться пользователем с использованием внешней библиотеки или пользовательского кода.
Самый близкий (хотя и не точный сценарий), который я мог придумать, это
Как оценить математическое выражение, заданное в строковой форме? .

Я пробовал использовать функцию eval в scala.tools.reflect.ToolBox. Но не работает...

sproutee 30.05.2019 09:26
Ответ принят как подходящий

Вот один из способов использования scala отражение API с функцией UDF. udf обрабатывает оба случая для значений int и string:

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

val df = Seq(("0","value==0"),
("1", "value==0"),
("6", """value>0 && value<10"""),
("7", """value>0 && value<10"""),
("0", """value>0 && value<10"""),
("A", """Set("A","B").contains(value.toString)"""),
("C", """Set("A","B").contains(value.toString)""")).toDF("value", "condition")

def isAllDigits(x: String) = x.forall(Character.isDigit)

val evalExpressionUDF = udf((value: String, expr: String) => {
  val result =  isAllDigits(value) match {
    case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}""")))
    case false => tb.eval(tb.parse(expr.replace("value", s""""${value}"""")))
  }

  result.asInstanceOf[Boolean]
})

df.withColumn("eval", evalExpressionUDF($"value", $"condition"))
  .where($"eval" === true)
  .show(false)

Чехлы для evalExpressionUDF:

  • int: замените выражение фактическим значением int, затем выполните строковый код с помощью mkToolBox
  • строка: заключите строковое значение в "", затем замените выражение строкой в ​​двойных кавычках и выполните строковый код

Выход:

+-----+-------------------------------------+----+ 
|value|                           condition |eval| 
+-----+-------------------------------------+----+ 
|0    |value==0                             |true| 
|6    |value>0 && value<10                  |true| 
|7    |value>0 && value<10                  |true| 
|A    |Set("A","B").contains(value.toString)|true| 
+-----+-------------------------------------+----+

PS: я знаю, что производительность вышеупомянутого решения может быть плохой, поскольку она вызывает отражение, хотя я не знаю альтернативы.

Большое спасибо. Ваш ответ очень полезен. Но я получаю ошибку ниже. scala> testDf.withColumn("eval", evalExpressionUDF($"value", $"condition")).show() Ошибка: ` org.apache.spark.SparkException: задача не сериализуема Причина: java.io.NotSerializableException: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl Стек сериализации: - объект не сериализуем (класс: scala.tools.reflect.ToolBoxFactory $ToolBoxImpl, значение: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl@7341617c) `

sproutee 31.05.2019 01:36

Привет, какая у тебя версия Spark? и какая среда spark-shell,databricks?

abiratsis 31.05.2019 01:46

В среде искрового кластера с несколькими узлами кажется, что возникает ошибка. Есть ли способ обойти это?

sproutee 31.05.2019 01:47

Я использую искровую оболочку. и моя версия искры 2.4.0. Версия Scala 2.11.12.

sproutee 31.05.2019 01:48

Я только что запустил его в локальном режиме, и я получаю ту же ошибку.

sproutee 31.05.2019 02:06

Мой testDF обрабатывается несколько раз из исходных данных. Если я объявлю образец данных с помощью val df = Seq и выполню, ошибки не произойдет.

sproutee 31.05.2019 02:10

можешь попробовать добавить этот импорт import scala.reflect.runtime.universe._?

abiratsis 31.05.2019 02:26

поэтому, если вы запустите пример, как я сделал выше, он работает, но в вашем случае вы правильно выполняете больше функций с этим фреймом данных?

abiratsis 31.05.2019 02:38

да. Я создаю testDf, объединяя некоторые исходные данные. Нет никакой другой функции, кроме запроса соединения с использованием «spark.sql».

sproutee 31.05.2019 02:44

Боюсь, я не могу помочь, не имея возможности воспроизвести его, хотя это, безусловно, связано с val tb = currentMirror.mkToolBox() и тем фактом, что Spark не может сериализовать экземпляр ToolBoxImpl. Пожалуйста, попробуйте еще одно изменение, чтобы заменить предыдущую строку на import scala.reflect.runtime.universe val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox()

abiratsis 31.05.2019 03:09

Я использовал Universe.runtimeMirror и получаю ту же ошибку. Я нашел другую статью, нужно ли переопределять udf Function1? stackoverflow.com/questions/38848847/…

sproutee 31.05.2019 03:16

да, это кажется разумным. Одним из решений может быть перемещение загрузчика классов внутри udf: val evalExpressionUDF = udf((value: String, expr: String) => { val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox() val result = isAllDigits(value) match { case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}"""))) case false => tb.eval(tb.parse(expr.replace("value", s""""${value}""""))) } result.asInstanceOf[Boolean] })

abiratsis 31.05.2019 03:28

После перемещения загрузчика классов я получаю ту же ошибку. Но стек ошибок немного отличается. org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo‌​sureCleaner.scala:40‌​3) ... 51 elided Caused by: java.io.NotSerializableException: scala.reflect.runtime.JavaMirrors$JavaMirror Serialization stack: - object not serializable (class: scala.reflect.runtime.JavaMirrors$JavaMirror, value: JavaMirror with scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@476‌​c137b of type class scala.tools.nsc.interpreter

sproutee 31.05.2019 03:41

Хорошо, мы открыли врата ада вместе со Spark и Reflection! Как вы указали, попробуйте переопределить UDF, как показано в ссылке, которую вы отправили выше stackoverflow.com/questions/38848847/…. Я буду в отпуске и больше не смогу помочь, если проблема не исчезнет, ​​пожалуйста, задайте другой вопрос. Удачи

abiratsis 31.05.2019 10:23

Здравствуйте, я попытался переопределить UDF. Но я пока не могу найти идеального решения. Т.Т.

sproutee 08.06.2019 10:08

Если бы вы могли отправить эту часть кода, это было бы очень полезно для синхронизации.

abiratsis 08.06.2019 10:25

Другие вопросы по теме