У меня есть следующие данные в фрейме данных df
:
{
"data":[
{
"id":"a",
"val":1
},
{
"id":"b",
"val":2
}
]
}
Теперь я хотел бы добавить новый столбец «тест» для идентификатора 'b'
, содержащий его значение 2
.
Я знаю, что могу сделать это с помощью:
(
df
.withColumn(
"test",
F.expr("filter(data,x->x.id=='b')")[0]["val"]
)
.show()
)
Даем желаемое:
+----------------+----+
| data |test|
+----------------+----+
|[{a, 1}, {b, 2}]| 2 |
+----------------+----+
Можно ли этого добиться более «родным» способом (без использования SQL)? Я знаю, что F.col("data")[1]["val"]
можно использовать, если в качестве примера я буду использовать индекс, а не идентификатор.
@ParSaMnS Я новичок в PySpark. «не использовать SQL» означает не возвращаться к F.expr()
. Вполне возможно, что это хороший способ справиться с этим - в этом случае мой вопрос был бы устаревшим, но мне казалось, что должно быть более «родное» решение; отсюда и вопрос.
Я загрузил ответ с некоторыми ссылками, которые могут помочь! Пожалуйста, держите меня в курсе, работает это или нет, потому что я тоже в это вовлечен :)
Когда вы сказали, что хотите достичь более «родного» способа (без использования SQL), хотите ли вы фильтровать его на основе Spark API или использовать код Python для фильтрации?
@Джонатан, я думал об Spark API, но мне была бы интересна любая альтернатива.
Вы имеете в виду вот так F.filter('data', lambda x: x.id == 'b')[0].val
?
@Эмма, мне это вообще-то не подходит. очевидно, F.filter() не поддерживает лямбда-выражения
Какую версию спарка вы используете? F.filter
— это версия 3.1+. Если вы используете версию 3.1+, не могли бы вы обновить свой вопрос обновленным кодом? filter
надо взять lambda
@Эмма, какой именно код следует обновить? Версия Spark в 15.3.x-scala2.12
Вы используете Databricks 15.3? это должен быть Spark 3.5, filter
должно работать на 3.5. Не могли бы вы показать обновленный код с помощью lambda
и показать все ошибки, которые вы видите?
@Эмма, Эмма, все работает правильно, как вы и предлагали. Извините за неудобства, должно быть, у меня была опечатка... не могли бы вы опубликовать свое предложение в качестве ответа, чтобы я мог принять его, пожалуйста?
Если я не ошибаюсь, вы можете использовать трансформации DataFrame
. Путем сглаживания вложенной структуры внутри столбца data
. Затем вы отфильтровываете строку id
, где она равна b
, и выбираете столбец val
из отфильтрованных строк. Затем вы присоединяете этот DataFrame к исходному DataFrame, чтобы добавить его в новый столбец.
Вот несколько ссылок, которые могут помочь лучше понять эту концепцию: Переполнение стека
Вы можете попробовать фильтр в API Pyspark.
df.withColumn("test",
F.filter('data', lambda x: x.id == 'b')[0].val)
Я не уверен, что, упоминая «не использовать SQL», вы также исключаете sqlalchemy или нет. но по личному опыту sqlalchemy — действительно хороший способ справиться с такой ситуацией. Это расширение Python.
pip install SQLAlchemy