Я пытаюсь прочитать CSV-файл в Spark, используя заранее определенную схему. Для чего я использую:
df = (spark.read.format("csv")
.schema(schema)
.option("sep", ";")
.load(
file_path,
header=True,
encoding = "utf-8"))
В этом случае данные загружаются без проблем. Теперь, когда я указываю путь к неверным записям, я не получаю записей:
df = (
spark.read.format("csv")
.schema(schema)
.option("sep", ";")
.option(
"badRecordsPath",
bad_records_path,
)
.load(
file_path,
header=True,
encoding = "utf-8",
))
Все записи сбрасываются в путь к поврежденным записям с ошибкой. MALFORMED_CSV_RECORD (SQLSTATE: KD000)
хотя используемый schema
точно такой же. Почему я получаю эту ошибку?
Итак, я подробно изучил данные и обнаружил, что каждая строка в CSV-файле заканчивается разделителем «:», и это, похоже, вызывает проблему. Есть ли способ прочитать такие файлы в искре?
df = sqlContext.read \ .format("com.databricks.spark.csv") \ .option("header", "true") \ .option("delimiter", ";") \ .option("mode" , "DROPMALFORMED") \ .option("charset", "UTF-8") \ .schema(schema) \ .load(final_file_path) df.display()
@Tarique Можете ли вы попробовать приведенный выше код и сообщить мне, помог ли он вам?
Вы можете использовать .option("mode", "DROPMALFORMED")
, чтобы пропустить плохие строки.
df = sqlContext.read \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("delimiter", ";") \
.option("mode", "DROPMALFORMED") \
.option("charset", "UTF-8") \
.schema(schema) \
.load(final_file_path)
df.display()
Полученные результаты:
name age job
John 30 Developer
Jane 25 Designer
Код решения пропускает строки в CSV-файле, которые имеют неправильное количество разделителей или не соответствуют указанной схеме. Это предотвращает возникновение ошибок из этих строк в дальнейшем в коде, отфильтровывая их во время процесса первоначального чтения и анализа данных.
На самом деле мне нужны эти записи, поскольку в каждой строке есть дополнительный разделитель. Решение, которое, наконец, сработало для меня, состояло в том, чтобы добавить в конце схемы одно дополнительное поле, которое читалось как нулевое, а затем удалить его после чтения. Таким образом, схема проверяется, а подлинные плохие записи перемещаются в карантин.
Отлично, что вы решили проблему. не могли бы вы добавить сюда решение и принять его, чтобы оно могло быть полностью использовано сообществом.
Решение, которое сработало для меня, заключалось в добавлении дополнительного случайного StructField
в схему в конце (поскольку порядок полей имеет значение для CSV). Я использовал StringType
, но подойдет любой тип. Spark попытается проанализировать дополнительный столбец после последнего разделителя в конце строки и заполнить этот столбец nulls
. Исходные данные будут проверены предоставленной схемой, а плохие записи будут перемещены в карантин. После прочтения данных лишний столбец можно удалить.
Обновлено: добавление примера
Предположим, у нас есть файл csv со следующими данными:
col1;col2;col3;col4;col5;
3088548;"1166263";"something";"something";;
В приведенном выше файле у нас есть 5 столбцов с разделителем в конце. Если мы определим нашу схему всего с 5 столбцами и попытаемся прочитать файл, мы не получим никаких результатов, и все записи будут сброшены в badRecords с ошибкой MALFORMED:
Но если мы определим нашу схему с одним дополнительным случайным столбцом в конце, мы сможем читать файл, одновременно проверяя схему:
После прочтения мы можем удалить лишний столбец (col6).
Не могли бы вы поделиться примером кода этого решения?
@smurphy конечно, отредактировал ответ с примером
Не могли бы вы поделиться небольшим примером того, как выглядит CSV?