Этот обработчик ошибок не может обрабатывать исключения org.apache.kafka.common.KafkaException; информация о записи отсутствует

При переносе Java с Spring 2.x на 3.x я также обновил свою зависимость Spring-Kafka до версии 3.0.13. Теперь, когда старый обработчик ошибок, который я использовал, устарел, SeekToCurrentBatchErrorHandler. Вот чем я сейчас пользуюсь:
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(2000, 5)));

Но теперь я начал получать эту ошибку:

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:204)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1961)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1396)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request.
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:852)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:771)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1260)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1235)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1676)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1651)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1452)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
    ... 2 more

Зависимость:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.13</version>
        </dependency>

Заранее спасибо.

Ожидается, что новый обработчик ошибок будет иметь ту же бизнес-логику, что и SeekToCurrentBatchErrorHandler.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
184
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Эта ошибка была устранена. На самом деле я указал неправильный адрес начальной загрузки, который не был виден в файле журнала. Итак, я немного подправил DefaultErrorHandler, и теперь он работает. Это всего лишь вопрос видимости ошибок, если вы используете файл журнала вместо consoleLogger.

public class CustomSeekToCurrentBatchErrorHandler extends DefaultErrorHandler {
private static final Logger ALERT_LOG                     = LoggerFactory
        .getLogger(LoggerConstant.ALERT_LOG);

@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
    ALERT_LOG.error("Kafka Exception", thrownException);
}

}

Другие вопросы по теме