Polars – проблемы с производительностью – попытка создать новый фрейм данных для каждой строки

Мне нужно запустить алгоритм другой библиотеки для каждой строки большого df, но у меня возникли проблемы с преобразованием моего кода в полярные выражения для повышения производительности. Вот несколько примеров DF:

df_products = pl.DataFrame({
    'SKU':['apple','banana','carrot','date'],
    'DESCRIPTION': [
        "Wire Rope",
        "Connector",
        "Tap",
        "Zebra"
    ],
    'CATL3': [
        "Fittings",
        "Tube",
        "Tools",
        "Animal"
    ],
    'YELLOW_CAT': [
        "Rope Accessories",
        "Tube Fittings",
        "Forming Taps",
        "Striped"
    ],
    'INDEX': [0, 5, 25, 90],
    'EMBEDDINGS': [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9],
        [10,11,12]
    ],
})


df_items_sm_ex = pl.DataFrame({
    'PRODUCT_INFO':['apple','banana','carrot'],
    'SEARCH_SIMILARITY_SCORE': [
        [1., 0.87, 0.54, 0.33],
        [1., 0.83, 0.77, 0.55],
        [1., 0.92, 0.84, 0.65]
    ],
    'SEARCH_POSITION': [
        [0, 5, 25, 90],
        [1, 2, 151, 373],
        [3, 5, 95, 1500]
    ],
    'SKU':['apple','banana','carrot'],
    'YELLOW_CAT': [
        "Rope Accessories",
        "Tube Fittings",
        "Forming Taps"
    ],
    'CATL3': [
        "Fittings",
        "Tube",
        "Tools"
    ],
    'EMBEDDINGS': [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ],
})

а теперь код

Для каждой строки у меня есть 3 основные операции: создание базового нового фрейма данных, предварительная обработка/очистка/запуск прогнозов для фрейма данных, запись фрейма данных в несколько таблиц SQL. Я заметил, что шаги 1 и 2 выполняются дольше всего:

df_items_sm_ex.select(
        pl.struct(df_items_sm_ex.columns)
        .map_elements(lambda row: build_complements(
            row, df_items, rfc, rfc_comp, engine, current_datetime
            )))


def build_complements(row, df_products, ml, ml_comp, engine, current_datetime):
    try:
        #step 1 - generate the base new dataframe
        output_df = build_candidate_dataframe(row, df_products)
        #step 2 - preprocess / clean / run predictions on the dataframe
        output_df = process_candidate_output(df_products, output_df, ml, ml_comp)
        #step 3 write dataframes to SQL
        write_validate_complements(output_df, row, current_datetime, engine)
    except Exception as e:
        print(f'exception: {repr(e)}')

def build_candidate_dataframe(row, df_products):
    df_len = len(row['SEARCH_SIMILARITY_SCORE'])
    schema = {'QUERY': str,
              'SIMILARITY_SCORE': pl.Float32, 
              'POSITION': pl.Int64,
              'QUERY_SKU': str, 
              'QUERY_LEAF': str,
              'QUERY_CAT': str,
              'QUERY_EMBEDDINGS': pl.List(pl.Float32)
            }
    output_df = pl.DataFrame({'QUERY': [row['PRODUCT_INFO']] * df_len,
                    'SIMILARITY_SCORE': row['SEARCH_SIMILARITY_SCORE'], 
                    'POSITION': row['SEARCH_POSITION'],
                    'QUERY_SKU': [row['SKU']] * df_len, 
                    'QUERY_LEAF': [row['YELLOW_CAT']] * df_len,
                    'QUERY_CAT': [row['CATL3']] * df_len,
                    'QUERY_EMBEDDINGS': [row['EMBEDDINGS']] * df_len
                    }, schema=schema).sort("SIMILARITY_SCORE", descending=True)
                
    output_df = output_df.join(df_products[['SKU', 'EMBEDDINGS', 'INDEX', 'DESCRIPTION', 'CATL3', 'YELLOW_CAT']], left_on=['POSITION'], right_on=['INDEX'], how='left')
    output_df = output_df.rename({"DESCRIPTION": "SIMILAR_PRODUCT_INFO", "CATL3": "SIMILAR_PRODUCT_CAT", "YELLOW_CAT": "SIMILAR_PRODUCT_LEAF"})
    return output_df

def process_candidate_output(df_products, output_df, ml, ml_comp):
    combined_embeddings = (output_df.to_pandas()['QUERY_EMBEDDINGS'] + output_df.to_pandas()['EMBEDDINGS']) / 2
    output_df = output_df.with_columns(pl.Series(name='COMBINED_EMBEDDINGS', values=combined_embeddings))
    output_df = output_df[['QUERY', 'QUERY_SKU', 'QUERY_CAT', 'QUERY_LEAF', 'SIMILAR_PRODUCT_INFO', 'SIMILAR_PRODUCT_CAT', 'SIMILAR_PRODUCT_LEAF', 'SIMILARITY_SCORE', 'COMBINED_EMBEDDINGS', 'SKU', 'POSITION']]
    output_df = output_df.filter(
        pl.col('SKU') != output_df['QUERY_SKU'][0]
    )
    #ML predictions
    output_df = predict_complements(output_df, ml)
    output_df = output_df.filter(
        pl.col('COMPLEMENTARY_PREDICTIONS') == 1
    )
    #Other ML predictions
    output_df = predict_required_accessories(output_df, ml_comp)
    output_df = output_df.sort(by='LABEL_PROBABILITY', descending=True)
    return output_df

Можете ли вы предоставить минимальный работоспособный пример с входным фреймом данных и ожидаемым результатом? Что делают predict_complements и predict_required_accessories? Похоже, что эти шаги могут быть тяжелыми для вычислений.

Hericks 25.06.2024 18:08

Вам следует стараться не создавать объекты Python, которые затем можно использовать для создания нового файла df. Возможно, вам следует свести этот вопрос к рефакторингу build_candidate_dataframe, чтобы вы этого не делали. В его нынешнем виде я бы не стал пытаться ответить на этот вопрос, поскольку он требует слишком много предположений о том, как (например) выглядят row, df_products или lookup_dict.

Dean MacGregor 25.06.2024 18:33

@DeanMacGregor Я обновил пример df_products. Я удалил search_dict. Можете ли вы подробнее объяснить свои мысли на тему: «Вы должны стремиться не создавать объекты Python, которые затем используете для создания нового df.»? В моем конкретном случае фрейм данных df_products представляет собой длинный список элементов. Мне нужно создать рекомендации для каждого элемента; поэтому я создаю новый df... возможно, вы понимаете мои намерения, но я считаю, что не совсем ясно вижу ваше решение. Спасибо

tastycakezs 25.06.2024 19:19

@Hericks Я обновил пост, дайте мне знать, если этого будет недостаточно. Predict_complements использует случайный лес для маркировки кадра данных, если каждый элемент имеет значение True или False. Что касается меток True, Predict_required_accessories запускает отдельно обученный случайный лес, чтобы разбить метки True на 3 разные категории.

tastycakezs 25.06.2024 19:21
Оптимизация производительности модели: Руководство по настройке гиперпараметров в Python с Keras
Оптимизация производительности модели: Руководство по настройке гиперпараметров в Python с Keras
Настройка гиперпараметров - это процесс выбора наилучшего набора гиперпараметров для модели машинного обучения с целью оптимизации ее...
Развертывание модели машинного обучения с помощью Flask - Angular в Kubernetes
Развертывание модели машинного обучения с помощью Flask - Angular в Kubernetes
Kubernetes - это портативная, расширяемая платформа с открытым исходным кодом для управления контейнерными рабочими нагрузками и сервисами, которая...
Udacity Nanodegree Capstone Project: Классификатор пород собак
Udacity Nanodegree Capstone Project: Классификатор пород собак
Вы можете ознакомиться со скриптами проекта и данными на github .
Определение пород собак с помощью конволюционных нейронных сетей (CNN)
Определение пород собак с помощью конволюционных нейронных сетей (CNN)
В рамках финального проекта Udacity Data Scietist Nanodegree я разработал алгоритм с использованием конволюционных нейронных сетей (CNN) для...
Почему Python - идеальный выбор для проекта AI и ML
Почему Python - идеальный выбор для проекта AI и ML
Блог, которым поделился Harikrishna Kundariya в нашем сообществе Developer Nation Community.
0
4
112
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Для каждой строки df вы извлекаете данные из поляров в объекты Python, а затем повторно запускаете поляры df. Это очень дорого и по времени, и по памяти. Вместо этого вам следует хранить все в памяти поляров, если только по какой-то причине вы не можете этого сделать. Вы упоминаете, что большая часть времени уходит на этапы 1 и 2, но шаг 2 включает в себя ваши материалы ML, так что это все еще может быть узким местом.

Использование map_elements здесь не идеально, потому что вы ничего не возвращаете обратно в df, просто используйте обычный цикл for. Также я объединил две ваши функции в одну:

def build_and_process_candidate_output(i, df_items_sm_ex, df_products, ml, ml_comp):
    output_df = (
        df_items_sm_ex[i]
        .explode("SEARCH_SIMILARITY_SCORE", "SEARCH_POSITION")
        .rename(
            {
                "PRODUCT_INFO": "QUERY",
                "SEARCH_SIMILARITY_SCORE": "SIMILARITY_SCORE",
                "SEARCH_POSITION": "POSITION",
                "SKU": "QUERY_SKU",
                "YELLOW_CAT": "QUERY_LEAF",
                "CATL3": "QUERY_CAT",
                "EMBEDDINGS": "QUERY_EMBEDDINGS",
            }
        )
        .join(
            df_products.select(
                "SKU", "EMBEDDINGS", "INDEX", "DESCRIPTION", "CATL3", "YELLOW_CAT"
            ),
            left_on=["POSITION"],
            right_on=["INDEX"],
            how = "left",
        )
        .rename(
            {
                "DESCRIPTION": "SIMILAR_PRODUCT_INFO",
                "CATL3": "SIMILAR_PRODUCT_CAT",
                "YELLOW_CAT": "SIMILAR_PRODUCT_LEAF",
            }
        )
        .select(
            "QUERY",
            "QUERY_SKU",
            "QUERY_CAT",
            "QUERY_LEAF",
            "SIMILAR_PRODUCT_INFO",
            "SIMILAR_PRODUCT_CAT",
            "SIMILAR_PRODUCT_LEAF",
            "SIMILARITY_SCORE",
            
            # This assumes these are the same length
            (pl.col("QUERY_EMBEDDINGS").explode() + pl.col("EMBEDDINGS").explode())
            .implode()
            .over("POSITION")
            .alias("COMBINED_EMBEDDINGS"),

            "SKU",
            "POSITION",
        )
         # Given the sample data, this filter makes everything go away
         # which is why supplying good sample data is important
        .filter(pl.col("SKU") != pl.col("QUERY_SKU").first())
    )

    # ML predictions
    output_df = predict_complements(output_df, ml)
    output_df = output_df.filter(pl.col("COMPLEMENTARY_PREDICTIONS") == 1)
    # Other ML predictions
    output_df = predict_required_accessories(output_df, ml_comp)
    output_df = output_df.sort(by = "LABEL_PROBABILITY", descending=True)
    return output_df

Спасибо большое, это увеличило скорость выполнения в 10 раз. Я новичок в полярах, и то, как вы сформулировали код, поможет мне в будущих случаях использования поляров.

tastycakezs 26.06.2024 19:56

Другие вопросы по теме