У меня есть простое приложение Java Spring Boot, которое должно использовать сообщения темы из Kafka как объектный класс, но оно вызывается только как строка. вызывается только handleDefault.. почему?
@Component
@KafkaListener(topics = "product-created-events-topic", groupId = "product-created-events")
public class ProductCreatedEventHandler {
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
@KafkaHandler(isDefault = true)
public void handle(ProductCreatedEvent productCreatedEvent){
LOGGER.info("get msg from kafka:"+productCreatedEvent.title());
}
@KafkaHandler(isDefault = true)
public void handleDefault(String message) {
LOGGER.warn("Received unknown message type from Kafka: " + message);
}
}
Мне нравится, чтобы он использовался как объект ProductCreatedEvent, а не как строка.
Это класс:
package com.test.ws.core;
import java.math.BigDecimal;
public class ProductCreatedEvent {
private String productId;
private String title;
private BigDecimal price;
private Integer quantity;
public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
this.productId = productId;
this.title = title;
this.price = price;
this.quantity = quantity;
}
public String productId() {
return this.productId;
}
public ProductCreatedEvent setProductId(String productId) {
this.productId = productId;
return this;
}
public String title() {
return this.title;
}
public ProductCreatedEvent setTitle(String title) {
this.title = title;
return this;
}
public BigDecimal price() {
return this.price;
}
public ProductCreatedEvent setPrice(BigDecimal price) {
this.price = price;
return this;
}
public Integer quantity() {
return this.quantity;
}
public ProductCreatedEvent setQuantity(Integer quantity) {
this.quantity = quantity;
return this;
}
}
и класс начальной загрузки:
@SpringBootApplication
public class ConsumersApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumersApplication.class, args);
}
}
Конфигурация приложения Spring Boot:
spring.application.name=consumers
server.port=8083
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=product-created-events
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
Это тема:
~/kafka/kafka_2.13-3.7.0/bin$ ./kafka-console-consumer.sh --topic product-created-events-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property print.value=true
ea59cd46-b205-4769-84d7-69cf37b3ba79 {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
ea59cd46-b205-4769-84d7-69cf37b3ba79 {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
b1f9dc43-58b4-44e5-9184-afe9159cd757 {"productId":"b1f9dc43-58b4-44e5-9184-afe9159cd757","title":"iphone2","price":212,"quantity":19}




Я думаю, вам не хватает свойства spring.kafka.consumer.value-deserializer, поскольку потребитель десериализует запись, а производитель сериализует ее. Обычно я использовал Apache Avro, а не JSON, но нашел пример Spring, который делает следующее:
Производитель использует JsonSerializer; потребитель использует ByteArrayDeserializer вместе с JsonMessageConverter, который преобразуется в тип аргумента метода прослушивателя.
Тогда у вас есть еще один, более продвинутый пример, где производитель использует JsonSerializer и есть несколько слушателей-потребителей, ожидающих разных типов. Прослушиватель-потребитель выбирается динамически на основе сопоставления типов производитель-потребитель.
Производитель использует JsonSerializer; потребитель использует ByteArrayDeserializer вместе с ByteArrayJsonMessageConverter, который преобразуется в требуемый тип аргумента метода прослушивателя. В этом случае мы не можем определить тип (поскольку тип используется для выбора метода для вызова). Поэтому мы настраиваем сопоставление типов на стороне производителя и потребителя. См. application.yml для стороны производителя и bean-конвертер на стороне потребителя.