Пытаясь записать DataFrame в HDFS, я столкнулся со следующей проблемой:
org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t = ". Please use alias to rename it.;
Источником для DataFrame является csv-файл:
"value_hash", "someName1"
79000000000, name1
79000000000, name2
Этот csv прочитан:
val dataFrame = SparkSession.read.option("header", "true").csv(path)
Затем я выбираю и привожу типы для этого dataFrame:
def castColumn(colName: String, colType: String): Column = col(colName).cast(DataType.fromJson(colType))
val featureColumns: Seq[(PathString, String)] = dataFrame.columns.tail.map(f=>(f, "\"string\"")).toSeq
val columns = (schema ++ featureColumns).
map { case (colName, colType) => castColumn(colName, colType) }
dataFrame.select(columns.toSeq: _*)
где схема имеет тип: Map[String, String] и имеет значение по умолчанию Map("value_hash" -> ""string"")
В этом коде я добавляю схему featureColumns к схеме по умолчанию (поскольку я не знал полной исходной схемы - она создавалась динамически после чтения csv из hdfs).
Затем я пытаюсь записать этот DataFrame в hdfs-path:
dataFrame
.repartition(1)
.write
.parquet(outputPath)
Следуя рекомендации в сообщении об ошибке, я попытался использовать псевдоним для каждого столбца, поэтому:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)))
}
и так:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)).show()
}
Но результат был тот же: org.apache.spark.sql.AnalysisException...
Замечает пробелы и кавычки в ""someName1"", я также пытался очистить псевдоним от этого:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca.trim.substring(0,ca.length-1))).show()
}
Но без какого-либо результата. Я все еще сталкиваюсь с исключением, которое показано выше.
Что я делаю не так?
Я попытался удалить это пространство с помощью String.trim(). Но без результата. Я показал эту попытку в своем посте
Пожалуйста, добавьте воспроизводимый код с образцом
Дэвид, я обновил вопрос с образцом
В посте беспорядок и отсутствует код. Пожалуйста, отредактируйте его, чтобы он отражал согласованный поток, который приводит к исключению (исходное, обратите внимание, в результате других попыток)
Дэвид, я обновляю свой вопрос с полным рабочим процессом
Не совсем то, что я имел в виду. Смотрите мой ответ, который включает в себя создание проблемы и решение. Дайте мне минуту, и я также продемонстрирую переименование столбца Suck.





Установите для ignoreLeadingWhiteSpace значение true, чтобы устранить проблему уже во время загрузки.
Исходный код
val df_original = spark.read.option("header",true).csv(path)
println((for(c <- df_original.columns) yield s"`$c`").mkString(","))
`value_hash`,` "someName1"`
Способы переименования столбца после загрузки
val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")
println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")
println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
Способы избежать проблемы во время загрузки (включая option("ignoreLeadingWhiteSpace",true))
val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)
println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
Я попробую. Через 10 минут скажу результат
Похоже, что
"someName1"имеет лидирующее место.