Я новичок в Spark, и у меня возникла глупая проблема «какой лучший подход». По сути, у меня есть карта (dict), которую я хотел бы перебрать. Во время каждой итерации я хочу выполнить поиск по столбцу в фрейме данных искры, используя регулярное выражение rlike
, и назначить ключ словаря новому столбцу, используя withColumn
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
Пример данных показан ниже
+--------------------+-----------+
| id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri| Soap|
|fpJatwxTeObcbuJH25UI| Detergent|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+-----------+
Я хочу получить фрейм данных, который выглядит так:
+--------------------+-----------+---------+
| id|item_bought| class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri| Soap| Laundry|
|fpJatwxTeObcbuJH25UI| Detergent| Laundry|
|MdK1q5gBygIGFYyvbz8J| Milk|Groceries|
+--------------------+-----------+---------+
У меня более 100 млн записей, и мне нужен подход, основанный на передовом опыте Spark (распределенные вычисления). Один из подходов, который приходит на ум, — пройтись по карте и использовать rlike
или str.contains
для поиска по регулярному выражению, как показано ниже:
for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))
Но это возвращает true
или false
для rlike поиска. Я хочу заменить true или false значением key
.
Кроме того, учитывая, что у меня есть 100 млн (до 150 млн) записей, является ли циклический просмотр карты лучшим подходом?
РЕДАКТИРОВАТЬ
Что, если бы items_bought
в df
были специальные символы (или дополнительный текст)?
+--------------------+----------------+
| id| item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J| Milk|
+--------------------+----------------+
Я не хочу сначала очищать текст, просто назначаю классы на основе поиска по ключевым словам регулярного выражения.
В вашей ситуации я превращу карту в фрейм данных. Я предполагаю, что результирующий фрейм данных будет относительно небольшим. Используйте присоединение к зарубежной трансляции. Что это делает, так это то, что он распределяет небольшой df на каждый рабочий узел, избегая перетасовки.
#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
+-----------+--------------------+---------+
|item_bought| id| class|
+-----------+--------------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+
После вашего редактирования
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))
df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
+------------+--------------------+----------------+---------+
|item_bought1| id| item_bought| class|
+------------+--------------------+----------------+---------+
| Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
| Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry|
| Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
+------------+--------------------+---------------- -+---------+
Что вы подразумеваете под точными совпадениями, примерами?
Я обновил вопрос, чтобы показать примеры
Регулярно выражайте основной df, а затем присоединяйтесь к трансляции
@deramos, пожалуйста, примите ответ, если он решил ваши проблемы
Спасибо @wwnde Будет ли это работать, если
items_bought
не будут точными совпадениями? Пожалуйста, смотрите обновление вопроса.