Как получить данные второго фрейма данных для всех значений определенных значений столбцов, совпадающих в первом фрейме данных?

Имейте два кадра данных, как показано ниже

first_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

second_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

У меня есть данные о некоторых компаниях в second_df . Мне нужно получить данные из second_df для тех идентификаторов компаний, которые перечислены в first_df.

какая искра API полезна здесь для меня? Как мне это сделать ?

Спасибо.

Расширение вопроса:

Если нет сохраненных записей, то first_df будет пустым. Следовательно, first_df("mean") и first_df("count") будут нулевыми, в результате чего "acc_new_mean" будет нулевым. В этом случае мне нужно установить «new_mean» как second_df («mean»), как это сделать? Я пробовал так, но это не работает Любая подсказка, как обрабатывать здесь .withColumn ("new_mean", ...) ???

val acc_new_mean = (second_df("mean") + first_df("mean")) / (second_df("count") + first_df("count"))
    val acc_new_count  =  second_df("count") + first_df("count")


    val new_df = second_df.join(first_df.withColumnRenamed("company_id", "right_company_id").as("a"), 
                                 (  $"a.right_company_id"  === second_df("company_id") && ( second_df("min_dd")  > $"a.max_dd" ) ) 
                            , "leftOuter")
                            .withColumn("new_mean", if (acc_new_mean == null) lit(second_df("mean")) else  acc_new_mean )

@summerbulb любая помощь / предложение, пожалуйста?

Shyam 05.04.2019 13:32

@dytyniak любая помощь/предложение, пожалуйста?

Shyam 05.04.2019 13:33

@jezrael любая помощь/предложение, пожалуйста?

Shyam 05.04.2019 13:35

Можете ли вы добавить какой-нибудь пример? Кажется, что оба кадра данных одинаковы, и это зависит от того, какие данные вы хотите. Вы можете либо присоединиться, либо объединить оба фрейма данных с объединением.

koiralo 05.04.2019 13:45

Приведите примеры данных и то, что вы пробовали.

simon_dmorias 05.04.2019 15:26
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
5
72
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

ПОДХОД 1:

Если вам сложно объединить 2 фрейма данных с помощью API соединения фрейма данных, вы можете использовать sql, если вам удобно в sql. Для этого вы можете зарегистрировать свои 2 фрейма данных как таблицы в искровой памяти и написать sql поверх этого.

second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")

val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()

Как вы просили, я добавил логику.

Представьте, что ваш first_df выглядит следующим образом:

+----------+----------+----------+----+-----+
|company_id|    max_dd|    min_dd|mean|count|
+----------+----------+----------+----+-----+
|         A|2019-04-05|2019-04-01|  10|  100|
|         A|2019-04-06|2019-04-02|  20|  200|
|         B|2019-04-08|2019-04-01|  30|  300|
|         B|2019-04-09|2019-04-02|  40|  400|
+----------+----------+----------+----+-----+

Представьте, что ваш second_df выглядит следующим образом:

+----------+----------+----+-----+
|company_id|    max_dd|mean|count|
+----------+----------+----+-----+
|         A|2019-04-03|  10|  100|
|         A|2019-04-02|  20|  200|
+----------+----------+----+-----+

Поскольку идентификатор компании A присутствует во второй таблице, я взял последнюю запись max_dd из second_df. Для идентификатора компании B его нет в second_df Я взял последнюю max_dd запись от first_df.

Пожалуйста, найдите код ниже.

first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s  on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()

Ниже результат:

ПОДХОД 2:

Вместо создания временной таблицы, как я упоминал в Approach 1, вы можете использовать join из dataframe's API. Это та же логика в Approach 1, но здесь я использую dataframe's API для этого. Пожалуйста, не забудьте импортировать org.apache.spark.sql.expressions.Window, как я использовал Window.patitionBy в приведенном ниже коде.

val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()

Ниже результат:

Пожалуйста, дай мне знать, если возникнут какие-либо вопросы.

спасибо, Сарат, но если совпадений не найдено, мне нужно брать значения только из first_df... если найдено из new_df, как это можно сделать? в искровом API

Shyam 05.04.2019 14:50

second_df исходит из Cassandra, где для каждого company_id возможно несколько целых чисел, мне нужно выбрать только последнюю из присутствующих записей. если нет записи, мне нужно взять значения столбца first_df

Shyam 05.04.2019 14:51

спасибо, могу я узнать, что это за "left_anti"? в first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti") ?

Shyam 08.04.2019 09:52

Left_anti выбирает данные из first_df, где идентификаторы этих компаний отсутствуют в second_df. Если решение работает, отметьте ответ как принятый @Shyam

Sarath KS 08.04.2019 12:26

выдает ошибку org.apache.spark.sql.AnalysisException: невозможно разрешить 'f.min_dd' заданные входные столбцы: [f.company_id, f.max_dd, f.count, f.mean];;

Shyam 08.04.2019 14:24

не могли бы вы посмотреть «Расширение вопроса» в отредактированном вопросе, пожалуйста

Shyam 08.04.2019 14:39

не могли бы вы проверить, есть ли у вас доступ к этому блокноту... databricks-prod-cloudfront.cloud.databricks.com/public/…

Shyam 08.04.2019 14:49
Ответ принят как подходящий
 val acc_new_mean = //new mean literaal
 val acc_new_count  =   //new count literaal


          val resultDf = computed_df.join(accumulated_results_df.as("a"), 
                             (  $"company_id"  === computed_df("company_id")  ) 
                        , "leftOuter")
                        .withColumn("new_mean", when( acc_new_mean.isNull,lit(computed_df("mean")) ).otherwise(acc_new_mean) )
                        .withColumn("new_count", when( acc_new_count.isNull,lit(computed_df("count")) ).otherwise(acc_new_count) )
                         .select(
                            computed_df("company_id"),
                            computed_df("max_dd"),
                            col("new_mean").as("mean"),
                            col("new_count").as("count")
                          )

Другие вопросы по теме