Поэтому я обновил свой avro атрибутом int по умолчанию.
{
"name": "minimum",
"type": [
"null",
"int"
],
"default": null
}
В реестре схем я установил совместимость вперед.
Мой потребитель существует в том же проекте, что и производитель, поэтому использует тот же сгенерированный avro. во время тестирования я заметил, что теперь потребитель выдает следующую ошибку:
org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [com.example.dto.CarDto] for GenericMessage [payload = {....}
Я проверил конфиг, и он имеет следующий набор:
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
Нет инструментов разработчика, и я использую весеннюю загрузку 2.5.
ОБНОВЛЯТЬ: Это не имеет ничего общего с обновлением файла avro, я не заметил, но исключение было подавлено. И я изменил, что сделал, теперь регистрирует это. Итак, мой вопрос остается в силе, некоторые темы говорят, что это потому, что класс avro находится в другом пакете?
Моя структура:
Src-> main-> avro-> Car.avsc
Src-> main-> java-> com-> пример-> dto-> CarDto.java
Src->main->java->com->example->configs->производитель/потребитель
Src-> main-> java-> com-> пример-> service-> ConsumerProcessor.java
Мой конфиг
@Slf4j
@Configuration
public class ConsumerConfig {
@Value("${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${kafka.schemaRegistryUrl}")
private String schemaRegistryUrl;
// Consumers Configs
private Map<String, Object> getConsumerProps() {
final var props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-a");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
private <T, K> ConcurrentKafkaListenerContainerFactory<K, T> getConcurrentKafkaListenerContainerFactory(final ConsumerFactory<K, T> kafkaConsumerFactory, final KafkaTemplate<K, T> template) {
final ConcurrentKafkaListenerContainerFactory<K, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private KafkaAvroDeserializer getAvroDeserializer() {
final var deserializerProps = new HashMap<String, Object>();
deserializerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
deserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
final var deserializer = new KafkaAvroDeserializer();
deserializer.configure(deserializerProps, false);
return deserializer;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, com.exmaple.dto.CarDto> dataKafkaListenerContainerFactory(
final ConsumerFactory<Integer, com.example.dto.CarDto> kafkaConsumerFactory,
final KafkaTemplate<Integer, com.example.dto.CarDto> template) {
return getConcurrentKafkaListenerContainerFactory(kafkaConsumerFactory, template);
}
@Bean("kafkaListenerContainerFactory")
public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
final Map<String, Object> props = getConsumerProps();
final KafkaAvroDeserializer deserializer = getAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}
}
Вот мой авро
{
"type": "record",
"name": “CarDto”,
"namespace": "com.example.dto",
"fields": [
{
"name": “id”,
"type": [
"null",
"int"
],
"default": null
},
{
"name": "minimum",
"type": ["null", {"type": "int", "logicalType": "date"}],
"default": null
}
]
}
Мой потребитель
@Slf4j
@Service
public class dataConsumer implements AvroKafkaConsumer<CarDto> {
Integer counter = 0;
@Override
@KafkaListener(topics = “topic-a”, containerFactory = “dataKafkaListenerContainerFactory")
public void listen(final CarDto carDto, final Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
}
@Override
public String getName() {
return “data”;
}
}
Итак, я заметил, что containerFactory называется иначе, чем в потребительской службе. "dataKafkaListenerContainerFactory" это имеет значение?
А в конфиге его
@Bean("kafkaListenerContainerFactory")
public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
Итак, я сделал некоторые методы очистки и удаления и добавил следующее
private Map<String, Object> getProps() {
final var props = new HashMap<String, Object>();
……. existing props
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
final Map<String, Object> props = getProps();
final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}
Что вы имеете в виду под "другой пакет"? com.exmple.service может импортировать из com.example.dto просто отлично. Ваша ошибка не связана с компиляцией, а только с десериализацией во время выполнения, поэтому с импортом все в порядке.
@OneCricketeer, я читал некоторые другие темы, в которых они упомянули, что это может быть проблема с загрузчиком классов или способом создания класса. Но я все еще не уверен, почему это вызывает проблему
@NicPegg через плагин Gradle... он автоматически генерирует класс.
Я не думаю, что это так. Вы можете переписать того же потребителя без Spring, и с пакетами все будет в порядке. Пожалуйста, покажите минимальный воспроизводимый пример вашего приложения Spring
@OneCricketeer привет, я добавил свою конфигурацию для потребителя
Кажется, вам не хватает KafkaAvroDeserializer<CarDto> типов
@OneCricketeer, как мне его создать... у меня есть окончательный вариант KafkaAvroDeserializer deserializer = getAvroDeserializer(); .. как добавить типы?
Добавьте угловые скобки, как показано на рисунке.
@OneCricketeer не думаю, что это возможно .. Тип «io.confluent.kafka.serializers.KafkaAvroDeserializer» не имеет параметров типа
Я также заметил, должен ли мой потребитель данных называться «kafkaListenerContainerFactory», если это правильно?
@Bean("kafkaListenerContainerFactory")
- фабрика-потребитель не должна иметь такое название. У вас не может быть 2 бобов с одинаковым именем; оно конфликтует с правильным именем фабрики контейнеров. Вы можете использовать другое имя для фабрики контейнеров, если вы укажете его в @KafkaListener
.
@GaryRussell Я внес изменения, и все та же ошибка сохраняется. Я немного потерял, как обойти это.
Я не могу помочь с авро; Я просто указал, что ваши имена бобов были перепутаны. Кажется, вы все еще звоните на фабрику по производству потребительских товаров kafkaListenerContainerFactory, что не имеет смысла. Но теперь ваша фабрика контейнеров соответствует имени в @KafkaListener, так что все должно быть в порядке, но это не имеет ничего общего с десериализатором avro, создающим GenericData$Record — может быть, запустить его в отладчике?
@GaryRussell Спасибо, я немного почистил, и теперь все работает. Я до сих пор не уверен, зачем мне нужны все эти дополнительные свойства, но теперь все работает. Я новичок в kafka, поэтому не на 100%, но я читал другие примеры конфигураций, в которых все это было установлено.
Отредактировал конфиг
private Map<String, Object> getProps() {
final var props = new HashMap<String, Object>();
……. existing props
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
final Map<String, Object> props = getProps();
final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}
Вы можете вызвать deserializer.configure(props)
@OneCricketeer, что это делает? И нужно ли это, если фабрика-потребитель также имеет те же значения.
Он передает значения KafkaAvroDeserializerConfig экземпляру десериализатора напрямую, а не потребительским свойствам из фабрики.
Как генерируется класс Java?