Как создать универсальную потребительскую фабрику Кафки?

Прямо сейчас я пытаюсь заставить два моих микросервиса взаимодействовать друг с другом. Первый отправляет такое сообщение:

@PostMapping("/testPost")
    public ResponseEntity<?> testPost(@RequestBody UserCreatedEvent userCreatedEvent) {
        try {
            ProducerRecord<String, Object> producerRecord =
                    new ProducerRecord<>("user", "user-created", userCreatedEvent);
            producerRecord.headers().add("spring.kafka.serialization.selector", "userCreated".getBytes());

            SendResult<String, Object> result =
                    kafkaTemplate.send(producerRecord).get();

            return ResponseEntity.ok(string);
        } catch (Exception e) {
            return ResponseEntity.internalServerError().body("Broker error!");
        }
    }

Вот конфигурация производителя:

@Configuration
public class ProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String serverAddress;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                serverAddress);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG,
                "all");
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
                true);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG,
                5);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Поэтому, когда мой потребительский микросервис получает сообщение, я получаю множество исключений, вызванных:

Caused by: java.lang.IllegalArgumentException: The class 'org.example.testsender.dto.UserCreatedEvent' is not in the trusted packages: [java.util, java.lang, org.example.userservice.event, org.example.userservice.event.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Также по какой-то причине мой потребитель не видит ErrorHandlingDeserializer:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

Вот моя потребительская конфигурация:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory(DelegatingDeserializer delegatingDeserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                groupId);
        props.put(
                JsonDeserializer.TRUSTED_PACKAGES,
                "*"
        );

        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ErrorHandlingDeserializer.class);
        props.put(
                ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
                StringDeserializer.class
        );

        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ErrorHandlingDeserializer.class);
        props.put(
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                DelegatingDeserializer.class
        );



        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), delegatingDeserializer);
    }

    @Bean
    public DelegatingDeserializer delegatingDeserializer() {
        Map<String, Deserializer<?>> delegates = new HashMap<>();
        delegates.put("userCreated", new JsonDeserializer<>(UserCreatedEvent.class));

        return new DelegatingDeserializer(delegates);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        DefaultErrorHandler errorHandler =
                new DefaultErrorHandler(
                        new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(3000, 3));
        errorHandler.addRetryableExceptions(ServiceUnavailable.class);
        errorHandler.addNotRetryableExceptions(
                ServerSideException.class,
                ClientSideException.class,
                DeserializationException.class,
                JsonProcessingException.class,
                MismatchedInputException.class);

        return errorHandler;
    }
}

Итак, когда я устанавливаю потребительские свойства следующим образом:

 props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ErrorHandlingDeserializer.class);
        props.put(
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                JsonDeserializer.class
        );
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserCreatedEvent.class);

Мое сообщение отправляется в тему недоставленных писем. Почему? Почему он не был отправлен в dlt с предыдущей конфигурацией? И главный вопрос: как сделать генерическую потребительскую фабрику? На данный момент у меня есть только один тип событий, но позже у меня будет больше, стоит ли мне создавать фабрику потребителей для каждого? Я не думаю, что это хорошая идея. Согласно документации, как я понял, была возможность использовать

DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG

Но сейчас такой вещи больше нет, поэтому я заменил ее на @Bean DelegatingDeserializer (вы можете увидеть это в моей потребительской конфигурации), может быть, я сделал это неправильно? Как правильно создать универсальную потребительскую фабрику, поддерживающую несколько типов событий для одной темы? Кстати, вот мой список:

@Component
@Slf4j
public class UserCreatedKafkaEventListener {

    @KafkaHandler(isDefault = true)
    public void defaultHandler(Object o) {
        log.info("Received unknown event: {}", o);
    }

    @KafkaListener(topics = "user", groupId = "users")
    public void userCreatedEventHandler(ConsumerRecord<String, UserCreatedEvent> consumerRecord) {
        String key = consumerRecord.key();
        UserCreatedEvent event = consumerRecord.value();

        log.info("Handling event with key: {}", key);

    }
}

ОБНОВЛЕНО Я создал репозиторий GitHub с тестовым проектом, чтобы воспроизвести проблему https://github.com/Denstran/TestKafkaProject

ОБНОВЛЕНО 2 Я исправил проблему с десериализацией, добавив этот containerFactory = "containerFactory" к @KafkaListener. Но сейчас я имею дело с другой проблемой. У меня есть разные классы, помеченные @KafkaListener, и эти классы имеют @KafkaHandler методы для определенных типов объектов событий, подобных этому.

@KafkaListener(topics = "users", groupId = "user")
@Component
@Slf4j
public class UserCreatedKafkaEventListener {
    @KafkaHandler
    public void handleEvent(UserCreatedEvent userCreatedEvent) {
        log.info("Handling user created event: {}, {}",
                userCreatedEvent.getClass().getCanonicalName(), userCreatedEvent);
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        log.info("Handling default object: {}", object);
        log.info("Handling object with class name: {}, object: {}",
                object.getClass().getCanonicalName(), object);
    }
}

Итак, проблема в том, что теперь все события, которые я отправляю через производителя, попадают в класс @KafkaHandler(isDefault = true) из UserCreatedKafkaEventListener, а не в другие классы с обработчиками для их типов. Вот обновленная конфигурация потребителя и производителя моего нового проекта, ссылку на который я уже предоставил:

@Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent, userUpdated:org.example.userservice.dto.UserUpdatedEvent");
        consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
        consumerProps.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);

        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }
@Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerConfig.put(JsonSerializer.TYPE_MAPPINGS,
                "userCreated:org.example.producerservice.dto.UserCreatedEvent, userUpdated:org.example.producerservice.dto.UserUpdatedEvent");
        producerConfig.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);

        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 5);
        return new DefaultKafkaProducerFactory<>(producerConfig);
    }
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
65
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG был точно таким же, как и DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG. Итак, это заголовок, который вы можете использовать на стороне потребителя для определения типа цели.

Однако похоже, что вы имеете дело с JSON. Итак, или вы можете положиться на заголовки производителя (что похоже на случай этой ошибки для входящего типа org.example.testsender.dto.UserCreatedEvent) или создать сопоставление для JsonDeserializer: https://docs.spring.io/spring-kafka/reference /kafka/serdes.html#json-serde

Он не попадает в ErrorHandlingDeserializer, потому что вы явно указываете десериализатор:

return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), delegatingDeserializer);

Если мы сделаем это в заводской конфигурации, то все соответствующие свойства будут проигнорированы.

Я создаю новый проект с микросервисом производителя и микросервисом потребителя, затем следую статье «Типы сопоставления» и добавляю следующие свойства своему производителю и потребителю: consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent");configProps.put(JsonSerializer.TYPE_MAPPINGS, "userCreated:org.example.producerservice.dto.UserCreatedEven‌​t"); Итак, теперь сообщения поступают только в DLT, но все еще не сопоставляются с правильным типом.

denstran 28.08.2024 19:53

Кроме того, я заметил, что когда я перехватываю событие в @KafkaHandler(isDefault = true) с аргументами метода Object object и печатаю имя класса - это java.lang.String, я не знаю почему, потому что в конфигурации производителя у меня есть следующая строка producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CON‌​FIG, JsonSerializer.class);, а в моей потребительской конфигурации consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CO‌​NFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZ‌​ER_CLASS, JsonDeserializer.class); так Я не могу понять, откуда взялся String

denstran 28.08.2024 20:03

Такое ощущение, что ваша фабрика контейнеров вышла из строя: в Spring Boot есть десериализатор строк. Было бы здорово, если бы вы поделились с нами своим простым проектом, который можно было бы воспроизвести и поиграть с нами на нашей стороне. Поделитесь, пожалуйста, через репозиторий GitHub.

Artem Bilan 28.08.2024 20:24

Вот он github.com/Denstran/TestKafkaProject

denstran 28.08.2024 20:30

Хорошо, я исправил проблему с десериализацией, добавив containerFactory = "containerFactory" к @KafkaListener, но теперь столкнулся с другой проблемой. Я использую @KafkaListener в разных классах (а не в методах), потому что хочу обрабатывать каждое событие в специальном классе, но все события теперь передаются только в UserCreatedKafkaEventListener, а поскольку для них нет @KafkaHandler - они попадают в Кафку по умолчанию. обработчик, и это не то, что я хочу. Я хочу, чтобы они попадали в другие классы, аннотированные @KafkaListener, и обрабатывались обработчиками Kafka, которые ждут своего типа.

denstran 29.08.2024 15:30

Дополнительные объяснения находятся в разделе ОБНОВЛЕНО 2 вопроса.

denstran 29.08.2024 15:45

Это хорошо, что вы придумали оригинальную проблему. К сожалению, пока не было возможности ознакомиться с вашим проектом. Я думаю, что ваша следующая проблема заслуживает отдельного вопроса на StackOverflow.

Artem Bilan 29.08.2024 19:18
Ответ принят как подходящий

Итак, следуя словам Атема Билана, я проверил, не используется ли моя потребительская фабрика в Spring Boot или нет, и обнаружил, что это определенно так. Как же так? Ну, как сказал Артем, Spring Boot предоставляет @Bean из ConcurrentKafkaListenerContainerFactory из коробки и, как я погуглил, если вы хотите его переопределить, имя бина должно быть «kafkaListenerContainerFactory» (согласно этому), в противном случае нужно указать какую фабрику контейнеров использовать в параметре containerFactory@KafkaListener вот так @KafkaListener(topics = "users", groupId = "user", containerFactory = "containerFactory"). Кроме того, чтобы решить проблему десериализации, важно указать сопоставления типов следующим образом:

producerConfig.put(JsonSerializer.TYPE_MAPPINGS, "userCreated:org.example.producerservice.dto.UserCreatedEvent");
consumerConfig.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent");

Это позволяет нашему потребителю понять, что тип ведьмы должен десериализовать объект из-за токена «userCreated», после чего вы должны указать имя класса вашего объекта (читайте Типы сопоставления) в весенних документах.

Другие вопросы по теме