Я пытаюсь использовать Kafka ByteArrayDeserializer для чтения записей avro из темы Kafka. Но ниже исключения.
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
Мой код:
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val kafkaParams: Map[String, Object] = Map(
"bootstrap.servers" -> "kafka-server:9092",
"key.serializer" -> classOf[StringSerializer],
"value.serializer" -> classOf[StringSerializer],
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[ByteArrayDeserializer],
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "truststore",
"ssl.truststore.password" -> "pass",
"ssl.keystore.location" -> "keystore.jks",
"ssl.keystore.password" -> "pass",
"group.id" -> "group1"
)
val topics: Array[String] = Array("topics")
val kafkaDstream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val schema = parser.parse(new String(Files.readAllBytes(Paths.get("avro2.avsc"))))
val datumReader = new SpecificDatumReader[GenericRecord](schema)
val processedStream = kafkaDstream.map(record => {
val x = new ByteArrayInputStream(record.value().getBytes()) // throwing exception here
val binaryDecoder = DecoderFactory.get.binaryDecoder(x, null)
datumReader.read(null, binaryDecoder)
})
processedStream.map(rec => rec.get("taskId")).print
Любая помощь приветствуется.
Спасибо.
Вы используете Subscribe[String, String]
.
Вы хотите Subscribe[String, Array[Byte]]
Тогда record.value()
уже является байтовым массивом, не имеющим метода getBytes
Вызвано: java.lang.ClassCastException: [B нельзя преобразовать в java.lang.String
Это исключение означает, что есть объект типа [B
, который был приведен к java.lang.String
и завершился ошибкой.
[B
— это строковое представлениеArray[Byte]
:
jshell> byte[] bytes = new byte[10]
bytes ==> byte[10] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }
jshell> bytes.toString()
$2 ==> "[B@6e8cf4c6"
«Почему это вообще происходит?» ты спрашиваешь. Это потому, что следующая строка (молча) предполагает, что все, что вы делаете, когда-либо может быть String
.
val x = new ByteArrayInputStream(record.value().getBytes()) // throwing exception here
Вместо этого используйте Структурированную потоковую передачу Spark. Вы не пожалеете об этом решении.