Чтение json-файла с Corrupt_Record в Spark Java

Я работаю с искровым java-приложением с искровой версией 2.7. Я пытаюсь загрузить многострочный файл JSON, который мог содержать поврежденные записи в соответствии с моей схемой. Я передаю схему при ее загрузке, но проблема в том, что она отклоняет весь файл как одну поврежденную запись, даже если есть один объект JSON, который не удовлетворяет схеме, которую я предоставляю.

Мой файл Json выглядит примерно так:

[
{Json_object},
{Json_object},
{Json_object}
]

Я вручную создал для него схему (StructType) и загрузил ее так:

Dataset<Row> df = spark.read().option("multiline", "true").option("mode","PERMISSIVE").option("columnNameOfCorruptRecord","_corrupt_record").schema(schema).json("filepath");

Проблема в том, что даже если один объект JSON не соответствует схеме, например, если attribute1 в моей схеме имеет целочисленный тип и находится в форме строки для одного из объектов json, тогда объект json должен попасть внутрь поврежденной_записи, т.е. я получаю что-то вроде-

+------------+---------------+---------------+
| attribute1 |   attribute2  |_corrupt_record|
+------------+---------------+---------------+
|    null    |     null      |             [{|
|            |               | all_json_obj  |
|            |               |          ...  |
|            |               |         }]    |
+------------+---------------+---------------+

И он отлично работает с типичными однострочными json-объектами, где символ новой строки '\n' используется в качестве разделителя, никаких проблем с этим не возникает и идеальные результаты. Может кто-нибудь сказать мне, что мне здесь не хватает?

PS: вопрос не ограничивается искрой java, поведение одинаково и в scala, и в python.

Как настроить Tailwind CSS с React.js и Next.js?
Как настроить Tailwind CSS с React.js и Next.js?
Tailwind CSS - единственный фреймворк, который, как я убедился, масштабируется в больших командах. Он легко настраивается, адаптируется к любому...
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Переключение светлых/темных тем
Переключение светлых/темных тем
В Microsoft Training - Guided Project - Build a simple website with web pages, CSS files and JavaScript files, мы объясняем, как CSS можно...
Отношения &quot;многие ко многим&quot; в Laravel с методами присоединения и отсоединения
Отношения &quot;многие ко многим&quot; в Laravel с методами присоединения и отсоединения
Отношения "многие ко многим" в Laravel могут быть немного сложными, но с помощью Eloquent ORM и его моделей мы можем сделать это с легкостью. В этой...
В PHP
В PHP
В большой кодовой базе с множеством различных компонентов классы, функции и константы могут иметь одинаковые имена. Это может привести к путанице и...
Карта дорог Беладжар PHP Laravel
Карта дорог Беладжар PHP Laravel
Laravel - это PHP-фреймворк, разработанный для облегчения разработки веб-приложений. Laravel предоставляет различные функции, упрощающие разработку...
3
0
82
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Глядя на ваш вывод, который я собираюсь воспроизвести здесь:

+------------+---------------+---------------+
| attribute1 |   attribute2  |_corrupt_record|
+------------+---------------+---------------+
|    null    |     null      |             [{|
|            |               | all_json_obj  |
|            |               |          ...  |
|            |               |         }]    |
+------------+---------------+---------------+

Если вы посмотрите на первую и последнюю строку, вы увидите, что Corrupt_Records — это [{ и }]. Это заставляет меня думать, что, возможно, этих символов { и } там быть не должно. Возможно ли, что ваш файл json на самом деле выглядит примерно так:

[{
{Json_object},
{Json_object},
{Json_object}
}]

Если это так, то эти {} фигурные скобки прямо между [] квадратными скобками самого высокого уровня создадут впечатление, что массив самого высокого уровня содержит только 1 объект с неправильной схемой. Если это так, не могли бы вы попытаться удалить эти фигурные скобки прямо между квадратными скобками вашего массива?

Просто чтобы дать вам работающий пример, рассмотрим следующий файл json:

[
    {
        "id": 1,
        "object": {
            "val1": "thisValue",
            "val2": "otherValue"
        }
    },
    {
        "id": 2,
        "object": {
            "val1": "hehe",
            "val2": "test"
        }
    },
    {
        "id": 3,
        "object": {
            "val1": "yes",
            "val2": "no"
        }
    }
]

Чтение этого json-файла в искровой оболочке (искровая версия 2.4.5) с помощью следующей команды:

Val df = spark.read.option("multiline", "true").json("test.json")

Дает мне следующий вывод:

scala> df.show(false)
+---+-----------------------+
|id |object                 |
+---+-----------------------+
|1  |[thisValue, otherValue]|
|2  |[hehe, test]           |
|3  |[yes, no]              |
+---+-----------------------+


scala> df.printSchema
root
 |-- id: long (nullable = true)
 |-- object: struct (nullable = true)
 |    |-- val1: string (nullable = true)
 |    |-- val2: string (nullable = true)

Это всего лишь крошечный пример, чтобы дать вам что-то работающее.

Но взгляните на эти строки [{ и }] в вашем поврежденном фрейме данных!

Надеюсь, это поможет :)

Спасибо за помощь, но нет. у меня нет дополнительных фигурных скобок на корневом уровне внутри массива, которые сделали бы его единым объектом. У меня есть аналогичный файл json, как вы показали в рабочем примере.

Prateek Gautam 21.11.2022 05:57

О, правда, это интересно! Я кое-что узнал из ответа M_S :) Может быть, тогда вы можете попытаться избежать многострочного вывода? Вы можете преобразовать свой json (используя jq, например: Programminghistorian.org/en/lessons/json-and-jq), используя «Компактный вывод» и прочитать в своем json без многострочного варианта?

Koedlt 21.11.2022 09:23
Ответ принят как подходящий

Боюсь, что это не сработает, по крайней мере, с текущей версией Spark.

Я не коммиттер Spark, но я провел расследование и вот что нашел. Я не уверен, что это правда на 100%, но, возможно, это будет полезно для вас (по крайней мере, в качестве отправной точки для дальнейшего изучения).

Я копался в коде Spark и обнаружил, что существует большая разница между многострочным и стандартным файлом:

  • С многострочным значением false Spark использует TextInputJsonDataSource для чтения этого файла, здесь вы можете увидеть, как операция чтения выглядит в коде Исходный код Spark:

    override def readFile(
        conf: Configuration,
        file: PartitionedFile,
        parser: JacksonParser,
        schema: StructType): Iterator[InternalRow] = {
      val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
      val textParser = parser.options.encoding
        .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
        .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))
    
      val safeParser = new FailureSafeParser[Text](
        input => parser.parse(input, textParser, textToUTF8String),
        parser.options.parseMode,
        schema,
        parser.options.columnNameOfCorruptRecord)
      linesReader.flatMap(safeParser.parse)
    }
    

Здесь мы видим, что Spark читает файл построчно, а затем вызывает flatMap для обработки каждой строки с помощью синтаксического анализатора, поэтому позже легко найти искаженную запись и сгенерировать для них _corrupt_record.

Когда вы устанавливаете для многострочного параметра значение true, Spark будет использовать MultiLineJsonDataSource (спойлер — ранее он назывался WholeFileJsonDataSource). Здесь вы можете найти функцию для чтения данных: Исходный код Spark

  override def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: JacksonParser,
      schema: StructType): Iterator[InternalRow] = {
    def partitionedFileString(ignored: Any): UTF8String = {
      Utils.tryWithResource {
        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
      } { inputStream =>
        UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
      }
    }
    val streamParser = parser.options.encoding
      .map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
      .getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))

    val safeParser = new FailureSafeParser[InputStream](
      input => parser.parse[InputStream](input, streamParser, partitionedFileString),
      parser.options.parseMode,
      schema,
      parser.options.columnNameOfCorruptRecord)

    safeParser.parse(
      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
  }

Теперь давайте взглянем на JsonParser и его общий анализ функций: Исходный код Spark

  def parse[T](
      record: T,
      createParser: (JsonFactory, T) => JsonParser,
      recordLiteral: T => UTF8String): Iterable[InternalRow] = {
    try {
      Utils.tryWithResource(createParser(factory, record)) { parser =>
        // a null first token is equivalent to testing for input.trim.isEmpty
        // but it works on any token stream and not just strings
        parser.nextToken() match {
          case null => None
          case _ => rootConverter.apply(parser) match {
            case null => throw QueryExecutionErrors.rootConverterReturnNullError()
            case rows => rows.toSeq
          }
        }
      }
    } catch {
      case e: SparkUpgradeException => throw e
      case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
        // JSON parser currently doesnt support partial results for corrupted records.
        // For such records, all fields other than the field configured by
        // `columnNameOfCorruptRecord` are set to `null`
        throw BadRecordException(() => recordLiteral(record), () => None, e)
      case e: CharConversionException if options.encoding.isEmpty =>
        val msg =
          """JSON parser cannot handle a character in its input.
            |Specifying encoding as an input option explicitly might help to resolve the issue.
            |""".stripMargin + e.getMessage
        val wrappedCharException = new CharConversionException(msg)
        wrappedCharException.initCause(e)
        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
      case PartialResultException(row, cause) =>
        throw BadRecordException(
          record = () => recordLiteral(record),
          partialResult = () => Some(row),
          cause)
    }
  }

Здесь вы можете видеть, что Json генерирует не PartialResultException, а, вероятно, одно из этих двух: JsonProcessingException | MalformedInputException

Из-за этого этот код выдает это исключение: BadRecordException(() => recordLiteral(record), () => None, e) где запись = наш поток = весь файл.

Это исключение позже интерпретируется FailureSafeParser, который генерирует для вас выходные строки и просто копирует данные в _corrupt_record.

В общем, я пытался найти информацию в коммитах и ​​Jira, но я думаю, что эта тема - настоящий беспорядок. Я нашел начальную фиксацию, которая добавила эту функциональность с этим сообщением:

[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.

Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

«поврежденный столбец будет содержать имя файла вместо буквального JSON, если произойдет синтаксический анализ» - похоже, это изменилось позже (на самом деле у вас есть буквальный Json в этом столбце), но я думаю, что общий подход тот же.

Итак, возвращаясь к вопросам: «Я хочу знать, является ли это преднамеренным поведением или просто ошибкой!» - я думаю, что это не ошибка и не предполагаемое поведение, а следствие того, как изначально был реализован синтаксический анализатор Джексона, и на данный момент мы должны жить с этим

Спасибо за такой подробный ответ. Также я думаю, что они должны работать над этим, потому что это лишает смысла чтение многострочных файлов json. После этого я начну искать исходный код. Награжу репутацией за награду, как только переполнение стека позволит мне это сделать.

Prateek Gautam 21.11.2022 07:03

Другие вопросы по теме