Specificavrodeserializer и specificc.avro.reader

У меня есть вопрос о SpecificAvroDeserializer из-за имени класса, как я всегда предполагал, SpecificAvroSerializer будет создавать SpecificRecord, а SpecificAvroDeserializer сможет его использовать.

К моему удивлению, если я не установил свойство 'specific.avro.reader', SpecificAvroDeserializer попытается прочитать байтовый массив как GenericRecord из-за следующей строки кода в классе «AbstractKafkaAvroDeserializer» (https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java) ..

private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) {
boolean writerSchemaIsPrimitive =
    AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
// do not use SpecificDatumReader if writerSchema is a primitive
if (useSpecificAvroReader && !writerSchemaIsPrimitive) {
  if (readerSchema == null) {
    readerSchema = getReaderSchema(writerSchema);
  }
  return new SpecificDatumReader(writerSchema, readerSchema);
} else {
  if (readerSchema == null) {
    return new GenericDatumReader(writerSchema);
  }
  return new GenericDatumReader(writerSchema, readerSchema);
}

}

Я думал, что конструктор SpecificAvroDeserializer должен установить это, но это не так .. (https://github.com/confluentinc/schema-registry/blob/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro/SpecificAvroDeserializer.java)

Так что же мне здесь не хватает?

Может быть хорошей проблемой для поиска в проекте реестра схем

OneCricketeer 26.10.2018 15:51

Я хотел бы думать, что по умолчанию он ложен, потому что не гарантируется, что ваш тип SpecificRecord находится в пути к классам, но я согласен с тем, что, явно пытаясь использовать этот десериализатор, у вас уже будет этот класс, доступный, поскольку вы пытаетесь прочитать этот конкретный объект из темы

OneCricketeer 26.10.2018 15:58
2
2
3 972
1

Ответы 1

Этот класс предназначен для использования через Serdes Kafka Streams, а не напрямую через Consumer API.

Вы должны использовать KafkaAvroDeserializer только с соответствующими настройками, которые передаются абстрактному классу, как вы обнаружили

Другие вопросы по теме