Мой потребитель Kafka повторяет попытку 10 раз ненормально и останавливает повторную попытку, просто добавляя журнал MDC?

У меня есть прослушиватель Kafka, я отправляю одно сообщение Kafka в тему, и мой слушатель получает 10 раз (второй - после того, как первый завершит свою функцию обработки, третий - после завершения второго... и так далее .)

Моя потребительская функция будет такой:

    @KafkaListener(topics = "xxx", groupId = "xxx")
    public void onMessageUser(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        //some code
        //xxxx
        ack.acknowledge();
    }

Моя конфигурация Кафки:

spring.kafka.producer.retries=5
spring.kafka.producer.acks=1
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.poll-timeout=5000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL

Я добавляю журнал MDC (сопоставленный диагностический контекст), и тогда потребитель становится прав!! Он останавливает повторные попытки 10 раз. Просто получите один раз и обработайте один раз.

MDC.put("requestId", xxx);

MDC — org.slf4j.MDC.

Где магия????


Обновлять: Большое спасибо за полезные предложения от @Artem Bilan и @Gary Russell. Я нашел причину. Вот код ошибки в фрагменте "какой-то код":

Map<String,Object> prop = new ConcurrentHashMap<>();   
prop.put(CommonConstants.Request.REQUEST_ID,MDC.get(CommonConstants.Request.REQUEST_ID));

«MDC.get(CommonConstants.Request.REQUEST_ID)» имеет нулевое значение, поэтому вызывает исключение нулевого указателя:

java.lang.NullPointerException: ноль в java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) в java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)

И так как у меня нет блока try catch с кодом задачи, это исключение не печатается в моем журнале ошибок, поэтому я долго не могу найти эту ошибку. (У меня в проекте есть общий обработчик исключений Spring Web, но сообщение Kafka не проходит через мой обработчик исключений Spring Web)

Исключение вызывает политику повторных попыток Kafka по умолчанию для повторных попыток еще 9 раз.

Есть ли шанс увидеть от вас простой проект, в котором мы можем воспроизвести проблему на своей стороне и отладить ее?

Artem Bilan 28.04.2023 16:30

@ArtemBilan Конечно. Я поделился всем кодом (за исключением бесполезного бизнес-кода) в этом документе: docs.google.com/document/d/… Я сделал все возможное, чтобы предоставить как можно больше деталей. Если вам нужна более подробная информация, я пришлю вам то, что вы хотите знать.

XCCCCLY 29.04.2023 05:14

Что ж, не могли бы вы загрузить свой код через репозиторий github.com? Итак, мы можем разветвить его и запустить локально. Не уверен, однако, что действительно нужно так много специфического для бизнеса реквизита.

Artem Bilan 29.04.2023 13:59

Возможно, ваш some code выдает исключение, потому что MDC requestId отсутствует? Это приведет к поведению по умолчанию, состоящему из 9 повторных попыток с нулевой задержкой между ними. Вы не показываете свою конфигурацию журнала, но я предполагаю, что шаблон требует присутствия MDC.

Gary Russell 01.05.2023 17:24

@ArtemBilan Я разместил более подробную информацию здесь: docs.google.com/document/d/… Код для просмотра репозитория GitHub, я попробую, но не уверен, потому что он включает много настроек из внутренней системы.

XCCCCLY 04.05.2023 08:54
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
5
157
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Скорее всего, ваш some code выдает исключение, потому что MDC requestId отсутствует.

Это приведет к поведению по умолчанию, состоящему из 9 повторных попыток с нулевой задержкой между ними.

Вы не показываете свою конфигурацию журнала, но я предполагаю, что шаблон требует присутствия MDC.

Спасибо! Я проверил свой файл конфигурации журнала, шаблон журнала: <property name = "LOG_PATTERN" value = "%date{yyyy-MM-dd HH:mm:ss.SSS}|%X{Request-Id}| %X{User-Id}|%msg%n"/> Как вы упомянули, у него есть поле Request-Id. Мой вопрос: что это означает: «поведение по умолчанию при 9 повторных попытках», какой параметр конфигурации kafka определяет это время повторных попыток? Или время повтора по умолчанию от Kafka? Любая ссылка на документ для этого повторного поведения? Спасибо.

XCCCCLY 04.05.2023 05:05

Когда requestId равен нулю, какое исключение выдаст Logback? Я думаю, что это будет терпимо к нулевому значению? Либо напечатайте нулевое значение, либо проигнорируйте печать этого поля.

XCCCCLY 04.05.2023 06:08

Я разместил «некоторый код» здесь для более подробной информации: docs.google.com/document/d/…

XCCCCLY 04.05.2023 08:44
>which kafka configuration param define this retry times? См. документацию: docs.spring.io/spring-kafka/docs/current/reference/html/… Я не знаю, как ведет себя LogBack при отсутствии MDC, но в журнале должно отображаться исключение.
Gary Russell 04.05.2023 15:26

Большое спасибо. Я попытаюсь найти этот журнал исключений logback. Этот журнал может завершить все это расследование.

XCCCCLY 04.05.2023 18:00

Все еще немного запутался в исключении журнала. Когда я проверяю документ журнала о выводе mdc здесь: logback.qos.ch/manual/layouts.html#mdc Он говорит: «Если значение равно null, то выводится значение по умолчанию, указанное после оператора :-. Если значение по умолчанию не указано, выводится пустая строка." Таким образом, кажется, что нулевое значение не вызовет исключения.

XCCCCLY 05.05.2023 04:25

Как я уже сказал, я не знаю внутреннего устройства журнала, но есть разница между MDC, который существует, но имеет нулевое значение, и несуществующим. Отредактируйте вопрос, чтобы показать выброшенное исключение.

Gary Russell 05.05.2023 16:59

Я обновил описание вопроса, спасибо за вашу помощь, сэр.

XCCCCLY 06.05.2023 11:59

Другие вопросы по теме