У меня есть искровой фрейм данных следующим образом:
----------------------------------------------------------------------------------------------
| type | lctNbr | itmNbr | lastUpdatedDate | lctSeqId| T7797_PRD_LCT_TYP_CD| FXT_AIL_ID| pmyVbuNbr | upcId | vndModId|
____________________________________________________________________________
| prd_lct 145 147 2024-07-22T05:24:14 1 1 14 126 008236686661 35216
_____________________________________________________________________________
Я хочу сгруппировать этот фрейм данных по типу lctNbr, itmNbr и LastUpdatedDate. Я просто хочу, чтобы каждая запись была в формате json ниже:
"type": "prd_lct",
"lctNbr": 145,
"itmNbr": 147,
"lastUpdatedDate": "2024-07-22T05:24:14",
"locations": [
{
"lctSeqId": 1,
"prdLctTypCd": 1,
"fxtAilId": "14"
}
],
"itemDetails": [
{
"pmyVbuNbr": 126,
"upcId": "008236686661",
"vndModId": "35216"
]
}
Я пробовал использовать функции to_json, collect_list and map_from_entries
, но постоянно получаю ошибки при выполнении команды show и не могу перейти к правильному формату.
Вы пытаетесь объединить данные из нескольких строк или просто реорганизовать информацию в одной строке? Напоминаем: при использовании Collect_list для объединения данных из нескольких строк порядок элементов в результирующем массиве недетерминирован.
Вы можете сгруппировать нужные поля, а затем объединить F.collect_list(F.create_map(...))
, чтобы получить внутренние поля для locations
и itemDetails
.
Пример данных:
pandasDF = pd.DataFrame({
"type": ["prd_lct","prd_lct","test"],
"lctNbr": [145, 145, 148],
"itmNbr": [147, 147, 150],
"lastUpdatedDate": ["2024-07-22T05:24:14", "2024-07-22T05:24:14", "2024-07-22T05:24:15"],
"lctSeqId": [1,2,3],
"T7797_PRD_LCT_TYP_CD": [1,2,3],
"FXT_AIL_ID": ["14","15","16"],
"pmyVbuNbr": [126, 127, 128],
"upcId": ["008236686661","008236686662","008236686663"],
"vndModId": ["35216","35217","35218"]
})
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
| type|lctNbr|itmNbr| lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr| upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct| 145| 147|2024-07-22T05:24:14| 1| 1| 14| 126|008236686661| 35216|
|prd_lct| 145| 147|2024-07-22T05:24:14| 2| 2| 15| 127|008236686662| 35217|
| test| 148| 150|2024-07-22T05:24:15| 3| 3| 16| 128|008236686663| 35218|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
Результирующий DataFrame и преобразование в список строк в кодировке JSON.
resultDF = sparkDF.groupby(
'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
).agg(
F.collect_list(
F.create_map(
F.lit('lctSeqId'), F.col('lctSeqId'),
F.lit('prdLctTypCd'), F.col('T7797_PRD_LCT_TYP_CD'),
F.lit('fxtAilId'), F.col('FXT_AIL_ID'),
)
).alias('locations'),
F.collect_list(
F.create_map(
F.lit('pmyVbuNbr'), F.col('pmyVbuNbr'),
F.lit('upcId'), F.col('upcId'),
F.lit('vndModId'), F.col('vndModId'),
)
).alias('itemDetails')
)
resultJSON = result.toJSON().collect()
Поскольку resultJSON
будет списком строк в кодировке JSON, вы можете преобразовать его в словарь, используя следующее:
import ast
result_dict = [ast.literal_eval(x) for x in resultJSON]
[
{
"type": "prd_lct",
"lctNbr": 145,
"itmNbr": 147,
"lastUpdatedDate": "2024-07-22T05:24:14",
"locations": [
{
"lctSeqId": "1",
"prdLctTypCd": "1",
"fxtAilId": "14"
},
{
"lctSeqId": "2",
"prdLctTypCd": "2",
"fxtAilId": "15"
}
],
"itemDetails": [
{
"pmyVbuNbr": "126",
"upcId": "008236686661",
"vndModId": "35216"
},
{
"pmyVbuNbr": "127",
"upcId": "008236686662",
"vndModId": "35217"
}
]
},
{
"type": "test",
"lctNbr": 148,
"itmNbr": 150,
"lastUpdatedDate": "2024-07-22T05:24:15",
"locations": [
{
"lctSeqId": "3",
"prdLctTypCd": "3",
"fxtAilId": "16"
}
],
"itemDetails": [
{
"pmyVbuNbr": "128",
"upcId": "008236686663",
"vndModId": "35218"
}
]
}
Если вы опубликуете код, который вы пробовали, и полученные ошибки, возможно, кто-то сможет помочь вам это исправить.