Как создать файл схемы в Spark

Я пытаюсь прочитать файл схемы (который является текстовым файлом) и применить его к моему CSV-файлу без заголовка. Поскольку у меня уже есть файл схемы, я не хочу использовать опцию InferSchema, которая является накладными расходами.

Мой файл схемы ввода выглядит так, как показано ниже,

"num IntegerType","letter StringType"

Я пытаюсь создать файл схемы, приведенный ниже,

val schema_file = spark.read.textFile("D:\\Users\\Documents\\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))

Я получаю сообщение об ошибке, как показано ниже

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType

- поле (класс: "org.apache.spark.sql.types.DataType", имя: "_2") - корневой класс: "scala.Tuple2"

и пытаемся использовать это как файл схемы при использовании spark.read.csv, как показано ниже, и записывать его как файл ORC

  val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(schema_file)
      .csv("D:\\Users\\sampleFile.txt")
      .toDF().write.format("orc").save("D:\\Users\\ORC")

Нужна помощь в преобразовании текстового файла в файл схемы и преобразовании моего входного файла CSV в ORC.

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
5
0
4 195
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Ответ принят как подходящий

Чтобы создать схему из файла text, создайте функцию для match и type и верните DataType как

def getType(raw: String): DataType = {
  raw match {
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}

Теперь создайте схему, прочитав файл схемы как

val schema = Source.fromFile("schema.txt").getLines().toList
  .flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
  .map(x => StructField(x(0), getType(x(1)), true))

Теперь прочтите файл csv как

spark.read
  .option("samplingRatio", "0.01")
  .option("delimiter", "|")
  .option("nullValue", "NULL")
  .schema(StructType(schema))
  .csv("data.csv")

Надеюсь это поможет!

Спасибо @ShankarKiorala Если, схема ввода похожа на balance decimal(10,0). Как я могу обновить код в функции сопоставления? Поскольку десятичное число является обычным явлением, но прецессия будет отличаться для разных полей, что-то вроде case "decimal(10,0)" => DecimalType(10,0)

Gladiator 24.05.2018 10:47

@BalakrishnanRamasamy Я думаю, что case "decimal(10,0)" => { val decimal = raw.split("(")(1).replace(")", "").split(",") DecimalType(decimal(0).toInt, decimal(1).toInt) } должен работать, но не тестировался.

koiralo 24.05.2018 10:53

Спасибо, @ShankarKoirala, основной проблемой для меня было создание корпуса match. Скажем, предположим, что файл схемы - balance decimal(10,0) amount decimal(20,1). Как в этом случае будет выглядеть мой регистр совпадений?

Gladiator 24.05.2018 11:18

Я не уверен в этом, но соответствие первой части десятичного типа данных должно работать.

koiralo 24.05.2018 12:04

Что-то вроде этого немного более надежно, поскольку оно использует хранилище метаданных улья:

    import org.apache.hadoop.hive.metastore.api.FieldSchema
    def sparkToHiveSchema(schema: StructType): List[FieldSchema]  = {
        schema.map(field => new FieldSchema(field.name,field.dataType.catalogString,field.getComment.getOrElse(""))).toList
    }
``


Вы можете указать схему следующим образом:

import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType}; 

Например:

val schema = new StructType(
Array(
   StructField("Age",IntegerType,true),
  StructField("Name",StringType,true),
  )
)

val data = spark.read.option("header", "false").schema(schema).csv("filename.csv")
data.show()

Это напрямую создаст его в фрейме данных

Вы можете создать файл JSON с именем schema.json в формате ниже.

{
  "fields": [
    {
      "metadata": {},
      "name": "first_fields",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "double_field",
      "nullable": true,
      "type": "double"
    }
  ],
  "type": "struct"
}

Создайте схему структуры из чтения этого файла

rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)

После этого вы можете использовать структуру в качестве схемы для чтения файла csv.

val df=spark.read
      .format("org.apache.spark.csv")
      .option("header", false)
      .option("inferSchema", true)
      .option("samplingRatio",0.01)
      .option("nullValue", "NULL")
      .option("delimiter","|")
      .schema(custom_schema)
      .csv("D:\\Users\\sampleFile.txt")
      .toDF().write.format("orc").save("D:\\Users\\ORC")

Другие вопросы по теме