У меня есть столбец user_contacts_attributes в искровом кадре данных:
+------------------+-------------------------------------+
| user_name |user_contacts_attributes |
+------------------+-------------------------------------+
| Test | "user": { "id": "16","username":"sam","level":"2.00"} |
+------------------+-------------------------------------+
фрейм данных имеет схему, как показано ниже:
user_name:string
user_contacts_attributes:struct
user:struct
id:string
level:string
username:string
Результирующий фрейм данных должен быть таким, как показано ниже:
+------------------++------------------+---------------------------+
| user_name |parent |child | value |
+------------------+---------------------------++------------------+
| Test|user | id | 16 |
| Test|user |level | 2.00|
| Test|user |username | sam |
+------------------+---------------------------+-------------------+
Я пробовал написать UDF, подобный этому PySpark "взорвать" диктовку в столбце Но не удалось. Мне нужен udf, который я могу применить к каждой из этих строк фрейма данных.





Используйте SQL-выражение , чтобы создать новый столбец, содержащий массив из именованных_структур, где каждая структура содержит имя поля и значение поля одного элемента json:
from pyspark.sql import functions as F
df = ....
base = 'user_contacts_attributes.user' # according the the schema 'user' is a fixed string
# get all json elements:
fields = df.select(f'{base}.*').schema.fieldNames()
# create the expression for the array
expr = "array(" + \
",".join([f'named_struct("child", "{field}", "value", {base}.{field})' for field in fields]) \
+ ")"
expr теперь содержит строку
array(
named_struct("child", "id", "value", user_contacts_attributes.user.id),
named_struct("child", "level", "value", user_contacts_attributes.user.level),
named_struct("child", "username", "value", user_contacts_attributes.user.username)
)
Наконец взорвите массив и удалите промежуточные столбцы:
df.withColumn('parent', F.lit('user'))\
.withColumn('values', F.expr(expr)) \
.withColumn('values', F.explode("values")) \
.withColumn("child", F.col("values.child")) \
.withColumn("value", F.col("values.value")) \
.drop("user_contacts_attributes", "values") \
.show(truncate=False)
Выход:
+---------+------+--------+-----+
|user_name|parent|child |value|
+---------+------+--------+-----+
|Test |user |id |16 |
|Test |user |level |2.00 |
|Test |user |username|sam |
+---------+------+--------+-----+
По возможности следует избегать использования UDF, поскольку они медленнее функций SQL.
Это работает, большое спасибо!