Проблема потребителя Spring boot kafka с десериализацией AVRO GENERIC_RECORD с использованием реестра схемы Glue

У меня есть темы, написанные kafka connect в формате AVRO GENERIC_RECORD с использованием Glue Schema Registry. Я могу потреблять тех, кто использует документацию, используя простую программу Java. Однако мне трудно читать их с помощью приложения Spring Boot.

Мой простой класс конфигурации

@EnableKafka
@Configuration

public class KafkaAvroConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Creating a Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        // Creating a Map of string-object pairs
        Map<String, Object> config = new HashMap<>();

        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

        config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
        config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);

        config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        config.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
                MySchemaNamingStrategy.class.getName());

        return config;
    }
}

И класс слушателя

@Component

public class KafkaAvroConsumer {

    @Autowired
    KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    @KafkaListener(topics = "gsr1.HR.DEPARTMENTS")
    public void listenDepartment(ConsumerRecord<GenericRecord, GenericRecord> record) {

        //System.out.println("DEPARTMENTS key   schema = " + record.key().getSchema().toString());
        GenericRecord key = record.key();
        GenericRecord value = record.value();
        System.out.println("            record.key() = " + key);
        System.out.println("          record.value() = " + value);
        System.out.println("      Key  DEPARTMENT_ID = " + key.get("DEPARTMENT_ID"));
        System.out.println("         DEPARTMENT_NAME = " + (String) value.get("DEPARTMENT_NAME"));
    }

}

Это дает мне ошибку «GenericRecord key = record.key();», похоже, что они не были десериализованы в GenericRecord, вместо этого они представляют собой просто необработанные байты.

Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.GenericRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.GenericRecord is in unnamed module of loader 'app')

Я искал, и в весенней документации метод DefaultKafkaConsumerFactory также принимает класс десериализации ключа и значения также в качестве параметров. Итак, я попытался сделать это, но это не компилируется. GlueSchemaRegistryKafkaDeserializer также не принимает аргумент типа

    public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
        Deserializer<GenericRecord> avroDeser =  new GlueSchemaRegistryKafkaDeserializer();
        avroDeser.configure(consumerConfigs(), false);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), avroDeser, avroDeser);
    }

Любая помощь в том, как заставить это работать. Я тоже задал вопрос в GSR github https://github.com/awslabs/aws-glue-schema-registry/issues/241

Вот ПОМ

<?xml version = "1.0" encoding = "UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/> 
    </parent>
    <groupId>com.test</groupId>
    <artifactId>SpringBootKafkaAvro</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafkaAvro</name>
    <description>Spring boot Kafka Avro using Glue Schema registry</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.glue</groupId>
            <artifactId>schema-registry-serde</artifactId>
            <version>1.1.14</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Итак, фабрика не скомпилировалась, так где же еще на самом деле используется consumerConfigs()? В журналах вы видите value.deserializer = ... StringDeserializer, значит? Почему бы не настроить десериализаторы в свойствах?

OneCricketeer 05.01.2023 23:44

Не могли бы вы добавить свой pom.xml (по крайней мере, зависимости, касающиеся kafka и реестра схем). Также в качестве наивного вопроса вы создали необходимые классы из схемы (схем) avro. И, как упоминал @OneCricketeer, как насчет application.properties?

Csisanyi 06.01.2023 00:56

POM добавила @Csisany. Темы пишутся с использованием соединения Kafka как GenericRecord., поэтому нет необходимости в классах схемы avro.

Anand K 06.01.2023 15:19

@OneCricketeer, ConsumerConfigs () используются только в ConsumerFactory. свойства установлены ниже, ключ и значение avro. И используя springboot, они не десериализуют ключ или значение, и это дает указанную выше ошибку (невозможно преобразовать строку в genericrecord). Работает нормально в обычной java программе. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

Anand K 06.01.2023 15:19

Spring по-прежнему является «обычной Java-программой». Если ваш метод никогда не используется, spring-kafka просто по умолчанию использует StringDeserializer. Почему бы не использовать файл свойств?

OneCricketeer 06.01.2023 15:39

Под обычной Java-программой я имел в виду не использование Springboot. И потребительские свойства, и конфиги клея задаются в методе ConsumerConfigs(). ConsumerConfigs() используются в качестве аргумента при создании ConsumerFactory(), а ConsumerFactory() используется в качестве аргумента в concurrentKafkaListenerContainerFactory(). Так что все настроено правильно.

Anand K 06.01.2023 15:56
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
6
91
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я понял, в чем дело. В классе конфигурации SpringBoot ожидает, что имя фабричного компонента будет kafkaListenerContainerFactory. Я назвал его concurrentKafkaListenerContainerFactory, что вызывает проблему неправильной загрузки конфигураций потребителя и клея.

По умолчанию bean-компонент с именем kafkaListenerContainerFactory ожидал.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation

Другие вопросы по теме