Как я могу отметить подмножество фрейма данных pyspark на основе другого фрейма данных?
В таблице A подмножество определяется кодом в столбце code
. Для каждого кода у нас есть несколько регистров. У нас есть одна колонка, servico
. В таблице B у нас есть те же столбцы cod_1
и cod_2
, а также столбец, который мы хотим пересечь с таблицей A. В случае, если cod_1
и cod_2
присутствуют в подмножестве, определенном code
в столбце таблицы A servico
, мы заполняем столбец таблицы A rule
в регистре с cod_2
со значениями в таблице B rule
.
Единственное ограничение состоит в том, что все нужно делать в pyspark или SQL без collect(). Я хочу добиться максимальной производительности на сервере Databricks.
Таблица А определяется:
table_a= spark.createDataFrame([
('123', 'A1',''),
('123', 'E1',''),
('123', 'A3',''),
('123', 'B1',''),
('123', 'B2',''),
('123', 'B3',''),
('321', 'C1',''),
('321', 'C2',''),
('321', 'C3',''),
('321', 'C4',''),
('321', 'D1',''),
],
['code', 'servico', 'rule']
)
Таблица B определяется:
table_b = spark.createDataFrame([
('E1', 'A1','aaa'),
('E2', 'A2','bbb'),
('F1', 'A3','ccc'),
('F2', 'B1','ddd'),
('F3', 'B2','eee'),
('G1', 'B3','fff'),
('G2', 'C1','ggg'),
('G3', 'C2','hhh'),
('G4', 'C3','iii'),
('H1', 'C4','jjj'),
('H2', 'D1','kkk'),
],
['cod_1', 'cod_2', 'rule']
)
Ожидаемый результат:
result = spark.createDataFrame([
('123', 'A1','aaa'),
('123', 'E1',''),
('123', 'A3',''),
('123', 'B1',''),
('123', 'B2',''),
('123', 'B3',''),
('321', 'C1',''),
('321', 'C2',''),
('321', 'C3',''),
('321', 'C4',''),
('321', 'D1',''),
],
['code', 'servico', 'rule']
)
Единственный регистр, который заполняется таблицей B, является первым, потому что мы находим как A1
, так и E1
для кода 123
.
1. cod
есть servico
. Я исправлю в примере. Спасибо. 2. Вы также можете отметить как E1, так и A1. Думаю, что маркировать только Е1 проще.
Чтобы предотвратить громоздкий фрейм данных (т.е. составить таблицу соединений a, чтобы получить всю комбинацию) и обработать порядок ключей, я сначала собираю все сервисы каждого кода:
table_a_agg = table_a\
.withColumn('set_of_servico', func.collect_set('servico').over(Window.partitionBy('code')).cast(types.StringType()))
table_a_agg.show(100, False)
+----+-------+----+------------------------+
|code|servico|rule|set_of_servico |
+----+-------+----+------------------------+
|123 |A1 | |[B1, A3, A1, B2, E1, B3]|
|123 |E1 | |[B1, A3, A1, B2, E1, B3]|
|123 |A3 | |[B1, A3, A1, B2, E1, B3]|
|123 |B1 | |[B1, A3, A1, B2, E1, B3]|
|123 |B2 | |[B1, A3, A1, B2, E1, B3]|
|123 |B3 | |[B1, A3, A1, B2, E1, B3]|
|321 |C1 | |[C4, C3, D1, C1, C2] |
|321 |C2 | |[C4, C3, D1, C1, C2] |
|321 |C3 | |[C4, C3, D1, C1, C2] |
|321 |C4 | |[C4, C3, D1, C1, C2] |
|321 |D1 | |[C4, C3, D1, C1, C2] |
+----+-------+----+------------------------+
Причина, по которой set_of_servico
приводится к строковому типу, заключается в объединении. Чтобы выполнить соединение, мы можем проверить, входят ли cod_1
и cod_2
в set_of_servico
result = table_a_agg.select('code', 'servico', 'set_of_servico').alias('a')\
.join(table_b.alias('b'),
[func.col('a.set_of_servico').contains(func.col('b.cod_1')), func.col('a.set_of_servico').contains(func.col('b.cod_2'))],
how='left')
+----+-------+------------------------+-----+-----+----+
|code|servico|set_of_servico |cod_1|cod_2|rule|
+----+-------+------------------------+-----+-----+----+
|123 |A1 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|123 |E1 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|123 |A3 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|123 |B1 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|123 |B2 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|123 |B3 |[B1, A3, A1, B2, E1, B3]|E1 |A1 |aaa |
|321 |C1 |[C4, C3, D1, C1, C2] |null |null |null|
|321 |C2 |[C4, C3, D1, C1, C2] |null |null |null|
|321 |C3 |[C4, C3, D1, C1, C2] |null |null |null|
|321 |C4 |[C4, C3, D1, C1, C2] |null |null |null|
|321 |D1 |[C4, C3, D1, C1, C2] |null |null |null|
+----+-------+------------------------+-----+-----+----+
Затем мы можем проверить, равен ли servico cod_1 и cod_2, чтобы отфильтровать строку, имеющую результат объединения, но на самом деле ее нет в комбинации:
result = result\
.select(
'code',
'servico',
func.when((func.col('servico')==func.col('cod_1'))|(func.col('servico')==func.col('cod_2')), func.col('rule')).otherwise(func.lit(None)).alias('rule')
)
result.show(100, False)
+----+-------+----+
|code|servico|rule|
+----+-------+----+
|123 |A1 |aaa |
|123 |E1 |aaa |
|123 |A3 |null|
|123 |B1 |null|
|123 |B2 |null|
|123 |B3 |null|
|321 |C1 |null|
|321 |C2 |null|
|321 |C3 |null|
|321 |C4 |null|
|321 |D1 |null|
+----+-------+----+
У меня два вопроса: 1. Где находится столбец
cod
в вашем примере table_a? 2. В вашем ожидаемом результате, почему вторая строка,('123', 'E1','')
, не должна быть заполнена «ааа»?E1
иA1
это комбинация, верно?