Прямо сейчас я пытаюсь заставить два моих микросервиса взаимодействовать друг с другом. Первый отправляет такое сообщение:
@PostMapping("/testPost")
public ResponseEntity<?> testPost(@RequestBody UserCreatedEvent userCreatedEvent) {
try {
ProducerRecord<String, Object> producerRecord =
new ProducerRecord<>("user", "user-created", userCreatedEvent);
producerRecord.headers().add("spring.kafka.serialization.selector", "userCreated".getBytes());
SendResult<String, Object> result =
kafkaTemplate.send(producerRecord).get();
return ResponseEntity.ok(string);
} catch (Exception e) {
return ResponseEntity.internalServerError().body("Broker error!");
}
}
Вот конфигурация производителя:
@Configuration
public class ProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String serverAddress;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
serverAddress);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG,
"all");
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
true);
configProps.put(
org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG,
5);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Поэтому, когда мой потребительский микросервис получает сообщение, я получаю множество исключений, вызванных:
Caused by: java.lang.IllegalArgumentException: The class 'org.example.testsender.dto.UserCreatedEvent' is not in the trusted packages: [java.util, java.lang, org.example.userservice.event, org.example.userservice.event.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Также по какой-то причине мой потребитель не видит ErrorHandlingDeserializer
:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
Вот моя потребительская конфигурация:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, Object> consumerFactory(DelegatingDeserializer delegatingDeserializer) {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
JsonDeserializer.TRUSTED_PACKAGES,
"*"
);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class
);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
DelegatingDeserializer.class
);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), delegatingDeserializer);
}
@Bean
public DelegatingDeserializer delegatingDeserializer() {
Map<String, Deserializer<?>> delegates = new HashMap<>();
delegates.put("userCreated", new JsonDeserializer<>(UserCreatedEvent.class));
return new DelegatingDeserializer(delegates);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
DefaultErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(3000, 3));
errorHandler.addRetryableExceptions(ServiceUnavailable.class);
errorHandler.addNotRetryableExceptions(
ServerSideException.class,
ClientSideException.class,
DeserializationException.class,
JsonProcessingException.class,
MismatchedInputException.class);
return errorHandler;
}
}
Итак, когда я устанавливаю потребительские свойства следующим образом:
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class
);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserCreatedEvent.class);
Мое сообщение отправляется в тему недоставленных писем. Почему? Почему он не был отправлен в dlt с предыдущей конфигурацией? И главный вопрос: как сделать генерическую потребительскую фабрику? На данный момент у меня есть только один тип событий, но позже у меня будет больше, стоит ли мне создавать фабрику потребителей для каждого? Я не думаю, что это хорошая идея. Согласно документации, как я понял, была возможность использовать
DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
Но сейчас такой вещи больше нет, поэтому я заменил ее на @Bean DelegatingDeserializer (вы можете увидеть это в моей потребительской конфигурации), может быть, я сделал это неправильно? Как правильно создать универсальную потребительскую фабрику, поддерживающую несколько типов событий для одной темы? Кстати, вот мой список:
@Component
@Slf4j
public class UserCreatedKafkaEventListener {
@KafkaHandler(isDefault = true)
public void defaultHandler(Object o) {
log.info("Received unknown event: {}", o);
}
@KafkaListener(topics = "user", groupId = "users")
public void userCreatedEventHandler(ConsumerRecord<String, UserCreatedEvent> consumerRecord) {
String key = consumerRecord.key();
UserCreatedEvent event = consumerRecord.value();
log.info("Handling event with key: {}", key);
}
}
ОБНОВЛЕНО Я создал репозиторий GitHub с тестовым проектом, чтобы воспроизвести проблему https://github.com/Denstran/TestKafkaProject
ОБНОВЛЕНО 2
Я исправил проблему с десериализацией, добавив этот containerFactory = "containerFactory"
к @KafkaListener
. Но сейчас я имею дело с другой проблемой. У меня есть разные классы, помеченные @KafkaListener
, и эти классы имеют @KafkaHandler
методы для определенных типов объектов событий, подобных этому.
@KafkaListener(topics = "users", groupId = "user")
@Component
@Slf4j
public class UserCreatedKafkaEventListener {
@KafkaHandler
public void handleEvent(UserCreatedEvent userCreatedEvent) {
log.info("Handling user created event: {}, {}",
userCreatedEvent.getClass().getCanonicalName(), userCreatedEvent);
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
log.info("Handling default object: {}", object);
log.info("Handling object with class name: {}, object: {}",
object.getClass().getCanonicalName(), object);
}
}
Итак, проблема в том, что теперь все события, которые я отправляю через производителя, попадают в класс @KafkaHandler(isDefault = true)
из UserCreatedKafkaEventListener
, а не в другие классы с обработчиками для их типов. Вот обновленная конфигурация потребителя и производителя моего нового проекта, ссылку на который я уже предоставил:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent, userUpdated:org.example.userservice.dto.UserUpdatedEvent");
consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
consumerProps.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerConfig.put(JsonSerializer.TYPE_MAPPINGS,
"userCreated:org.example.producerservice.dto.UserCreatedEvent, userUpdated:org.example.producerservice.dto.UserUpdatedEvent");
producerConfig.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 5);
return new DefaultKafkaProducerFactory<>(producerConfig);
}
DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
был точно таким же, как и DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
. Итак, это заголовок, который вы можете использовать на стороне потребителя для определения типа цели.
Однако похоже, что вы имеете дело с JSON. Итак, или вы можете положиться на заголовки производителя (что похоже на случай этой ошибки для входящего типа org.example.testsender.dto.UserCreatedEvent
) или создать сопоставление для JsonDeserializer
: https://docs.spring.io/spring-kafka/reference /kafka/serdes.html#json-serde
Он не попадает в ErrorHandlingDeserializer
, потому что вы явно указываете десериализатор:
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), delegatingDeserializer);
Если мы сделаем это в заводской конфигурации, то все соответствующие свойства будут проигнорированы.
Кроме того, я заметил, что когда я перехватываю событие в @KafkaHandler(isDefault = true)
с аргументами метода Object object
и печатаю имя класса - это java.lang.String
, я не знаю почему, потому что в конфигурации производителя у меня есть следующая строка producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
, а в моей потребительской конфигурации consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
так Я не могу понять, откуда взялся String
Такое ощущение, что ваша фабрика контейнеров вышла из строя: в Spring Boot есть десериализатор строк. Было бы здорово, если бы вы поделились с нами своим простым проектом, который можно было бы воспроизвести и поиграть с нами на нашей стороне. Поделитесь, пожалуйста, через репозиторий GitHub.
Вот он github.com/Denstran/TestKafkaProject
Хорошо, я исправил проблему с десериализацией, добавив containerFactory = "containerFactory"
к @KafkaListener
, но теперь столкнулся с другой проблемой. Я использую @KafkaListener
в разных классах (а не в методах), потому что хочу обрабатывать каждое событие в специальном классе, но все события теперь передаются только в UserCreatedKafkaEventListener
, а поскольку для них нет @KafkaHandler - они попадают в Кафку по умолчанию. обработчик, и это не то, что я хочу. Я хочу, чтобы они попадали в другие классы, аннотированные @KafkaListener, и обрабатывались обработчиками Kafka, которые ждут своего типа.
Дополнительные объяснения находятся в разделе ОБНОВЛЕНО 2 вопроса.
Это хорошо, что вы придумали оригинальную проблему. К сожалению, пока не было возможности ознакомиться с вашим проектом. Я думаю, что ваша следующая проблема заслуживает отдельного вопроса на StackOverflow.
Итак, следуя словам Атема Билана, я проверил, не используется ли моя потребительская фабрика в Spring Boot или нет, и обнаружил, что это определенно так. Как же так? Ну, как сказал Артем, Spring Boot предоставляет @Bean
из ConcurrentKafkaListenerContainerFactory
из коробки и, как я погуглил, если вы хотите его переопределить, имя бина должно быть «kafkaListenerContainerFactory» (согласно этому), в противном случае нужно указать какую фабрику контейнеров использовать в параметре containerFactory
@KafkaListener
вот так @KafkaListener(topics = "users", groupId = "user", containerFactory = "containerFactory")
. Кроме того, чтобы решить проблему десериализации, важно указать сопоставления типов следующим образом:
producerConfig.put(JsonSerializer.TYPE_MAPPINGS, "userCreated:org.example.producerservice.dto.UserCreatedEvent");
consumerConfig.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent");
Это позволяет нашему потребителю понять, что тип ведьмы должен десериализовать объект из-за токена «userCreated», после чего вы должны указать имя класса вашего объекта (читайте Типы сопоставления) в весенних документах.
Я создаю новый проект с микросервисом производителя и микросервисом потребителя, затем следую статье «Типы сопоставления» и добавляю следующие свойства своему производителю и потребителю:
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent");
configProps.put(JsonSerializer.TYPE_MAPPINGS, "userCreated:org.example.producerservice.dto.UserCreatedEvent");
Итак, теперь сообщения поступают только в DLT, но все еще не сопоставляются с правильным типом.