Как избежать самосоединения в искровом скала

У меня есть DataFrame с именем product_relationship_current, и я выполняю самосоединение для получения нового DataFrame, как показано ниже:

Сначала я даю ему псевдоним, чтобы я мог рассматривать их как два разных фрейма данных:

val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")

И затем я делаю самосоединение, чтобы получить новый фрейм данных:

val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
  .select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()

Но я ищу другой способ сделать это, не выполняя самосоединение, поэтому мне не нужно дважды загружать один и тот же фрейм данных, потому что его выполнение занимает так много времени. (мой фрейм данных product_relationship_current слишком велик).

Это SQL-запрос, который я пытался выполнить с помощью spark scala:

select 
  distinct pr1.product_id as IO, 
  pr1.product_version as IOV, 
  pr1.related_product_id, 
  pr1.related_product_version, 
  pr1.type, 
  pr1.product_version_id_related_fk 
from 
  product_relationship_current as pr1 
  left join product_relationship_current as pr2 on pr1.product_version_id_related_fk = pr2.product_version_id_fk 
where 
  pr1.type = 'contains' 

«Но я ищу другой способ сделать это, не выполняя самосоединение, поэтому мне не нужно загружать один и тот же фрейм данных дважды, потому что он выполняется так долго», почему бы вам не сохранить df тогда ?

Young 13.11.2022 22:28

Какой уровень хранения я должен указать в persist() ? Только диск?

mr.Penguin 14.11.2022 10:21
Как настроить Tailwind CSS с React.js и Next.js?
Как настроить Tailwind CSS с React.js и Next.js?
Tailwind CSS - единственный фреймворк, который, как я убедился, масштабируется в больших командах. Он легко настраивается, адаптируется к любому...
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Переключение светлых/темных тем
Переключение светлых/темных тем
В Microsoft Training - Guided Project - Build a simple website with web pages, CSS files and JavaScript files, мы объясняем, как CSS можно...
Отношения "многие ко многим" в Laravel с методами присоединения и отсоединения
Отношения "многие ко многим" в Laravel с методами присоединения и отсоединения
Отношения "многие ко многим" в Laravel могут быть немного сложными, но с помощью Eloquent ORM и его моделей мы можем сделать это с легкостью. В этой...
В PHP
В PHP
В большой кодовой базе с множеством различных компонентов классы, функции и константы могут иметь одинаковые имена. Это может привести к путанице и...
Карта дорог Беладжар PHP Laravel
Карта дорог Беладжар PHP Laravel
Laravel - это PHP-фреймворк, разработанный для облегчения разработки веб-приложений. Laravel предоставляет различные функции, упрощающие разработку...
1
2
140
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я разделю этот ответ на 2 части: мой ответ и вопрос.

Мой ответ

Если вы хотите избежать двойного чтения фрейма данных, вы можете использовать df.cache для его кэширования в память/диск. df.cache в основном df.persist использует уровень хранения по умолчанию (MEMORY_AND_DISK).

Я сделал импровизированный файл CSV (data.csv) с вашими столбцами. Тогда я написал что-то вроде этого:

val df = spark.read.option("delimiter", ";").option("header", "true").csv("data.csv").cache
val df1 = df.filter(df("TYPE").isin("contains", "CONTAINS"))
val output = df1.alias("df1")
  .join(df.alias("df"), col("df1.PRODUCT_VERSION_ID_RELATED_FK") === col("df.PRODUCT_VERSION_ID_FK"), "left")
  .select(df1("PRODUCT_ID"), df1("PRODUCT_VERSION"), df1("RELATED_PRODUCT_ID"), df1("RELATED_PRODUCT_VERSION"), df1("TYPE"), df1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()

Итак, вы видите, что я использовал метод .cache в первой строке. Теперь, как мы можем проверить, что это сработало? Я взглянул на пользовательский интерфейс Spark (по умолчанию работает на порту 4040 везде, где запущен ваш процесс драйвера. В моем случае я запускал spark-shell локально, чтобы иметь доступ к пользовательскому интерфейсу на localhost:4040)

Глядя на план запроса или его визуализацию, мы можем понять, каков эффект .cache. Я запускал приведенный выше код дважды: один раз, когда я не .cache свой фрейм данных, и один раз, когда я это сделал.

Первое изображение, которое я оставлю здесь, это то, где я не .cache свой фрейм данных.

На втором изображении я сделал .cache свой фрейм данных.

Таким образом, вы видите разницу в верхней части изображений: поскольку фрейм данных был кэширован, файл CSV (или любой другой источник, который вы используете) не будет считываться дважды: вы видите, что у нас есть блок InMemoryTableScan в обеих ветвях вместо Scan csv блок.

Важно: Если ваши данные настолько велики, что не помещаются в объединенную память ваших исполнителей, данные будут перенесены на диск. Это сделает эту операцию медленнее.

Мой вопрос к вам: похоже, вы выполняете левое соединение с pr1 левой таблицей, а затем выбираете столбцы только из части таблицы pr1. Это объединение вообще необходимо? Похоже, вы можете просто выбрать нужные столбцы, а затем .distinct фрейм данных.

Надеюсь это поможет!

Здравствуйте, спасибо за ваш ответ .. да, это правда, я ошибся при выборе, выбрав только левую сторону

mr.Penguin 18.11.2022 23:59

О, хорошо, рад, что ответ помог :) Удачи!

Koedlt 20.11.2022 07:46

Другие вопросы по теме