Развернуть массив структур в столбцы в PySpark

У меня есть фрейм данных Spark, полученный из Google Analytics, который выглядит следующим образом:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]

У меня также есть фрейм данных "метаданные специальных параметров", который выглядит следующим образом:

index   name
1       planet
2       continent

Я бы использовал индексы в df метаданных, чтобы развернуть свои пользовательские измерения в столбцы. Результат должен выглядеть следующим образом:

id     planet     continent
100    Earth      Europe
101    Mars       null

Я пробовал следующий подход, и он отлично работает, но крайне неэффективен. Я хотел бы знать, есть ли лучший подход.

# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)

Предположения:

  • Существует до 250 индексов специальных параметров, и имена известны только через фрейм данных метаданных.
  • Исходный фрейм данных имеет несколько других столбцов, которые я хотел бы сохранить (отсюда и соединение в конце моего решения)

Я предполагаю, что DataFrame метаданных намного меньше исходного dataframe. Вы можете повысить производительность, если соберете отдельные значения столбца name и передадите их в качестве параметра для pivot. Что-то вроде: piv = cd.groupBy("id").pivot("name", values=[row["name"] for row in metadata.select("name").distinct().collect()]).agg(F.first("‌​value")). Да, за звонок в collect взимается предоплата, но без аргумента pivot это может перевесить стоимость values.

pault 07.12.2018 19:31
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
1
1 885
1

Ответы 1

Соединения - очень дорогостоящая операция, поскольку приводит к перетасовке данных. По возможности избегайте этого или постарайтесь его оптимизировать.

В вашем коде есть два соединения. Последнее соединение с возвратом столбцов можно вообще избежать. Другое соединение с фреймом данных метаданных может быть оптимизировано. Поскольку метаданные df имеют всего 250 строк и их очень много, вы можете использовать подсказку broadcast() в соединении. Это позволит избежать перетасовки большего фрейма данных.

Я внес несколько предложенных изменений в код, но они не тестировались, так как у меня нет ваших данных.

# df columns list
df_columns = df.columns

# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))


return piv

Подозреваю, что pivot здесь дороже, чем join.

pault 07.12.2018 19:08

Другие вопросы по теме