Анализ вложенной полезной нагрузки JSON с использованием pyspark

Я пытаюсь проанализировать и сгладить вложенные данные с помощью pyspark. любые предложения о том, как анализировать такой файл JSON

Вот пример кода, который уже был опробован, но безуспешно.

jsonData  = """{
    "data": {
        "unique_id1": {
            "random_code1": {
                "name": "some_name",
                "status": "value1"
            },
            "random_code2": {
                "name": "some_name",
                "status": "value2"
            }
        },
        "unique_id2": {
            "random_code3": {
                "name": "some_name",
                "status": "value2"
            },
            "random_code4": {
                "name": "some_name",
                "status": "value2"
            }
        }
    }

}"""
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))

data_schema = df.schema["data"].dataType.simpleString()
data_schema  = re.sub(r"([\w\-]+)(?=:struct<name)", "_RandomCode", data_schema)
data_schema  = re.sub(r"([\w\-]+)(?=:struct<_RandomCode)", "_Ids", data_schema)
data_schema = re.sub(r"(?<=,|<)([^,<]+)(?=:)", r"`\1`", data_schema)

Ожидаемый результат

_Ids              _RandomCode              name
unique_id1         random_code1            some_name
unique_id1         random_code2            some_name

unique_id2         random_code3            some_name
unique_id2         random_code4            some_name
Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
0
0
65
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете проанализировать данные, используя map, а затем преобразовать их обратно в нужный формат и извлечь из данных соответствующие столбцы.

from pyspark.sql.functions import explode_outer

jsonData  = """{
    "data": {
        "unique_id1": {
            "random_code1": {
                "name": "some_name",
                "status": "value1"
            },
            "random_code2": {
                "name": "some_name",
                "status": "value2"
            }
        },
        "unique_id2": {
            "random_code3": {
                "name": "some_name",
                "status": "value2"
            },
            "random_code4": {
                "name": "some_name",
                "status": "value2"
            }
        }
    }

}"""
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))

def parse_data(x):

    # Top level keys:
    out = []
    outer_list = [(key, x[0].asDict()[key].asDict()) for key in x[0].asDict().keys()]

    for row in outer_list :
        for sub_row in [(key, row[1][key]['name']) for key in row[1].keys()]:
            out.append([row[0], sub_row[0], sub_row[1]])
    
    return out

# Convert to RDD and parse the data, convert the RDD
# back to a dataframe and explode.
df = df.rdd.map(lambda x: (1, parse_data(x))).toDF(['dummy', 'parsed_data'])
df = df.withColumn('parsed_data', explode_outer(df['parsed_data']))

# Fetch desired columns from the parsed data.
df = (df.withColumn('_Ids', df['parsed_data'].getItem(0))
        .withColumn('_RandomCode', df['parsed_data'].getItem(1))
        .withColumn('name', df['parsed_data'].getItem(2))
        .drop('dummy', 'parsed_data'))

df.show()

+----------+------------+---------+
|      _Ids| _RandomCode|     name|
+----------+------------+---------+
|unique_id1|random_code1|some_name|
|unique_id1|random_code2|some_name|
|unique_id2|random_code3|some_name|
|unique_id2|random_code4|some_name|
+----------+------------+---------+

Другие вопросы по теме

PySpark: фильтрация задержки по разнице дат
Использование структурированной потоковой передачи PySpark: как отправить обработанные данные клиенту через WebSocket
Как перезаписать файл паркета в том же месте с помощью PySpark
Создать столбец подмножества массива структур без разбивки
Потоковая передача Spark + интеграция с Kafka, чтение данных из Kafka каждые 15 минут и сохранение смещения последнего чтения с помощью PySpark
Не удалось загрузить предварительный просмотр: размер записной книжки превысил ограничение в байтах
Как сделать левое соединение, чтобы ключи могли иметь множественную степень детализации с помощью Spark?
Ошибки токена (доступа) при подключении к MS SQL Server из блокнотов Python DataBricks через драйвер JDBC PySPark с использованием субъекта службы Azure и MSAL
Как создать отдельные диапазоны дат из набора диапазонов в sql
Java.lang.NoClassDefFoundError: org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics при запуске pyspark