При переносе Java с Spring 2.x на 3.x я также обновил свою зависимость Spring-Kafka до версии 3.0.13. Теперь, когда старый обработчик ошибок, который я использовал, устарел, SeekToCurrentBatchErrorHandler.
Вот чем я сейчас пользуюсь:factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(2000, 5)));
Но теперь я начал получать эту ошибку:
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:204)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1961)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1396)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request.
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:852)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:771)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1260)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1235)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1676)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1651)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1452)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
... 2 more
Зависимость:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.13</version>
</dependency>
Заранее спасибо.
Ожидается, что новый обработчик ошибок будет иметь ту же бизнес-логику, что и SeekToCurrentBatchErrorHandler.




Эта ошибка была устранена. На самом деле я указал неправильный адрес начальной загрузки, который не был виден в файле журнала. Итак, я немного подправил DefaultErrorHandler, и теперь он работает. Это всего лишь вопрос видимости ошибок, если вы используете файл журнала вместо consoleLogger.
public class CustomSeekToCurrentBatchErrorHandler extends DefaultErrorHandler {
private static final Logger ALERT_LOG = LoggerFactory
.getLogger(LoggerConstant.ALERT_LOG);
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
ALERT_LOG.error("Kafka Exception", thrownException);
}
}