У меня есть DataFrame с именем product_relationship_current, и я выполняю самосоединение для получения нового DataFrame, как показано ниже:
Сначала я даю ему псевдоним, чтобы я мог рассматривать их как два разных фрейма данных:
val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")
И затем я делаю самосоединение, чтобы получить новый фрейм данных:
val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
.select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
.distinct()
Но я ищу другой способ сделать это, не выполняя самосоединение, поэтому мне не нужно дважды загружать один и тот же фрейм данных, потому что его выполнение занимает так много времени. (мой фрейм данных product_relationship_current слишком велик).
Это SQL-запрос, который я пытался выполнить с помощью spark scala:
select
distinct pr1.product_id as IO,
pr1.product_version as IOV,
pr1.related_product_id,
pr1.related_product_version,
pr1.type,
pr1.product_version_id_related_fk
from
product_relationship_current as pr1
left join product_relationship_current as pr2 on pr1.product_version_id_related_fk = pr2.product_version_id_fk
where
pr1.type = 'contains'
Какой уровень хранения я должен указать в persist() ? Только диск?
Я разделю этот ответ на 2 части: мой ответ и вопрос.
Мой ответ
Если вы хотите избежать двойного чтения фрейма данных, вы можете использовать df.cache для его кэширования в память/диск. df.cache в основном df.persist использует уровень хранения по умолчанию (MEMORY_AND_DISK).
Я сделал импровизированный файл CSV (data.csv) с вашими столбцами. Тогда я написал что-то вроде этого:
val df = spark.read.option("delimiter", ";").option("header", "true").csv("data.csv").cache
val df1 = df.filter(df("TYPE").isin("contains", "CONTAINS"))
val output = df1.alias("df1")
.join(df.alias("df"), col("df1.PRODUCT_VERSION_ID_RELATED_FK") === col("df.PRODUCT_VERSION_ID_FK"), "left")
.select(df1("PRODUCT_ID"), df1("PRODUCT_VERSION"), df1("RELATED_PRODUCT_ID"), df1("RELATED_PRODUCT_VERSION"), df1("TYPE"), df1("PRODUCT_VERSION_ID_RELATED_FK"))
.distinct()
Итак, вы видите, что я использовал метод .cache в первой строке. Теперь, как мы можем проверить, что это сработало? Я взглянул на пользовательский интерфейс Spark (по умолчанию работает на порту 4040 везде, где запущен ваш процесс драйвера. В моем случае я запускал spark-shell локально, чтобы иметь доступ к пользовательскому интерфейсу на localhost:4040)
Глядя на план запроса или его визуализацию, мы можем понять, каков эффект .cache. Я запускал приведенный выше код дважды: один раз, когда я не .cache свой фрейм данных, и один раз, когда я это сделал.
Первое изображение, которое я оставлю здесь, это то, где я не .cache свой фрейм данных.
На втором изображении я сделал .cache свой фрейм данных.
Таким образом, вы видите разницу в верхней части изображений: поскольку фрейм данных был кэширован, файл CSV (или любой другой источник, который вы используете) не будет считываться дважды: вы видите, что у нас есть блок InMemoryTableScan в обеих ветвях вместо Scan csv блок.
Важно: Если ваши данные настолько велики, что не помещаются в объединенную память ваших исполнителей, данные будут перенесены на диск. Это сделает эту операцию медленнее.
Мой вопрос к вам: похоже, вы выполняете левое соединение с pr1 левой таблицей, а затем выбираете столбцы только из части таблицы pr1. Это объединение вообще необходимо? Похоже, вы можете просто выбрать нужные столбцы, а затем .distinct фрейм данных.
Надеюсь это поможет!
Здравствуйте, спасибо за ваш ответ .. да, это правда, я ошибся при выборе, выбрав только левую сторону
О, хорошо, рад, что ответ помог :) Удачи!
«Но я ищу другой способ сделать это, не выполняя самосоединение, поэтому мне не нужно загружать один и тот же фрейм данных дважды, потому что он выполняется так долго», почему бы вам не сохранить df тогда ?