Я пытаюсь прочитать данные из файла паркета в хранилище BLOB-объектов в блоках данных и записать их в дельта-таблицу.
Конфигурация кластера = 14,3 LTS (включая Apache Spark 3.5.0, Scala 2.12)
1.df = spark.read.format("parquet").load("/mnt/path") -- Чтение успешно 2.df.write.format("дельта").mode("перезаписать").saveAsTable(путь)
Здесь выдается эта ошибка SchemaColumnConvertNotSupportedException: столбец: [Col_Name], физический тип: INT64, логический тип: строка
Я попытался получить схему из паркета и применить ее во время чтения, но все равно получаю ошибку. Пробовал разные настройки искры, но результата нет.
# Extracting the schema
myschema = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("/mnt/path").schema
print(myschema)
StructType([StructField('GatewayID', StringType(), True), StructField('Version', StringType(), True), StructField('Generation', StringType(), True), StructField('AppVersion', StringType(), True), StructField('UnitEntity', StringType(), True), StructField('SubID', StringType(), True), StructField('SIMCardID', StringType(), True), StructField('UnitNumber', StringType(), True), StructField('UnitType', StringType(), True), StructField('ISOCountryCode', StringType(), True), StructField('ReportTime', LongType(), True), StructField('MessageFormat', StringType(), True), StructField('MessagesAsString', StringType(), True)])
# Imposing the schema
df = spark.read.format("parquet").schema(myschema).load("/mnt/path")
# Writing to datalake
df.write.format("delta").mode("overwrite").saveAsTable(path)
Error:
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [SubID], physicalType: INT64, logicalType: string
Analysis:
If you see the subID col. is string while extracting the schema but in target parquet as per error it is INT64.
I have tried to convert the datatype in dataframe and write to delta but same schema error.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
df = df.withColumn("SubID", col("SubID").cast(IntegerType()))
Я добавил шаги в вопросе. Спасибо.
Как вы применяете схему при чтении ее с паркета. Пожалуйста, передайте схему, читая, как показано ниже, и проверьте
схема val = тип структуры val df = spark.read().parquet(путь, схема)
Я применяю схему, как показано ниже, но все равно получаю упомянутую ошибку несоответствия типов данных. myschema = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("/mnt/path").schema df = spark.read.format ("паркет").schema(myschema).load("/mnt/path")
Согласно этой статье автора @ shanmugavel.chandrakasu,
Начиная с Databricks 7.3 и более поздних версий, при чтении файлов паркета Spark будет читать файлы в векторизованном формате. Это может быть причиной того, что он принимает собственный тип данных
string
(тип данных, полученный при чтении файла паркета) вместо приведения типа данныхint
.
Сначала попробуйте отключить векторизованный формат, а затем прочитать файл паркета из источника.
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
Теперь прочитайте файл паркета и проверьте, совпадают ли типы данных исходного и целевого столбцов.
Если нет, используйте приведенный ниже код, чтобы привести его к типу данных INT64
. Упоминается, что у вашей цели есть INT64
, то есть long
, поэтому используйте LongType
во время каста.
from pyspark.sql.types import LongType
df = df.withColumn("SubID", col("SubID").cast(LongType()))
Можете ли вы предоставить дополнительную информацию об исходных и целевых типах данных и некоторые примеры входных данных, если это возможно, для лучшего понимания? и как вы обеспечиваете соблюдение схемы? пожалуйста, предоставьте пример кода.