У меня есть огромный фрейм данных PySpark, содержащий 1,5 млрд строк, включая столбец fieldA
. У меня есть список из 8,8 млн уникальных значений fieldA
, которые я хочу отфильтровать из 1,5 млрд строк. Однако я думаю, что из-за большого размера данных я продолжаю получать ошибки типа StackOverflowError
или OutOfMemoryError
.
Я попытался разделить список размером 8,8 млн на более мелкие списки по 20 тыс. значений, а также разделить кадры данных размером 1,5 млрд на более мелкие кадры данных по 15 млн строк каждый. Затем для каждого кадра данных из 15 миллионов строк непрерывно (в цикле) отфильтровывайте разные 20 тысяч значений fieldA
(temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
), пока не будут отфильтрованы все 8,8 миллионов значений, а затем записывайте окончательные temp_df
в файлы паркета. Повторите эти действия для следующих 15 миллионов строк кадров данных. Однако я думаю, что это привело к сотням .filter()
, и, возможно, именно это дало мне StackOverflowError
, когда я попытался записать в файлы паркета первый 15-мегабайтный фрейм данных.
Затем я попытался отфильтровать полные 8,8 млн значений из каждого 15-мегабайтного кадра данных. Для каждого 15-мегабайтного кадра данных я записывал отфильтрованные результаты в файлы паркета. Однако, когда я попытался записать файлы паркета, я получил OutOfMemoryError
на первом 15-мегабайтном кадре данных.
Как я могу эффективно отфильтровать строки, соответствующие любому из 8,8 млн значений fieldA
из 1,5 млрд строк кадра данных?
Спасибо! leftanti
присоединиться действительно было правильным решением.
Есть несколько вещей, которые вы можете сделать, чтобы повысить эффективность использования памяти нашего кода.
Вначале выберите только те столбцы, которые вам нужны: это поможет снизить потребление памяти вашего temp_df
DataFrame.
Используйте join
вместо isin
, если ваш список элементов слишком велик: это, вероятно, вызывает накладные расходы при фильтрации. Я предлагаю преобразовать список элементов в DataFrame и использовать технику соединения, например leftanti
, чтобы получить все элементы из вашего temp_df
, которых нет в вашем fieldA_part_list
.
Попытка DataFrame.ExceptionAll (pyspark.sql.DataFrame.ExceptionAll): это также хорошая встроенная операция pyspark, которую вы можете попробовать, которая в основном возвращает все строки из temp_df
, которых нет в fieldA_part_list
(учитывая, что теперь это фрейм данных).
Вот несколько примеров, которые вы можете использовать в качестве основы:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
fieldA_part_list = ["someValue1", "someValue2", "someValue3"]
# transform the fieldA_list as a dataframe
fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")
join_condition_expr = col("fieldA") == col("value")
# Perform leftanti to get only temp_df records
temp_df = temp_df.join(fieldA_part_df, on= join_condition_expr, how = "leftanti")
# Alternatively using exceptAll - use one or other given memory pessure is on
another_result_df = temp_df.exceptAll(fieldA_part_df)
Из любопытства, используете ли вы для выполнения этих операций свой локальный компьютер или кластер Databricks?
Это кластер Spark, управляемый изнутри (а не Databricks).
Спасибо! Я выбрал leftanti
, это было намного быстрее и работа была выполнена!
попробуй
"left_anti"
присоединяйся.df_filtered = df.join(df_unique, df["fieldA"].eqNullSafe(df_unique["fieldA"]), "left_anti")
Ссылка: spark.apache.org/docs/3.1.1/api/python/reference/api/…