При сохранении RDD в S3 в AVRO я получаю следующее предупреждение в консоли:
Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
Мне не удалось найти простой неявный, такой как saveAsAvroFile
, поэтому я покопался и пришел к следующему:
import org.apache.avro.Schema
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
object AvroUtil {
def write[T](
path: String,
schema: Schema,
avroRdd: RDD[T],
job: Job = Job.getInstance()): Unit = {
val intermediateRdd = avroRdd.mapPartitions(
f = (iter: Iterator[T]) => iter.map(new AvroKey(_) -> NullWritable.get()),
preservesPartitioning = true
)
job.getConfiguration.set("avro.output.codec", "snappy")
job.getConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
AvroJob.setOutputKeySchema(job, schema)
intermediateRdd.saveAsNewAPIHadoopFile(
path,
classOf[AvroKey[T]],
classOf[NullWritable],
classOf[AvroKeyOutputFormat[T]],
job.getConfiguration
)
}
}
Я немного сбит с толку, поскольку не вижу, что не так, потому что файлы AVRO, кажется, выводятся правильно.
@OneCricketeer, это вы имеете в виду? github.com/databricks/spark-avro Похоже, он помечен как устаревший. Наша кодовая база зависит от низкоуровневых RDD. Есть ли шанс опубликовать пример, пожалуйста? Спасибо.
Да. Эта библиотека была объединена с восходящим потоком spark.apache.org/docs/latest/sql-data-sources-avro.html, и вам нужно будет преобразовать ваш RDD с помощью функции toDF stackoverflow.com/questions/38968351/spark-2-0-scala-rdd-tod е
Почему бы не написать Dataframe с библиотекой spark-avro?