Я работаю над простым проектом ETL, который читает файлы CSV, выполняет некоторые изменения в каждом столбце, а затем записывает результат в формате JSON. Я хотел бы, чтобы последующие процессы читали мои результаты чтобы быть уверенным, что мой вывод соответствует согласованная схема, но моя проблема в том, что даже если я определяю моя схема ввода с nullable=false для всех полей, нули могут прокрасться в и повредить мои выходные файлы, и, похоже, нет (эффективного) способа, которым я могу заставить Spark применять «не нуль» для моих полей ввода.
Это похоже на функцию, как указано ниже в Spark, The Definitive Guide:
when you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug.
Я написал небольшую утилиту проверки, чтобы просмотреть каждую строку фрейма данных и вызвать ошибку, если в любом из столбцов (на любом уровне вложенность, в случае полей или подполей, таких как карта, структура или массив.)
Мне интересно, в частности: Я ПОВТОРНО ИЗОБРЕТАЛ КОЛЕСО С ЭТОЙ ПРОВЕРКОЙ? Существуют ли какие-либо существующие библиотеки или Методы Spark, которые сделают это для меня (в идеале лучше, чем то, что я реализовал)?
Утилита проверки и упрощенная версия моего пайплайна показаны ниже. В представленном виде вызов утилита проверки закомментирована. Если вы запустите без включенной утилиты проверки, вы увидите этот результат в /tmp/output.csv.
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
Вторая строка после заголовка должна быть числом, но это пустая строка (именно так spark записывает нулевое значение, я думаю.) Этот вывод был бы проблематичным для нижестоящие компоненты, которые читают вывод моего задания ETL: этим компонентам нужны только целые числа.
Теперь я могу включить проверку, раскомментировав строку
//checkNulls(inDf)
Когда я это делаю, я получаю исключение, которое сообщает мне о недопустимом нулевом значении и печатает из всей оскорбительной строки, например:
java.lang.RuntimeException: found null column value in row: [null,4]
Один из возможных альтернативных подходов, приведенный в Spark/Definitive Guide
Spark, The Definitive Guide упоминает возможность сделать это:
<dataframe>.na.drop()
Но это (насколько мне известно) молча отбрасывает плохие записи, а не помечает плохие. Затем я мог бы сделать «установить вычитание» на входе до и после падения, но это похоже на тяжелый удар по производительности, чтобы узнать, что является нулевым, а что нет. На первый взгляд, я бы предпочитаю мой метод .... Но мне все еще интересно, может ли быть какой-то лучший способ. Полный код приведен ниже. Спасибо !
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}





Вы можете использовать встроенный метод Row любойNull для разделения кадра данных и обработки обоих разделений по-разному:
val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)
Если вы не планируете иметь процесс ручной обработки нулей, использование встроенных методов DataFrame.na проще, поскольку они уже реализуют все обычные способы автоматической обработки нулей (т. е. удаление или заполнение их значениями по умолчанию).
идеально ! намного лучше, чем мой хоки, доморощенный подход. спасибо!