Разбить несколько файлов JSON с помощью Python

Я пытаюсь прочитать несколько файлов JSON из папки, разбить их с помощью фрейма данных и записать в место как дельту.

Ниже приведен мой пример json-файла:

'{"result": [{"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 08:36:35", "opened_by": "Kay", "priority": "4 - Low", "sys_created_by": "[email protected]", "sys_created_on": "08/10/2016 08:36:35", "urgency": "3 - Low"}, {"approval": "Approved", "assigned_to": "", "assignment_group": "Business Requests", "opened_at": "08/10/2016 10:13:11", "opened_by": "Alan", "priority": "4 - Low", "sys_created_on": "08/10/2016 10:13:11", "urgency": "3 - Low"}, ................

Но я получаю следующую ошибку: AnalysisException: можно использовать только типы данных структуры со звездочкой. Атрибут: ArrayBuffer(Result)

Любое предложение или помощь по устранению этой ошибки?

Я использую приведенный ниже код:

from pyspark.sql.functions import *
import glob
spark.conf.set("spark.sql.legacy.json.allowEmptyString.enabled", True)
for file in glob.glob(f"/dbfs/mnt/landing/item/2023*/*.json"):
    file = file[5:]
    df = spark.read.option('multiline',True).json(file)
    df_exp = df.select(explode(col("result")).alias('Result')).select('Result.*')
    df_explode = df_exp.drop_duplicates()
    df_explode.write.format('delta').mode('append').option('inferSchema',True).save('/mnt/landing/finalitem/')

Я не в курсе, пожалуйста, можете ли вы помочь

iroh 04.10.2023 13:48

Почему вы используете file = file[5:]?

The_spider 04.10.2023 13:57

удалить /dbfs и выбрать файл из /mnt/landing...

iroh 04.10.2023 14:40

вы используете Azure DataBricks?

DileeprajnarayanThumula 04.10.2023 18:55

@JonSG мои намерения скорее противоположные, но я понимаю твое впечатление.

funky-future 04.10.2023 21:48

@DileeprajnarayanThumula — Да, я использую Azure Databricks

iroh 05.10.2023 08:46

Любая помощь, пожалуйста. Застрял на этом и не могу двигаться вперед

iroh 05.10.2023 12:53

@iroh я пробовал этот код

DileeprajnarayanThumula 05.10.2023 13:26

из pyspark.sql import SparkSession из pyspark.sql.functions import взрыв_аутер, col spark = SparkSession.builder.appName("JSONExplode").getOrCreate() json_file_path = "/FileStore/tables/sample020202.json" df = spark.read. option('multiline', True).json(json_file_path) df_exp = df.select(explode_outer(col("result")).alias('Result')).sele‌​ct('Result.*') df_exp.show () df_explode = df_exp.dropDuulates() delta_destination_path = "/FileStore/tables/finalitem" df_explode.write.format('delta').mode('append').option('infe‌​rSchema', True).save( delta_destination_path)

DileeprajnarayanThumula 05.10.2023 13:26
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
9
91
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Появляется сообщение об ошибке «AnalysisException: можно только развернуть звездочку типов данных структуры. Атрибут: ArrayBuffer(Result)» означает, что вы пытаетесь использовать функцию разнесения для столбца, который не структурирован как список или массив. По сути, вы можете использовать функцию разнесения только для столбцов, имеющих определенную структуру, а не для любого столбца.

Я попробовал приведенный ниже код:

Я использовал файл JSON в качестве входных данных для кода PySpark для чтения и разложения.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, col
spark = SparkSession.builder.appName("JSONExplode").getOrCreate()
json_file_path = "/FileStore/tables/sample020202.json"
df = spark.read.option('multiline', True).json(json_file_path)
df_exp = df.select(explode_outer(col("result")).alias('Result')).select('Result.*')
df_exp.show()
df_explode = df_exp.dropDuplicates()
delta_destination_path = "/FileStore/tables/finalitem"
df_explode.write.format('delta').mode('append').option('inferSchema', True).save(delta_destination_path)

В приведенном выше коде я использовал функцию explode_outer в столбце «результат».

explode_outer(col("result")) для разнесения элементов массива. Использование explode_outer предполагает, что он предназначен для обработки потенциальных нулевых значений внутри массива.

Чтобы проверить, правильно ли распознается столбец «результат» как массив структур, вы можете проверить схему.

Это сработало... Спасибо.

iroh 12.10.2023 12:14

Другие вопросы по теме

Как просмотреть значение заблокированной переменной из группы переменных, сохраненной в библиотеке конвейеров Azure?
Почему я получаю сообщение об ошибке недопустимой модели с моделью «gpt-35-turbo-16k» в Azure Open AI, но не с «gpt-35-turbo»?
Не удалось развернуть приложение-функцию Azure с помощью сценария Python, содержащего azure.storage.fileshare
Отправка электронной почты через API Microsoft Graph с использованием Python с субъектом-службой
Как использовать производный столбец и шаблон столбца в adf, чтобы разделить несколько столбцов на 4?
Импорт библиотеки пользовательского интерфейса чата Android Azure?
Как написать конвейер для приложения Vite React с помощью Azure Devops и службы веб-приложений Azure
Azure download_blob в Python
Как я могу подключиться к серверу sql на виртуальной машине в Azure?
Подключение воздушного потока: как создать подключение воздушного потока к хранилищу BLOB-объектов Azure с помощью субъекта-службы?