Я пытаюсь прочитать несколько файлов JSON из папки, разбить их с помощью фрейма данных и записать в место как дельту.
Ниже приведен мой пример json-файла:
'{"result": [{"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 08:36:35", "opened_by": "Kay", "priority": "4 - Low", "sys_created_by": "[email protected]", "sys_created_on": "08/10/2016 08:36:35", "urgency": "3 - Low"}, {"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 10:13:11", "opened_by": "Alan", "priority": "4 - Low", "sys_created_on": "08/10/2016 10:13:11", "urgency": "3 - Low"}, ................
Но я получаю следующую ошибку:
AnalysisException: можно использовать только типы данных структуры со звездочкой. Атрибут: ArrayBuffer(Result)
Любое предложение или помощь по устранению этой ошибки?
Я использую приведенный ниже код:
from pyspark.sql.functions import *
import glob
spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", True)
for file in glob.glob(f"/dbfs/mnt/landing/item/2023*/*.json"):
file = file[5:]
df = spark.read.option('multiline',True).json(file)
df_exp = df.select(explode(col("result")).alias('Result')).select('Result.*')
df_explode = df_exp.drop_duplicates()
df_explode.write.format('delta').mode('append').option('inferSchema',True).save('/mnt/landing/finalitem/')
Почему вы используете file = file[5:]?
удалить /dbfs и выбрать файл из /mnt/landing...
вы используете Azure DataBricks?
@JonSG мои намерения скорее противоположные, но я понимаю твое впечатление.
@DileeprajnarayanThumula — Да, я использую Azure Databricks
Любая помощь, пожалуйста. Застрял на этом и не могу двигаться вперед
@iroh я пробовал этот код
из pyspark.sql import SparkSession из pyspark.sql.functions import взрыв_аутер, col spark = SparkSession.builder.appName("JSONExplode").getOrCreate() json_file_path = "/FileStore/tables/sample020202.json" df = spark.read. option('multiline', True).json(json_file_path) df_exp = df.select(explode_outer(col("result")).alias('Result')).select('Result.*') df_exp.show () df_explode = df_exp.dropDuulates() delta_destination_path = "/FileStore/tables/finalitem" df_explode.write.format('delta').mode('append').option('inferSchema', True).save( delta_destination_path)






Появляется сообщение об ошибке «AnalysisException: можно только развернуть звездочку типов данных структуры. Атрибут: ArrayBuffer(Result)» означает, что вы пытаетесь использовать функцию разнесения для столбца, который не структурирован как список или массив. По сути, вы можете использовать функцию разнесения только для столбцов, имеющих определенную структуру, а не для любого столбца.
Я попробовал приведенный ниже код:
Я использовал файл JSON в качестве входных данных для кода PySpark для чтения и разложения.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, col
spark = SparkSession.builder.appName("JSONExplode").getOrCreate()
json_file_path = "/FileStore/tables/sample020202.json"
df = spark.read.option('multiline', True).json(json_file_path)
df_exp = df.select(explode_outer(col("result")).alias('Result')).select('Result.*')
df_exp.show()
df_explode = df_exp.dropDuplicates()
delta_destination_path = "/FileStore/tables/finalitem"
df_explode.write.format('delta').mode('append').option('inferSchema', True).save(delta_destination_path)

В приведенном выше коде я использовал функцию explode_outer в столбце «результат».
explode_outer(col("result")) для разнесения элементов массива. Использование explode_outer предполагает, что он предназначен для обработки потенциальных нулевых значений внутри массива.
Чтобы проверить, правильно ли распознается столбец «результат» как массив структур, вы можете проверить схему.

Это сработало... Спасибо.
Я не в курсе, пожалуйста, можете ли вы помочь