Я пытаюсь проанализировать и сгладить вложенные данные с помощью pyspark. любые предложения о том, как анализировать такой файл JSON
Вот пример кода, который уже был опробован, но безуспешно.
jsonData = """{
"data": {
"unique_id1": {
"random_code1": {
"name": "some_name",
"status": "value1"
},
"random_code2": {
"name": "some_name",
"status": "value2"
}
},
"unique_id2": {
"random_code3": {
"name": "some_name",
"status": "value2"
},
"random_code4": {
"name": "some_name",
"status": "value2"
}
}
}
}"""
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))
data_schema = df.schema["data"].dataType.simpleString()
data_schema = re.sub(r"([\w\-]+)(?=:struct<name)", "_RandomCode", data_schema)
data_schema = re.sub(r"([\w\-]+)(?=:struct<_RandomCode)", "_Ids", data_schema)
data_schema = re.sub(r"(?<=,|<)([^,<]+)(?=:)", r"`\1`", data_schema)
Ожидаемый результат
_Ids _RandomCode name
unique_id1 random_code1 some_name
unique_id1 random_code2 some_name
unique_id2 random_code3 some_name
unique_id2 random_code4 some_name

Вы можете проанализировать данные, используя map, а затем преобразовать их обратно в нужный формат и извлечь из данных соответствующие столбцы.
from pyspark.sql.functions import explode_outer
jsonData = """{
"data": {
"unique_id1": {
"random_code1": {
"name": "some_name",
"status": "value1"
},
"random_code2": {
"name": "some_name",
"status": "value2"
}
},
"unique_id2": {
"random_code3": {
"name": "some_name",
"status": "value2"
},
"random_code4": {
"name": "some_name",
"status": "value2"
}
}
}
}"""
df = spark.read.option("multiLine", "true").json(spark.sparkContext.parallelize([jsonData]))
def parse_data(x):
# Top level keys:
out = []
outer_list = [(key, x[0].asDict()[key].asDict()) for key in x[0].asDict().keys()]
for row in outer_list :
for sub_row in [(key, row[1][key]['name']) for key in row[1].keys()]:
out.append([row[0], sub_row[0], sub_row[1]])
return out
# Convert to RDD and parse the data, convert the RDD
# back to a dataframe and explode.
df = df.rdd.map(lambda x: (1, parse_data(x))).toDF(['dummy', 'parsed_data'])
df = df.withColumn('parsed_data', explode_outer(df['parsed_data']))
# Fetch desired columns from the parsed data.
df = (df.withColumn('_Ids', df['parsed_data'].getItem(0))
.withColumn('_RandomCode', df['parsed_data'].getItem(1))
.withColumn('name', df['parsed_data'].getItem(2))
.drop('dummy', 'parsed_data'))
df.show()
+----------+------------+---------+
| _Ids| _RandomCode| name|
+----------+------------+---------+
|unique_id1|random_code1|some_name|
|unique_id1|random_code2|some_name|
|unique_id2|random_code3|some_name|
|unique_id2|random_code4|some_name|
+----------+------------+---------+