Spark читает CSV с плохими записями

Я пытаюсь прочитать CSV-файл в Spark, используя заранее определенную схему. Для чего я использую:

df = (spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .load(
            file_path,
            header=True,
            encoding = "utf-8"))

В этом случае данные загружаются без проблем. Теперь, когда я указываю путь к неверным записям, я не получаю записей:

df = (
        spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .option(
            "badRecordsPath",
            bad_records_path,
        )
        .load(
            file_path,
            header=True,
            encoding = "utf-8",
        ))

Все записи сбрасываются в путь к поврежденным записям с ошибкой. MALFORMED_CSV_RECORD (SQLSTATE: KD000) хотя используемый schema точно такой же. Почему я получаю эту ошибку?

Не могли бы вы поделиться небольшим примером того, как выглядит CSV?

smurphy 11.07.2024 03:41

Итак, я подробно изучил данные и обнаружил, что каждая строка в CSV-файле заканчивается разделителем «:», и это, похоже, вызывает проблему. Есть ли способ прочитать такие файлы в искре?

Tarique 11.07.2024 07:08

df = sqlContext.read \ .format("com.databricks.spark.csv") \ .option("header", "true") \ .option("delimiter", ";") \ .option("mode" , "DROPMALFORMED") \ .option("charset", "UTF-8") \ .schema(schema) \ .load(final_file_path) df.display()

Dileep Raj Narayan Thumula 11.07.2024 09:29

@Tarique Можете ли вы попробовать приведенный выше код и сообщить мне, помог ли он вам?

Dileep Raj Narayan Thumula 11.07.2024 09:31
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
4
74
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы можете использовать .option("mode", "DROPMALFORMED"), чтобы пропустить плохие строки.

df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("mode", "DROPMALFORMED") \
    .option("charset", "UTF-8") \
    .schema(schema) \
    .load(final_file_path)
df.display()

Полученные результаты:

name    age job
John    30  Developer
Jane    25  Designer

Код решения пропускает строки в CSV-файле, которые имеют неправильное количество разделителей или не соответствуют указанной схеме. Это предотвращает возникновение ошибок из этих строк в дальнейшем в коде, отфильтровывая их во время процесса первоначального чтения и анализа данных.

На самом деле мне нужны эти записи, поскольку в каждой строке есть дополнительный разделитель. Решение, которое, наконец, сработало для меня, состояло в том, чтобы добавить в конце схемы одно дополнительное поле, которое читалось как нулевое, а затем удалить его после чтения. Таким образом, схема проверяется, а подлинные плохие записи перемещаются в карантин.

Tarique 12.07.2024 07:16

Отлично, что вы решили проблему. не могли бы вы добавить сюда решение и принять его, чтобы оно могло быть полностью использовано сообществом.

Dileep Raj Narayan Thumula 12.07.2024 07:19
Ответ принят как подходящий

Решение, которое сработало для меня, заключалось в добавлении дополнительного случайного StructField в схему в конце (поскольку порядок полей имеет значение для CSV). Я использовал StringType, но подойдет любой тип. Spark попытается проанализировать дополнительный столбец после последнего разделителя в конце строки и заполнить этот столбец nulls. Исходные данные будут проверены предоставленной схемой, а плохие записи будут перемещены в карантин. После прочтения данных лишний столбец можно удалить.

Обновлено: добавление примера

Предположим, у нас есть файл csv со следующими данными:

col1;col2;col3;col4;col5;
3088548;"1166263";"something";"something";;

В приведенном выше файле у нас есть 5 столбцов с разделителем в конце. Если мы определим нашу схему всего с 5 столбцами и попытаемся прочитать файл, мы не получим никаких результатов, и все записи будут сброшены в badRecords с ошибкой MALFORMED:

Но если мы определим нашу схему с одним дополнительным случайным столбцом в конце, мы сможем читать файл, одновременно проверяя схему:

После прочтения мы можем удалить лишний столбец (col6).

Не могли бы вы поделиться примером кода этого решения?

smurphy 15.07.2024 21:20

@smurphy конечно, отредактировал ответ с примером

Tarique 16.07.2024 14:42

Другие вопросы по теме