Я использую spark 2.3 и пытаюсь передавать данные из Kafka с помощью Dstreams (используя DStreams для достижения определенного варианта использования, который мы не смогли использовать с помощью Structured Streaming).
Тема Kafka содержит данные в формате avro. Я хочу прочитать эти данные с помощью Spark DStreams и интерпретировать их как строку json.
Я пытаюсь сделать что-то вроде этого,
val kafkaParams: Map[String, Object] = Map(
"bootstrap.servers" -> "kafka-servers",
"key.serializer" -> classOf[StringSerializer],
"value.serializer" -> classOf[StringSerializer],
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[org.apache.spark.sql.avro.AvroDeserializer],
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> "group1"
)
val kafkaDstream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val processedStream = kafkaDstream.map(record => (record.key(), record.value()))
processedStream.foreachRDD(
someRdd =>
someRdd.foreach(
paths=> {
println(paths._2)
}
)
)
Но я не вижу обработки данных (сообщение об ошибке ниже), что, я думаю, связано с тем, что AvroDeserializer доступен только после Spark 2.4.0.
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.spark.sql.avro.AvroDeserializer Does it have a public no-argument constructor?
Любая идея о том, как я могу добиться этого?
Спасибо.
Десериализатор Avro от Spark не является десериализатором Kafka (кстати, у вас не может быть дубликатов ключей в карте конфигурации). Этот класс предназначен для SparkSQL/структурированной потоковой передачи, а не для (устаревшей) потоковой передачи.
Неясно, как ваш производитель сериализует данные, но если вы используете реестр Confluent Schema Registry, вам нужно будет использовать собственный класс Confluent KafkaAvroDeserializer, а затем вы должны использовать [String, GenericRecord]
в качестве типов потоков. Данные никогда не преобразуются автоматически в JSON, и использование String в качестве типа потока не удастся при использовании Avro Deserializer.
Спасибо за ответ @OneCricketeer. Поскольку мы не используем реестр Confluent Schema, я попытался использовать Kafka ByteArrayDeserializer, но также столкнулся с некоторыми проблемами. Запостил для этого отдельный вопрос stackoverflow.com/questions/74564728/….