Имейте два кадра данных, как показано ниже
first_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- min_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
second_df
|-- company_id: string (nullable = true)
|-- max_dd: date (nullable = true)
|-- mean: double (nullable = true)
|-- count: long (nullable = false)
У меня есть данные о некоторых компаниях в second_df . Мне нужно получить данные из second_df для тех идентификаторов компаний, которые перечислены в first_df.
какая искра API полезна здесь для меня? Как мне это сделать ?
Спасибо.
Расширение вопроса:
Если нет сохраненных записей, то first_df будет пустым. Следовательно, first_df("mean") и first_df("count") будут нулевыми, в результате чего "acc_new_mean" будет нулевым. В этом случае мне нужно установить «new_mean» как second_df («mean»), как это сделать? Я пробовал так, но это не работает Любая подсказка, как обрабатывать здесь .withColumn ("new_mean", ...) ???
val acc_new_mean = (second_df("mean") + first_df("mean")) / (second_df("count") + first_df("count"))
val acc_new_count = second_df("count") + first_df("count")
val new_df = second_df.join(first_df.withColumnRenamed("company_id", "right_company_id").as("a"),
( $"a.right_company_id" === second_df("company_id") && ( second_df("min_dd") > $"a.max_dd" ) )
, "leftOuter")
.withColumn("new_mean", if (acc_new_mean == null) lit(second_df("mean")) else acc_new_mean )
@dytyniak любая помощь/предложение, пожалуйста?
@jezrael любая помощь/предложение, пожалуйста?
Можете ли вы добавить какой-нибудь пример? Кажется, что оба кадра данных одинаковы, и это зависит от того, какие данные вы хотите. Вы можете либо присоединиться, либо объединить оба фрейма данных с объединением.
Приведите примеры данных и то, что вы пробовали.
ПОДХОД 1:
Если вам сложно объединить 2 фрейма данных с помощью API соединения фрейма данных, вы можете использовать sql, если вам удобно в sql. Для этого вы можете зарегистрировать свои 2 фрейма данных как таблицы в искровой памяти и написать sql поверх этого.
second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")
val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()
Как вы просили, я добавил логику.
Представьте, что ваш first_df
выглядит следующим образом:
+----------+----------+----------+----+-----+
|company_id| max_dd| min_dd|mean|count|
+----------+----------+----------+----+-----+
| A|2019-04-05|2019-04-01| 10| 100|
| A|2019-04-06|2019-04-02| 20| 200|
| B|2019-04-08|2019-04-01| 30| 300|
| B|2019-04-09|2019-04-02| 40| 400|
+----------+----------+----------+----+-----+
Представьте, что ваш second_df
выглядит следующим образом:
+----------+----------+----+-----+
|company_id| max_dd|mean|count|
+----------+----------+----+-----+
| A|2019-04-03| 10| 100|
| A|2019-04-02| 20| 200|
+----------+----------+----+-----+
Поскольку идентификатор компании A
присутствует во второй таблице, я взял последнюю запись max_dd
из second_df
. Для идентификатора компании B
его нет в second_df
Я взял последнюю max_dd
запись от first_df
.
Пожалуйста, найдите код ниже.
first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()
Ниже результат:
ПОДХОД 2:
Вместо создания временной таблицы, как я упоминал в Approach 1
, вы можете использовать join
из dataframe's
API. Это та же логика в Approach 1
, но здесь я использую dataframe's
API для этого. Пожалуйста, не забудьте импортировать org.apache.spark.sql.expressions.Window
, как я использовал Window.patitionBy
в приведенном ниже коде.
val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()
Ниже результат:
Пожалуйста, дай мне знать, если возникнут какие-либо вопросы.
спасибо, Сарат, но если совпадений не найдено, мне нужно брать значения только из first_df... если найдено из new_df, как это можно сделать? в искровом API
second_df исходит из Cassandra, где для каждого company_id возможно несколько целых чисел, мне нужно выбрать только последнюю из присутствующих записей. если нет записи, мне нужно взять значения столбца first_df
спасибо, могу я узнать, что это за "left_anti"? в first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti") ?
Left_anti выбирает данные из first_df, где идентификаторы этих компаний отсутствуют в second_df. Если решение работает, отметьте ответ как принятый @Shyam
выдает ошибку org.apache.spark.sql.AnalysisException: невозможно разрешить 'f.min_dd
' заданные входные столбцы: [f.company_id, f.max_dd, f.count, f.mean];;
не могли бы вы посмотреть «Расширение вопроса» в отредактированном вопросе, пожалуйста
не могли бы вы проверить, есть ли у вас доступ к этому блокноту... databricks-prod-cloudfront.cloud.databricks.com/public/…
val acc_new_mean = //new mean literaal
val acc_new_count = //new count literaal
val resultDf = computed_df.join(accumulated_results_df.as("a"),
( $"company_id" === computed_df("company_id") )
, "leftOuter")
.withColumn("new_mean", when( acc_new_mean.isNull,lit(computed_df("mean")) ).otherwise(acc_new_mean) )
.withColumn("new_count", when( acc_new_count.isNull,lit(computed_df("count")) ).otherwise(acc_new_count) )
.select(
computed_df("company_id"),
computed_df("max_dd"),
col("new_mean").as("mean"),
col("new_count").as("count")
)
@summerbulb любая помощь / предложение, пожалуйста?