У меня есть фрейм данных, например
scala> testDf.show()
+------+--------+---------+------------+----------------------------------------+
| id| item| value| value_name| condition|
+------+--------+---------+------------+----------------------------------------+
| 11| 3210| 0| OFF| value==0|
| 12| 3210| 1| OFF| value==0|
| 13| 3210| 0| OFF| value==0|
| 14| 3210| 0| OFF| value==0|
| 15| 3210| 1| OFF| value==0|
| 16| 5440| 5| ON| value>0 && value<10|
| 17| 5440| 0| ON| value>0 && value<10|
| 18| 5440| 6| ON| value>0 && value<10|
| 19| 5440| 7| ON| value>0 && value<10|
| 20| 5440| 0| ON| value>0 && value<10|
| 21| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 22| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 23| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 24| 7780| C| TYPE| Set("A","B").contains(value.toString)|
| 25| 7780| C| TYPE| Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+
scala> testDf.printSchema
root
|-- id: string (nullable = true)
|-- item: string (nullable = true)
|-- value: string (nullable = true)
|-- value_name: string (nullable = true)
|-- condition: string (nullable = true)
Я хочу удалить несколько строк со столбцом «условие». Но я в беде.
Я попытался с приведенным ниже тестовым кодом. Но, похоже, это не работает должным образом.
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable
val encoder = RowEncoder(testDf.schema);
testDf.flatMap(row => {
val result = new mutable.MutableList[Row];
val setting_value = row.getAs[String]("setting_value").toInt
val condition = row.getAs[String]("condition").toBoolean
if (condition){
result+=row;
};
result;
})(encoder).show();
И это ошибка.
19/05/30 02:04:31 ERROR TaskSetManager: Task 0 in stage 267.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 3763, .compute.internal, executor 1): java.lang.IllegalArgumentException: For input string: "setting_value==0"
at scala.collection.immutable.StringLike$class.parseBoolean(StringLike.scala:291)
at scala.collection.immutable.StringLike$class.toBoolean(StringLike.scala:261)
at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:29)
at $anonfun$1.apply(<console>:40)
at $anonfun$1.apply(<console>:37)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я хочу сохранить строки, соответствующие значению столбца условия. Это желаемый результат.
+------+--------+---------+------------+----------------------------------------+
| id| item| value| value_name| condition|
+------+--------+---------+------------+----------------------------------------+
| 11| 3210| 0| OFF| value==0|
| 13| 3210| 0| OFF| value==0|
| 14| 3210| 0| OFF| value==0|
| 16| 5440| 5| ON| value>0 && value<10|
| 18| 5440| 6| ON| value>0 && value<10|
| 19| 5440| 7| ON| value>0 && value<10|
| 21| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 22| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 23| 7780| A| TYPE| Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+
Пожалуйста, помогите мне, если у вас есть хорошая идея. Спасибо.
В приведенном выше случае Spark пытается преобразовать значение String в логическое значение. Это не оценка самого выражения.
И оценка выражения должна выполняться пользователем с использованием внешней библиотеки или пользовательского кода.
Самый близкий (хотя и не точный сценарий), который я мог придумать, это
Как оценить математическое выражение, заданное в строковой форме? .
Вот один из способов использования scala отражение API с функцией UDF. udf обрабатывает оба случая для значений int и string:
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val tb = currentMirror.mkToolBox()
val df = Seq(("0","value==0"),
("1", "value==0"),
("6", """value>0 && value<10"""),
("7", """value>0 && value<10"""),
("0", """value>0 && value<10"""),
("A", """Set("A","B").contains(value.toString)"""),
("C", """Set("A","B").contains(value.toString)""")).toDF("value", "condition")
def isAllDigits(x: String) = x.forall(Character.isDigit)
val evalExpressionUDF = udf((value: String, expr: String) => {
val result = isAllDigits(value) match {
case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}""")))
case false => tb.eval(tb.parse(expr.replace("value", s""""${value}"""")))
}
result.asInstanceOf[Boolean]
})
df.withColumn("eval", evalExpressionUDF($"value", $"condition"))
.where($"eval" === true)
.show(false)
Чехлы для evalExpressionUDF
:
mkToolBox
""
, затем замените выражение строкой в двойных кавычках и выполните строковый кодВыход:
+-----+-------------------------------------+----+
|value| condition |eval|
+-----+-------------------------------------+----+
|0 |value==0 |true|
|6 |value>0 && value<10 |true|
|7 |value>0 && value<10 |true|
|A |Set("A","B").contains(value.toString)|true|
+-----+-------------------------------------+----+
PS: я знаю, что производительность вышеупомянутого решения может быть плохой, поскольку она вызывает отражение, хотя я не знаю альтернативы.
Большое спасибо. Ваш ответ очень полезен. Но я получаю ошибку ниже. scala> testDf.withColumn("eval", evalExpressionUDF($"value", $"condition")).show()
Ошибка: ` org.apache.spark.SparkException: задача не сериализуема Причина: java.io.NotSerializableException: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl Стек сериализации: - объект не сериализуем (класс: scala.tools.reflect.ToolBoxFactory $ToolBoxImpl, значение: scala.tools.reflect.ToolBoxFactory$ToolBoxImpl@7341617c) `
Привет, какая у тебя версия Spark? и какая среда spark-shell
,databricks
?
В среде искрового кластера с несколькими узлами кажется, что возникает ошибка. Есть ли способ обойти это?
Я использую искровую оболочку. и моя версия искры 2.4.0. Версия Scala 2.11.12.
Я только что запустил его в локальном режиме, и я получаю ту же ошибку.
Мой testDF обрабатывается несколько раз из исходных данных. Если я объявлю образец данных с помощью val df = Seq
и выполню, ошибки не произойдет.
можешь попробовать добавить этот импорт import scala.reflect.runtime.universe._
?
поэтому, если вы запустите пример, как я сделал выше, он работает, но в вашем случае вы правильно выполняете больше функций с этим фреймом данных?
да. Я создаю testDf, объединяя некоторые исходные данные. Нет никакой другой функции, кроме запроса соединения с использованием «spark.sql».
Боюсь, я не могу помочь, не имея возможности воспроизвести его, хотя это, безусловно, связано с val tb = currentMirror.mkToolBox()
и тем фактом, что Spark не может сериализовать экземпляр ToolBoxImpl. Пожалуйста, попробуйте еще одно изменение, чтобы заменить предыдущую строку на import scala.reflect.runtime.universe val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox()
Я использовал Universe.runtimeMirror и получаю ту же ошибку. Я нашел другую статью, нужно ли переопределять udf Function1? stackoverflow.com/questions/38848847/…
да, это кажется разумным. Одним из решений может быть перемещение загрузчика классов внутри udf: val evalExpressionUDF = udf((value: String, expr: String) => { val cm = universe.runtimeMirror(getClass.getClassLoader) val tb = cm.mkToolBox() val result = isAllDigits(value) match { case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}"""))) case false => tb.eval(tb.parse(expr.replace("value", s""""${value}""""))) } result.asInstanceOf[Boolean] })
После перемещения загрузчика классов я получаю ту же ошибку. Но стек ошибок немного отличается. org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) ... 51 elided Caused by: java.io.NotSerializableException: scala.reflect.runtime.JavaMirrors$JavaMirror Serialization stack: - object not serializable (class: scala.reflect.runtime.JavaMirrors$JavaMirror, value: JavaMirror with scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@476c137b of type class scala.tools.nsc.interpreter
Хорошо, мы открыли врата ада вместе со Spark и Reflection! Как вы указали, попробуйте переопределить UDF, как показано в ссылке, которую вы отправили выше stackoverflow.com/questions/38848847/…. Я буду в отпуске и больше не смогу помочь, если проблема не исчезнет, пожалуйста, задайте другой вопрос. Удачи
Здравствуйте, я попытался переопределить UDF. Но я пока не могу найти идеального решения. Т.Т.
Если бы вы могли отправить эту часть кода, это было бы очень полезно для синхронизации.
Я пробовал использовать функцию eval в scala.tools.reflect.ToolBox. Но не работает...