Я делаю базовое преобразование в своем фрейме данных pyspark, но здесь я использую несколько операторов .withColumn.
def trim_and_lower_col(col_name):
return F.when(F.trim(col_name) == "", F.lit("unspecified")).otherwise(F.lower(F.trim(col_name)))
df = (
source_df.withColumn("browser", trim_and_lower_col("browser"))
.withColumn("browser_type", trim_and_lower_col("browser_type"))
.withColumn("domains", trim_and_lower_col("domains"))
)
Я читал, что создание нескольких операторов withColumn не очень эффективно, и вместо этого я должен использовать df.select(). Я пробовал это:
cols_to_transform = [
"browser",
"browser_type",
"domains"
]
df = (
source_df.select([trim_and_lower_col(col).alias(col) for col in cols_to_transform] + source_df.columns)
)
но это дает мне повторяющуюся ошибку столбца
Что еще я могу попробовать?
Вы называете свои новые столбцы следующим образом: .alias(col)
.
Это означает, что они имеют то же имя, что и столбец, который вы используете для создания нового.
При создании (используя .withColumn
) это не представляет проблемы. Как только вы пытаетесь select
, Spark не знает, какой столбец выбрать.
Вы можете исправить это, например, указав суффикс для новых столбцов:
cols_to_transform = [
"browser",
"browser_type",
"domains"
]
df = (
source_df.select([trim_and_lower_col(col).alias(f"{col}_new") for col in cols_to_transform] + source_df.columns)
)
Другим решением, которое действительно загрязняет DAG, будет:
cols_to_transform = [
"browser",
"browser_type",
"domains"
]
for col in cols_to_transform:
source_df = source_df.withColumn(col, trim_and_lower_col(col))
что-то вроде карты?
Я добавил итеративный пример, который немного красивее с точки зрения кода. С точки зрения производительности оба выполнения выполняются одинаково быстро, но создание DAG происходит медленнее при использовании подхода .withColumn. Вы также можете просто связать другой оператор select, который переименовывает столбцы обратно, но это также добавит шаг в DAG.
Дублирующийся столбец возникает из-за того, что вы дважды передаете каждый преобразованный столбец в этом списке, один раз как новый преобразованный столбец (через .alias
) как исходный столбец (по имени в source_df.columns
). Это решение позволит вам использовать один оператор select, сохранить порядок столбцов и не столкнуться с проблемой дублирования:
df = (
source_df.select([trim_and_lower_col(col).alias(col) if col in cols_to_transform else col for col in source_df.columns])
)
Объединение множества .withColumn
действительно создает проблему, поскольку неразрешенный план запроса может стать довольно большим и вызвать ошибку StackOverflow в драйвере Spark во время оптимизации плана запроса. Здесь можно найти одно хорошее объяснение этой проблемы: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
это дало бы мне ошибку, что: Объект функции не имеет псевдонима атрибута. Пожалуйста, проверьте написание и/или тип данных объекта.
это странно, я только что дважды проверил, что это работает в Code Authoring в Foundry. вы уверены, что нет опечатки?
Если у вас есть только эти несколько withColumns, продолжайте их использовать. Это все еще более читабельно, поэтому более удобно в обслуживании и не требует пояснений..
Если вы посмотрите на это, вы увидите, что искра говорит быть осторожным с withColumns, когда у вас их около 200.
Использование select также делает ваш код более подверженным ошибкам, поскольку его сложнее читать.
Теперь, если у вас много столбцов, я бы определил
cols_to_transform = [
"browser",
"browser_type",
"domains"
]
cols_to_keep = [c for c in df.columns if c not in cols_to_transform]
cols_transformed = [trim_and_lower_col(c).alias(c) for c in cols_to_transform]
source_df.select(*cols_to_keep, *cols_transformed)
Это даст вам тот же порядок столбцов, что и withColumns.
нет ли способа просто «заменить» столбец? я не хочу менять имя