У меня есть приведенный ниже код для кэширования различных разделов и сохранения их на карте, а затем для объединения их всех.
и я получаю следующую ошибку unionByName не является членом null
Var cache_map = Map[String,Dataframe]()
for (partition <- partitionlist){
var df_test = spark.read.format("delta").load("abfs://[email protected]/dirname")
.where((col("dt").like(partition+"%"))
cache_map(partition) = df_test.cache()
}
val cache_keys = cache_map.keys
var df_union=null
for (key <- cache_keys){
if (df_union==null){
df_union=cache_map.get(key)
}
else{
df_union=df_union.unionByName(cache_map.get(key)
}
}
Когда я делаю ниже
cache_map.get("20221120").unionByName(cache_map.get("20221119"))
Я получаю следующую ошибку
Ошибка: значение unionByName не является членом Option[org.apache.spark.sql.DataFrame]
Может ли кто-нибудь помочь мне понять, что происходит не так? У меня не так много опыта работы со Spark с использованием scala, как с pyspark. Любая помощь приветствуется.





Как указано в Exception, вы пытаетесь вызвать функцию unionByName для Option[DataFrame] и она терпит неудачу, потому что для типа Option такой функции нет. Вам нужно либо отобразить, либо получить базовый фрейм данных из опции, чтобы иметь возможность использовать функции df.
Вы можете попробовать, например, что-то вроде этого:
cache_map.get("20221120").map{_.unionByName(cache_map.get("20221119").getOrElse(spark.emptyDataFrame))}
Ошибка, которую вы видите, маскирует реальную проблему, заключающуюся в том, что вы неправильно решаете проблему.
Насколько я могу судить, вы читаете в (разделенном?) каталоге, а затем хотите только подмножество данных в DataFrame в переменной df_union.
Вы делаете это, делая несколько проходов по данным, и каждый раз, когда вы находите набор записей, которые вам нравятся, вы сохраняете их на карту, прежде чем окончательно объединить все ваши наборы результатов.
Этого можно добиться примерно следующим образом, где df означает ваши данные:
val df = Seq(
("a1", 1),
("b1", 2),
("c1", 3),
("d1", 4),
("e1", 5),
).toDF("dt", "x")
// List of partition names you want to include in your final data
val partitionList: List[String] = List("a", "c", "e")
val filterCol: Column = col("dt")
// build a single filter expression
val filterExpr: Column = partitionList.map(pt => filterCol.like(s"$pt%")).reduce(_ or _)
// execute filtering
val filteredDf = df.filter(filterExpr)