Я пытаюсь прочитать файл схемы (который является текстовым файлом) и применить его к моему CSV-файлу без заголовка. Поскольку у меня уже есть файл схемы, я не хочу использовать опцию InferSchema, которая является накладными расходами.
Мой файл схемы ввода выглядит так, как показано ниже,
"num IntegerType","letter StringType"
Я пытаюсь создать файл схемы, приведенный ниже,
val schema_file = spark.read.textFile("D:\\Users\\Documents\\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))
Я получаю сообщение об ошибке, как показано ниже
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType
- поле (класс: "org.apache.spark.sql.types.DataType", имя: "_2") - корневой класс: "scala.Tuple2"
и пытаемся использовать это как файл схемы при использовании spark.read.csv, как показано ниже, и записывать его как файл ORC
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(schema_file)
.csv("D:\\Users\\sampleFile.txt")
.toDF().write.format("orc").save("D:\\Users\\ORC")
Нужна помощь в преобразовании текстового файла в файл схемы и преобразовании моего входного файла CSV в ORC.





Чтобы создать схему из файла text, создайте функцию для match и type и верните DataType как
def getType(raw: String): DataType = {
raw match {
case "ByteType" => ByteType
case "ShortType" => ShortType
case "IntegerType" => IntegerType
case "LongType" => LongType
case "FloatType" => FloatType
case "DoubleType" => DoubleType
case "BooleanType" => BooleanType
case "TimestampType" => TimestampType
case _ => StringType
}
}
Теперь создайте схему, прочитав файл схемы как
val schema = Source.fromFile("schema.txt").getLines().toList
.flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
.map(x => StructField(x(0), getType(x(1)), true))
Теперь прочтите файл csv как
spark.read
.option("samplingRatio", "0.01")
.option("delimiter", "|")
.option("nullValue", "NULL")
.schema(StructType(schema))
.csv("data.csv")
Надеюсь это поможет!
@BalakrishnanRamasamy Я думаю, что case "decimal(10,0)" => { val decimal = raw.split("(")(1).replace(")", "").split(",") DecimalType(decimal(0).toInt, decimal(1).toInt) } должен работать, но не тестировался.
Спасибо, @ShankarKoirala, основной проблемой для меня было создание корпуса match. Скажем, предположим, что файл схемы - balance decimal(10,0) amount decimal(20,1). Как в этом случае будет выглядеть мой регистр совпадений?
Я не уверен в этом, но соответствие первой части десятичного типа данных должно работать.
Что-то вроде этого немного более надежно, поскольку оно использует хранилище метаданных улья:
import org.apache.hadoop.hive.metastore.api.FieldSchema
def sparkToHiveSchema(schema: StructType): List[FieldSchema] = {
schema.map(field => new FieldSchema(field.name,field.dataType.catalogString,field.getComment.getOrElse(""))).toList
}
``
Вы можете указать схему следующим образом:
import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType};
Например:
val schema = new StructType(
Array(
StructField("Age",IntegerType,true),
StructField("Name",StringType,true),
)
)
val data = spark.read.option("header", "false").schema(schema).csv("filename.csv")
data.show()
Это напрямую создаст его в фрейме данных
Вы можете создать файл JSON с именем schema.json в формате ниже.
{
"fields": [
{
"metadata": {},
"name": "first_fields",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "double_field",
"nullable": true,
"type": "double"
}
],
"type": "struct"
}
Создайте схему структуры из чтения этого файла
rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)
После этого вы можете использовать структуру в качестве схемы для чтения файла csv.
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(custom_schema)
.csv("D:\\Users\\sampleFile.txt")
.toDF().write.format("orc").save("D:\\Users\\ORC")
Спасибо @ShankarKiorala Если, схема ввода похожа на
balance decimal(10,0). Как я могу обновить код в функции сопоставления? Поскольку десятичное число является обычным явлением, но прецессия будет отличаться для разных полей, что-то вродеcase "decimal(10,0)" => DecimalType(10,0)