@KafkaHandler в потребителе не использует сообщение темы как объектный класс, а только как строку

У меня есть простое приложение Java Spring Boot, которое должно использовать сообщения темы из Kafka как объектный класс, но оно вызывается только как строка. вызывается только handleDefault.. почему?

@Component
@KafkaListener(topics = "product-created-events-topic", groupId = "product-created-events")
public class ProductCreatedEventHandler {
    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    @KafkaHandler(isDefault = true)
    public void handle(ProductCreatedEvent productCreatedEvent){
        LOGGER.info("get msg from kafka:"+productCreatedEvent.title());

    }
    @KafkaHandler(isDefault = true)
    public void handleDefault(String message) {
        LOGGER.warn("Received unknown message type from Kafka: " + message);
    }
}

Мне нравится, чтобы он использовался как объект ProductCreatedEvent, а не как строка.

Это класс:

package com.test.ws.core;

import java.math.BigDecimal;

public class ProductCreatedEvent {
    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quantity;

    public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
        this.productId = productId;
        this.title = title;
        this.price = price;
        this.quantity = quantity;
    }

    public String productId() {
        return this.productId;
    }

    public ProductCreatedEvent setProductId(String productId) {
        this.productId = productId;
        return this;
    }

    public String title() {
        return this.title;
    }

    public ProductCreatedEvent setTitle(String title) {
        this.title = title;
        return this;
    }

    public BigDecimal price() {
        return this.price;
    }

    public ProductCreatedEvent setPrice(BigDecimal price) {
        this.price = price;
        return this;
    }

    public Integer quantity() {
        return this.quantity;
    }

    public ProductCreatedEvent setQuantity(Integer quantity) {
        this.quantity = quantity;
        return this;
    }
}

и класс начальной загрузки:

@SpringBootApplication
public class ConsumersApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumersApplication.class, args);
    }

}

Конфигурация приложения Spring Boot:

spring.application.name=consumers

server.port=8083

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=product-created-events
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

Это тема:

~/kafka/kafka_2.13-3.7.0/bin$ ./kafka-console-consumer.sh --topic product-created-events-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property print.value=true
ea59cd46-b205-4769-84d7-69cf37b3ba79    {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
ea59cd46-b205-4769-84d7-69cf37b3ba79    {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
b1f9dc43-58b4-44e5-9184-afe9159cd757    {"productId":"b1f9dc43-58b4-44e5-9184-afe9159cd757","title":"iphone2","price":212,"quantity":19}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
70
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я думаю, вам не хватает свойства spring.kafka.consumer.value-deserializer, поскольку потребитель десериализует запись, а производитель сериализует ее. Обычно я использовал Apache Avro, а не JSON, но нашел пример Spring, который делает следующее:

Производитель использует JsonSerializer; потребитель использует ByteArrayDeserializer вместе с JsonMessageConverter, который преобразуется в тип аргумента метода прослушивателя.

Тогда у вас есть еще один, более продвинутый пример, где производитель использует JsonSerializer и есть несколько слушателей-потребителей, ожидающих разных типов. Прослушиватель-потребитель выбирается динамически на основе сопоставления типов производитель-потребитель.

Производитель использует JsonSerializer; потребитель использует ByteArrayDeserializer вместе с ByteArrayJsonMessageConverter, который преобразуется в требуемый тип аргумента метода прослушивателя. В этом случае мы не можем определить тип (поскольку тип используется для выбора метода для вызова). Поэтому мы настраиваем сопоставление типов на стороне производителя и потребителя. См. application.yml для стороны производителя и bean-конвертер на стороне потребителя.

Другие вопросы по теме