Pyspark: поиск регулярных выражений с текстом в списке withColumn

Я новичок в Spark, и у меня возникла глупая проблема «какой лучший подход». По сути, у меня есть карта (dict), которую я хотел бы перебрать. Во время каждой итерации я хочу выполнить поиск по столбцу в фрейме данных искры, используя регулярное выражение rlike, и назначить ключ словаря новому столбцу, используя withColumn

maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
        "laundry": ["soap", "detergent", "fabric softener"]
}

Пример данных показан ниже

+--------------------+-----------+
|                  id|item_bought|
+--------------------+-----------+
|uiq7Zq52Bww4pZXc3xri|       Soap|
|fpJatwxTeObcbuJH25UI|  Detergent|
|MdK1q5gBygIGFYyvbz8J|       Milk|
+--------------------+-----------+

Я хочу получить фрейм данных, который выглядит так:

+--------------------+-----------+---------+
|                  id|item_bought|    class|
+--------------------+-----------+---------+
|uiq7Zq52Bww4pZXc3xri|       Soap|  Laundry|
|fpJatwxTeObcbuJH25UI|  Detergent|  Laundry|
|MdK1q5gBygIGFYyvbz8J|       Milk|Groceries|
+--------------------+-----------+---------+

У меня более 100 млн записей, и мне нужен подход, основанный на передовом опыте Spark (распределенные вычисления). Один из подходов, который приходит на ум, — пройтись по карте и использовать rlike или str.contains для поиска по регулярному выражению, как показано ниже:

for key, value in maps.items():
    pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
    df.withColumn("class", col("item_bought").rlike(pattern))

Но это возвращает true или false для rlike поиска. Я хочу заменить true или false значением key.

Кроме того, учитывая, что у меня есть 100 млн (до 150 млн) записей, является ли циклический просмотр карты лучшим подходом?

РЕДАКТИРОВАТЬ

Что, если бы items_bought в df были специальные символы (или дополнительный текст)?

+--------------------+----------------+
|                  id|     item_bought|
+--------------------+----------------+
|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J|            Milk|
+--------------------+----------------+

Я не хочу сначала очищать текст, просто назначаю классы на основе поиска по ключевым словам регулярного выражения.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В вашей ситуации я превращу карту в фрейм данных. Я предполагаю, что результирующий фрейм данных будет относительно небольшим. Используйте присоединение к зарубежной трансляции. Что это делает, так это то, что он распределяет небольшой df на каждый рабочий узел, избегая перетасовки.

#Create df from maps
    df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))

#Broadcast join    
    df.join(broadcast(df_ref), how='left', on='item_bought').show()


+-----------+--------------------+---------+
|item_bought|                  id|    class|
+-----------+--------------------+---------+
|       Soap|uiq7Zq52Bww4pZXc3xri|  laundry|
|  Detergent|fpJatwxTeObcbuJH25UI|  laundry|
|       Milk|MdK1q5gBygIGFYyvbz8J|groceries|
+-----------+--------------------+---------+

После вашего редактирования

df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))


df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z]+',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()

+------------+--------------------+----------------+---------+
|item_bought1|                  id|     item_bought|    class|
+------------+--------------------+----------------+---------+
|        Soap|uiq7Zq52Bww4pZXc3xri|            Soap|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|       Detergent|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|
|        Soap|uiq7Zq52Bww4pZXc3xri|   Soap -&ju10kg|  laundry|
|   Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|  laundry|
|        Milk|MdK1q5gBygIGFYyvbz8J|            Milk|groceries|

+------------+--------------------+---------------- -+---------+

Спасибо @wwnde Будет ли это работать, если items_bought не будут точными совпадениями? Пожалуйста, смотрите обновление вопроса.

deramos 22.03.2022 13:48

Что вы подразумеваете под точными совпадениями, примерами?

wwnde 22.03.2022 13:52

Я обновил вопрос, чтобы показать примеры

deramos 22.03.2022 13:53

Регулярно выражайте основной df, а затем присоединяйтесь к трансляции

wwnde 22.03.2022 14:10

@deramos, пожалуйста, примите ответ, если он решил ваши проблемы

wwnde 22.03.2022 21:33

Другие вопросы по теме