Ошибка выдачи Kafka после обновления весенней загрузки Avro

Поэтому я обновил свой avro атрибутом int по умолчанию.

{
  "name": "minimum",
  "type": [
    "null",
    "int"
  ],
  "default": null
}

В реестре схем я установил совместимость вперед.

Мой потребитель существует в том же проекте, что и производитель, поэтому использует тот же сгенерированный avro. во время тестирования я заметил, что теперь потребитель выдает следующую ошибку:

org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [com.example.dto.CarDto] for GenericMessage [payload = {....}

Я проверил конфиг, и он имеет следующий набор:

spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true

Нет инструментов разработчика, и я использую весеннюю загрузку 2.5.

ОБНОВЛЯТЬ: Это не имеет ничего общего с обновлением файла avro, я не заметил, но исключение было подавлено. И я изменил, что сделал, теперь регистрирует это. Итак, мой вопрос остается в силе, некоторые темы говорят, что это потому, что класс avro находится в другом пакете?

Моя структура:

Src-> main-> avro-> Car.avsc

Src-> main-> java-> com-> пример-> dto-> CarDto.java

Src->main->java->com->example->configs->производитель/потребитель

Src-> main-> java-> com-> пример-> service-> ConsumerProcessor.java

Мой конфиг

    @Slf4j
@Configuration
public class ConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;


    @Value("${kafka.schemaRegistryUrl}")
    private String schemaRegistryUrl;

    // Consumers Configs

    private Map<String, Object> getConsumerProps() {
        final var props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-a");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return props;
    }

    private <T, K> ConcurrentKafkaListenerContainerFactory<K, T> getConcurrentKafkaListenerContainerFactory(final ConsumerFactory<K, T> kafkaConsumerFactory, final KafkaTemplate<K, T> template) {
        final ConcurrentKafkaListenerContainerFactory<K, T> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private KafkaAvroDeserializer getAvroDeserializer() {
        final var deserializerProps = new HashMap<String, Object>();
        deserializerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        deserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        final var deserializer = new KafkaAvroDeserializer();
        deserializer.configure(deserializerProps, false);
        return deserializer;
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, com.exmaple.dto.CarDto> dataKafkaListenerContainerFactory(
            final ConsumerFactory<Integer, com.example.dto.CarDto> kafkaConsumerFactory,
            final KafkaTemplate<Integer, com.example.dto.CarDto> template) {
        return getConcurrentKafkaListenerContainerFactory(kafkaConsumerFactory, template);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
        final Map<String, Object> props = getConsumerProps();
        final KafkaAvroDeserializer deserializer = getAvroDeserializer();

        return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
    }
}

Вот мой авро

{
  "type": "record",
  "name": “CarDto”,
  "namespace": "com.example.dto",
  "fields": [
    {
      "name": “id”,
      "type": [
        "null",
        "int"
      ],
      "default": null
    },   
       {
      "name": "minimum",
      "type": ["null", {"type": "int", "logicalType": "date"}],
      "default": null
    }
  ]
}

Мой потребитель

@Slf4j
@Service
public class dataConsumer implements AvroKafkaConsumer<CarDto> {

    Integer counter = 0;

    @Override
    @KafkaListener(topics = “topic-a”, containerFactory = “dataKafkaListenerContainerFactory")
    public void listen(final CarDto carDto, final Acknowledgment acknowledgment) {

        acknowledgment.acknowledge();
    }

    @Override
    public String getName() {
        return “data”;
    }
}

Итак, я заметил, что containerFactory называется иначе, чем в потребительской службе. "dataKafkaListenerContainerFactory" это имеет значение?

А в конфиге его

@Bean("kafkaListenerContainerFactory")
    public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {

Итак, я сделал некоторые методы очистки и удаления и добавил следующее

 private Map<String, Object> getProps() {
     final var props = new HashMap<String, Object>();
 ……. existing props

     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
     props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);


     return props;
 }

 @Bean
 public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
     final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(kafkaConsumerFactory());
     factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
     return factory;
 }

 public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
     final Map<String, Object> props = getProps();
     final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

     return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
 }

Как генерируется класс Java?

Nic Pegg 10.01.2023 19:56

Что вы имеете в виду под "другой пакет"? com.exmple.service может импортировать из com.example.dto просто отлично. Ваша ошибка не связана с компиляцией, а только с десериализацией во время выполнения, поэтому с импортом все в порядке.

OneCricketeer 10.01.2023 20:03

@OneCricketeer, я читал некоторые другие темы, в которых они упомянули, что это может быть проблема с загрузчиком классов или способом создания класса. Но я все еще не уверен, почему это вызывает проблему

user1555190 11.01.2023 11:14

@NicPegg через плагин Gradle... он автоматически генерирует класс.

user1555190 11.01.2023 11:14

Я не думаю, что это так. Вы можете переписать того же потребителя без Spring, и с пакетами все будет в порядке. Пожалуйста, покажите минимальный воспроизводимый пример вашего приложения Spring

OneCricketeer 11.01.2023 16:41

@OneCricketeer привет, я добавил свою конфигурацию для потребителя

user1555190 11.01.2023 17:22

Кажется, вам не хватает KafkaAvroDeserializer<CarDto> типов

OneCricketeer 11.01.2023 17:51

@OneCricketeer, как мне его создать... у меня есть окончательный вариант KafkaAvroDeserializer deserializer = getAvroDeserializer(); .. как добавить типы?

user1555190 11.01.2023 18:10

Добавьте угловые скобки, как показано на рисунке.

OneCricketeer 12.01.2023 15:45

@OneCricketeer не думаю, что это возможно .. Тип «io.confluent.kafka.serializers.KafkaAvroDeserializer» не имеет параметров типа

user1555190 12.01.2023 16:48

Я также заметил, должен ли мой потребитель данных называться «kafkaListenerContainerFactory», если это правильно?

user1555190 12.01.2023 19:42
@Bean("kafkaListenerContainerFactory") - фабрика-потребитель не должна иметь такое название. У вас не может быть 2 бобов с одинаковым именем; оно конфликтует с правильным именем фабрики контейнеров. Вы можете использовать другое имя для фабрики контейнеров, если вы укажете его в @KafkaListener.
Gary Russell 12.01.2023 19:47

@GaryRussell Я внес изменения, и все та же ошибка сохраняется. Я немного потерял, как обойти это.

user1555190 12.01.2023 21:08

Я не могу помочь с авро; Я просто указал, что ваши имена бобов были перепутаны. Кажется, вы все еще звоните на фабрику по производству потребительских товаров kafkaListenerContainerFactory, что не имеет смысла. Но теперь ваша фабрика контейнеров соответствует имени в @KafkaListener, так что все должно быть в порядке, но это не имеет ничего общего с десериализатором avro, создающим GenericData$Record — может быть, запустить его в отладчике?

Gary Russell 12.01.2023 21:16

@GaryRussell Спасибо, я немного почистил, и теперь все работает. Я до сих пор не уверен, зачем мне нужны все эти дополнительные свойства, но теперь все работает. Я новичок в kafka, поэтому не на 100%, но я читал другие примеры конфигураций, в которых все это было установлено.

user1555190 12.01.2023 23:27
Как сделать движок для футбольного матча? (простой вариант)
Как сделать движок для футбольного матча? (простой вариант)
Футбол. Для многих людей, живущих на земле, эта игра - больше, чем просто спорт. И эти люди всегда мечтают стать футболистом или менеджером. Но, к...
Знайте свои исключения!
Знайте свои исключения!
В Java исключение - это событие, возникающее во время выполнения программы, которое нарушает нормальный ход выполнения инструкций программы. Когда...
Лучшая компания по разработке спортивных приложений
Лучшая компания по разработке спортивных приложений
Ищете лучшую компанию по разработке спортивных приложений? Этот список, несомненно, облегчит вашу работу!
Blibli Automation Journey - Как захватить сетевой трафик с помощью утилиты HAR в Selenium 4
Blibli Automation Journey - Как захватить сетевой трафик с помощью утилиты HAR в Selenium 4
Если вы являетесь веб-разработчиком или тестировщиком, вы можете быть знакомы с Selenium, популярным инструментом для автоматизации работы...
Фото ️🔁 Radek Jedynak 🔃 on ️🔁 Unsplash 🔃
Фото ️🔁 Radek Jedynak 🔃 on ️🔁 Unsplash 🔃
Что такое Java 8 Streams API? Java 8 Stream API
Деревья поиска (Алгоритм4 Заметки к учебнику)
Деревья поиска (Алгоритм4 Заметки к учебнику)
(1) Двоичные деревья поиска: среднее lgN, наихудшее N для вставки и поиска.
0
15
105
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Отредактировал конфиг

 private Map<String, Object> getProps() {
     final var props = new HashMap<String, Object>();
 ……. existing props

     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
     props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);


     return props;
 }

 @Bean
 public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
     final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(kafkaConsumerFactory());
     factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
     return factory;
 }

 public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
     final Map<String, Object> props = getProps();
     final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

     return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
 }

Вы можете вызвать deserializer.configure(props)

OneCricketeer 13.01.2023 00:33

@OneCricketeer, что это делает? И нужно ли это, если фабрика-потребитель также имеет те же значения.

user1555190 13.01.2023 19:26

Он передает значения KafkaAvroDeserializerConfig экземпляру десериализатора напрямую, а не потребительским свойствам из фабрики.

OneCricketeer 13.01.2023 20:35

Другие вопросы по теме