Невозможно отфильтровать кадры данных в огромном наборе данных в PySpark

У меня есть огромный фрейм данных PySpark, содержащий 1,5 млрд строк, включая столбец fieldA. У меня есть список из 8,8 млн уникальных значений fieldA, которые я хочу отфильтровать из 1,5 млрд строк. Однако я думаю, что из-за большого размера данных я продолжаю получать ошибки типа StackOverflowError или OutOfMemoryError.

Я попытался разделить список размером 8,8 млн на более мелкие списки по 20 тыс. значений, а также разделить кадры данных размером 1,5 млрд на более мелкие кадры данных по 15 млн строк каждый. Затем для каждого кадра данных из 15 миллионов строк непрерывно (в цикле) отфильтровывайте разные 20 тысяч значений fieldA (temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))), пока не будут отфильтрованы все 8,8 миллионов значений, а затем записывайте окончательные temp_df в файлы паркета. Повторите эти действия для следующих 15 миллионов строк кадров данных. Однако я думаю, что это привело к сотням .filter(), и, возможно, именно это дало мне StackOverflowError, когда я попытался записать в файлы паркета первый 15-мегабайтный фрейм данных.

Затем я попытался отфильтровать полные 8,8 млн значений из каждого 15-мегабайтного кадра данных. Для каждого 15-мегабайтного кадра данных я записывал отфильтрованные результаты в файлы паркета. Однако, когда я попытался записать файлы паркета, я получил OutOfMemoryError на первом 15-мегабайтном кадре данных.

Как я могу эффективно отфильтровать строки, соответствующие любому из 8,8 млн значений fieldA из 1,5 млрд строк кадра данных?

попробуй "left_anti" присоединяйся. df_filtered = df.join(df_unique, df["fieldA"].eqNullSafe(df_unique["fieldA"]), "left_anti") Ссылка: spark.apache.org/docs/3.1.1/api/python/reference/api/…

Ybhaw 31.07.2024 10:27

Спасибо! leftanti присоединиться действительно было правильным решением.

Rayne 01.08.2024 05:25
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
2
51
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Есть несколько вещей, которые вы можете сделать, чтобы повысить эффективность использования памяти нашего кода.

  • Вначале выберите только те столбцы, которые вам нужны: это поможет снизить потребление памяти вашего temp_df DataFrame.

  • Используйте join вместо isin, если ваш список элементов слишком велик: это, вероятно, вызывает накладные расходы при фильтрации. Я предлагаю преобразовать список элементов в DataFrame и использовать технику соединения, например leftanti, чтобы получить все элементы из вашего temp_df, которых нет в вашем fieldA_part_list.

  • Попытка DataFrame.ExceptionAll (pyspark.sql.DataFrame.ExceptionAll): это также хорошая встроенная операция pyspark, которую вы можете попробовать, которая в основном возвращает все строки из temp_df, которых нет в fieldA_part_list (учитывая, что теперь это фрейм данных).

Вот несколько примеров, которые вы можете использовать в качестве основы:

from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

fieldA_part_list = ["someValue1", "someValue2", "someValue3"]  

# transform the fieldA_list as a dataframe
fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")

join_condition_expr = col("fieldA") == col("value")

# Perform leftanti to get only temp_df records
temp_df = temp_df.join(fieldA_part_df, on= join_condition_expr, how = "leftanti")

# Alternatively using exceptAll - use one or other given memory pessure is on
another_result_df = temp_df.exceptAll(fieldA_part_df)

Из любопытства, используете ли вы для выполнения этих операций свой локальный компьютер или кластер Databricks?

Это кластер Spark, управляемый изнутри (а не Databricks).

Rayne 31.07.2024 11:53

Спасибо! Я выбрал leftanti, это было намного быстрее и работа была выполнена!

Rayne 01.08.2024 05:25

Другие вопросы по теме