У меня есть фрейм данных Spark, полученный из Google Analytics, который выглядит следующим образом:
id customDimensions (Array<Struct>)
100 [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101 [ {"index": 1, "value": "Mars" }]
У меня также есть фрейм данных "метаданные специальных параметров", который выглядит следующим образом:
index name
1 planet
2 continent
Я бы использовал индексы в df метаданных, чтобы развернуть свои пользовательские измерения в столбцы. Результат должен выглядеть следующим образом:
id planet continent
100 Earth Europe
101 Mars null
Я пробовал следующий подход, и он отлично работает, но крайне неэффективен. Я хотел бы знать, есть ли лучший подход.
# Select the two relevant columns
cd = df.select('id', 'customDimensions')
# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))
# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')
# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
.join(metadata, cd.index == metadata.index, 'left')
.drop(metadata.index))
# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))
# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)
Предположения:





Соединения - очень дорогостоящая операция, поскольку приводит к перетасовке данных. По возможности избегайте этого или постарайтесь его оптимизировать.
В вашем коде есть два соединения. Последнее соединение с возвратом столбцов можно вообще избежать. Другое соединение с фреймом данных метаданных может быть оптимизировано. Поскольку метаданные df имеют всего 250 строк и их очень много, вы можете использовать подсказку broadcast() в соединении. Это позволит избежать перетасовки большего фрейма данных.
Я внес несколько предложенных изменений в код, но они не тестировались, так как у меня нет ваших данных.
# df columns list
df_columns = df.columns
# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(cd.customDimensions))
# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')
# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')
# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))
return piv
Подозреваю, что pivot здесь дороже, чем join.
Я предполагаю, что DataFrame метаданных намного меньше исходного dataframe. Вы можете повысить производительность, если соберете отдельные значения столбца name и передадите их в качестве параметра для pivot. Что-то вроде:
piv = cd.groupBy("id").pivot("name", values=[row["name"] for row in metadata.select("name").distinct().collect()]).agg(F.first("value")). Да, за звонок вcollectвзимается предоплата, но без аргументаpivotэто может перевесить стоимостьvalues.