Есть ли способ изменить столбец на основе наличия двух значений в наборе значений из кадра данных databricks pyspark?
df = (
[
('E1', 'A1',''),
('E2', 'A2',''),
('F1', 'A3',''),
('F2', 'B1',''),
('F3', 'B2',''),
('G1', 'B3',''),
('G2', 'C1',''),
('G3', 'C2',''),
('G4', 'C3',''),
('H1', 'C4',''),
('H2', 'D1',''),
],
['old_comp_id', 'db_id', 'comment']
)
Проверяем наличие значений E1
и C1,
и отмечаем комментарием в обоих случаях, и ожидаемый результат должен быть:
df = (
[
('E1', 'A1','mark'),
('E2', 'A2',''),
('F1', 'A3',''),
('F2', 'B1',''),
('F3', 'B2',''),
('G1', 'B3',''),
('G2', 'C1','mark'),
('G3', 'C2',''),
('G4', 'C3',''),
('H1', 'C4',''),
('H2', 'D1',''),
],
['old_comp_id', 'db_id', 'comment']
)
Чтобы иметь возможность использовать несколько рабочих в Databricks, я думаю, что он должен использовать только структуру pyspark и не конвертировать в Pandas в любой момент.
Предположим, что у нас нет строки с элементом "C1". В этом случае входной фрейм данных будет:
df = (
[
('E1', 'A1',''),
('E2', 'A2',''),
('F1', 'A3',''),
('F2', 'B1',''),
('F3', 'B2',''),
('G1', 'B3',''),
('G3', 'C2',''),
('G4', 'C3',''),
('H1', 'C4',''),
('H2', 'D1',''),
],
['old_comp_id', 'db_id', 'comment']
)
и выход: будет точно равен входу.
Я решал преобразование в фреймворк данных pandas и зацикливание в наборе. Чтобы ускорить работу, я использовал многопроцессорную обработку, которая ограничивается запуском драйвера в Databricks и не распространяется автоматически на несколько рабочих процессов.
Я думаю, вам придется сделать это в два этапа. Сначала проверьте, встречаются ли значения C1
и E1
хотя бы один раз в обоих столбцах, и если да, то примените операции, подобные тому, что предложил @Steven:
from pyspark.sql.functions import col, when
df = spark.createDataFrame([
('E1', 'A1',''),
('E2', 'A2',''),
('F1', 'A3',''),
('F2', 'B1',''),
('F3', 'B2',''),
('G1', 'B3',''),
('G2', 'C1',''),
('G3', 'C2',''),
('G4', 'C3',''),
('H1', 'C4',''),
('H2', 'D1',''),
],
['old_comp_id', 'db_id', 'comment']
)
key_values = ["E1", "C1"]
df_old_comp_id_filtered = df.filter(col("old_comp_id").isin(key_values))
df_db_id_filtered = df.filter(col("db_id").isin(key_values))
if df_old_comp_id_filtered.count() == 0 or df_db_id_filtered.count() == 0:
df.show() # And preferably return original DF
df.withColumn("comment", when(col("old_comp_id").isin(key_values), "mark").when(col("db_id").isin(key_values), "mark")).show()
# If both key values exist:
+-----------+-----+-------+
|old_comp_id|db_id|comment|
+-----------+-----+-------+
| E1| A1| mark|
| E2| A2| |
| F1| A3| |
| F2| B1| |
| F3| B2| |
| G1| B3| |
| G2| C1| mark|
| G3| C2| |
| G4| C3| |
| H1| C4| |
| H2| D1| |
+-----------+-----+-------+
# Else
+-----------+-----+-------+
|old_comp_id|db_id|comment|
+-----------+-----+-------+
| E1| A1| |
| E2| A2| |
| F1| A3| |
| F2| B1| |
| F3| B2| |
| G1| B3| |
| G3| C2| |
| G4| C3| |
| H1| C4| |
| H2| D1| |
+-----------+-----+-------+
Однако, чтобы заставить несколько рабочих обработать это решение, мне пришлось бы инкапсулировать его в UDF, верно?
Нет, приведенный выше код полностью распараллеливается Spark — он использует собственные функции Spark SQL. Единственной горячей точкой «производительности» может быть операция count
, но если предположить, что фильтрация, которая происходит до того, как уже удалено большинство нерелевантных строк, то с этим все должно быть в порядке :)
что ты пробовал?