Обнуляемость в схемах Spark sql по умолчанию является рекомендательной. Каков наилучший способ строго соблюдать его?

Я работаю над простым проектом ETL, который читает файлы CSV, выполняет некоторые изменения в каждом столбце, а затем записывает результат в формате JSON. Я хотел бы, чтобы последующие процессы читали мои результаты чтобы быть уверенным, что мой вывод соответствует согласованная схема, но моя проблема в том, что даже если я определяю моя схема ввода с nullable=false для всех полей, нули могут прокрасться в и повредить мои выходные файлы, и, похоже, нет (эффективного) способа, которым я могу заставить Spark применять «не нуль» для моих полей ввода.

Это похоже на функцию, как указано ниже в Spark, The Definitive Guide:

when you define a schema where all columns are declared to not have null values , Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug.

Я написал небольшую утилиту проверки, чтобы просмотреть каждую строку фрейма данных и вызвать ошибку, если в любом из столбцов (на любом уровне вложенность, в случае полей или подполей, таких как карта, структура или массив.)

Мне интересно, в частности: Я ПОВТОРНО ИЗОБРЕТАЛ КОЛЕСО С ЭТОЙ ПРОВЕРКОЙ? Существуют ли какие-либо существующие библиотеки или Методы Spark, которые сделают это для меня (в идеале лучше, чем то, что я реализовал)?

Утилита проверки и упрощенная версия моего пайплайна показаны ниже. В представленном виде вызов утилита проверки закомментирована. Если вы запустите без включенной утилиты проверки, вы увидите этот результат в /tmp/output.csv.

cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5

Вторая строка после заголовка должна быть числом, но это пустая строка (именно так spark записывает нулевое значение, я думаю.) Этот вывод был бы проблематичным для нижестоящие компоненты, которые читают вывод моего задания ETL: этим компонентам нужны только целые числа.

Теперь я могу включить проверку, раскомментировав строку

   //checkNulls(inDf)

Когда я это делаю, я получаю исключение, которое сообщает мне о недопустимом нулевом значении и печатает из всей оскорбительной строки, например:

        java.lang.RuntimeException: found null column value in row: [null,4]

Один из возможных альтернативных подходов, приведенный в Spark/Definitive Guide

Spark, The Definitive Guide упоминает возможность сделать это:

<dataframe>.na.drop() 

Но это (насколько мне известно) молча отбрасывает плохие записи, а не помечает плохие. Затем я мог бы сделать «установить вычитание» на входе до и после падения, но это похоже на тяжелый удар по производительности, чтобы узнать, что является нулевым, а что нет. На первый взгляд, я бы предпочитаю мой метод .... Но мне все еще интересно, может ли быть какой-то лучший способ. Полный код приведен ниже. Спасибо !

package org

import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {

  import NullCheckMethods._

  //val input = "2,3\n\"xxx\",4"          // this will be dropped as malformed
  val input = "2,3\n,4"                   // BUT.. this will be let through

  new PrintWriter("/tmp/foo.csv") { write(input); close }

  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
  val spark = sparkSession

  val schema = new StructType(
    Array(
      StructField("one", IntegerType, nullable = false),
      StructField("two", IntegerType, nullable = false)
    )
  )

  val inDf: DataFrame =
    spark.
      read.
      option("header", "false").
      option("mode", "dropMalformed").
      schema(schema).
      csv("/tmp/foo.csv")

  //checkNulls(inDf)

  val plusOneDf = inDf.selectExpr("one+1", "two+1")
  plusOneDf.show()

  plusOneDf.
    write.
    option("header", "true").
    csv("/tmp/output.csv")

}

object NullCheckMethods extends Serializable {

  def checkNull(columnValue: Any): Unit = {
    if (columnValue == null)
      throw new RuntimeException("got null")
    columnValue match {
      case item: Seq[_] =>
        item.foreach(checkNull)
      case item: Map[_, _] =>
        item.values.foreach(checkNull)
      case item: Row =>
        item.toSeq.foreach {
          checkNull
        }
      case default =>
        println(
          s"bad object [ $default ] of type: ${default.getClass.getName}")
    }
  }

  def checkNulls(row: Row): Unit = {
    try {
      row.toSeq.foreach {
        checkNull
      }
    } catch {
      case err: Throwable =>
        throw new RuntimeException(
          s"found null column value in row: ${row}")
    }
  }


  def checkNulls(df: DataFrame): Unit = {
    df.foreach { row => checkNulls(row) }
  }
}
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
842
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете использовать встроенный метод Row любойNull для разделения кадра данных и обработки обоих разделений по-разному:

val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)

Если вы не планируете иметь процесс ручной обработки нулей, использование встроенных методов DataFrame.na проще, поскольку они уже реализуют все обычные способы автоматической обработки нулей (т. е. удаление или заполнение их значениями по умолчанию).

идеально ! намного лучше, чем мой хоки, доморощенный подход. спасибо!

Chris Bedford 15.05.2019 02:39

Другие вопросы по теме