У меня есть RDD
типа (String,Iterable[GenericData.Record])
. Теперь я хочу сохранить эти итерации по путям на основе ключей этого RDD. Так, например, если RDD содержал
("a",[1,2,3,4])
("b",[5,6,7,9])
Мне нужно сохранить [1,2,3,4] под result-path/a
и [5,6,7,8,9] под result-path/b
. Один из способов сделать это, который компилируется, но не работает во время выполнения, заключается в следующем:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:RDD[(String,Iterable[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,collection) =>
val reRDD = sc.makeRDD(collection)
reRDD.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
Проблема здесь в том, что я не могу этого сделать, так как SparkContext не сериализуем. Поэтому я пытаюсь придумать способ, как преобразовать начальное re
в RDD
, чтобы я мог сделать следующее:
implicit val spark: SparkSession = SparkSessionUtils.initSparkSession("Test")
implicit val sc: SparkContext = spark.sparkContext
val re:Map[(String,RDD[GenericData.Record])] = ???
val hadoopConf = new Configuration(sc.hadoopConfiguration)
re.forearch {
case (key,rddCollection) =>
rddCollection.saveAsNewAPIHadoopFile(s"$uri/$key",
classOf[SpecificRecord],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[SpecificRecord]],
hadoopConf)
}
Ключи могут быть собраны, а оригинальные RDD отфильтрованы для каждого ключа:
val re = rdd
.keys
.collect()
.map(v => v -> rdd.filter(_._1 == v).values)
.toMap