У меня проблема с использованием RetryableTopic вместе с функциональностью opentracing springboot. Определение RetryableTopic выглядит следующим образом:
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 3000, multiplier = 2.0),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
kafkaTemplate = "dltKafkaTemplate",
listenerContainerFactory = "retryEventListenerFactory",
exclude = {
DeserializationException.class,
SerializationException.class,
MessageConversionException.class,
ConversionException.class,
MethodArgumentResolutionException.class,
NoSuchMethodException.class,
ClassCastException.class
}
)
@KafkaListener(
topics = "dlt-msg-test-topic",
containerFactory = "retryEventListenerFactory")
public void consume(
String message, @Headers MessageHeaders messageHeaders) {
LOGGER.info("Received {}", message);
throw new RuntimeException("Test retry exception");
}
@DltHandler
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
LOGGER.info(in + " from " + topic);
}
Этот образец отлично работает сам по себе, вызывает повторные попытки и dlt. Как только я попытаюсь добавить в проект функциональность трассировки, представив, например, opentracing-spring-cloud-starter и связанные с ней зависимости, выдается следующая ошибка:
java.lang.UnsupportedOperationException: This implementation doesn't support this method
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.determineSendTimeout(DeadLetterPublishingRecoverer.java:661)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:636)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:628)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:524)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:489)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:461)
at org.springframework.kafka.listener.FailedRecordProcessor.getRecoveryStrategy(FailedRecordProcessor.java:181)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:134)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2674)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2555)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2429)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2307)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1981)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1365)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1356)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251)
После отладки я обнаружил, что opentracing создает оболочку вокруг исходного потребителя, которая называется TracingKafkaConsumer. Этот TracingKafkaConsumer хранит исходного потребителя в качестве одного из своих полей. Поэтому, когда DefaultErrorHandler вызывается как часть цикла повтора вместо исходного потребителя, TracingKafkaConsumer передается в метод handleRemaining DefaultErrorHandler.
Если я правильно понимаю, после этого SeekUtils.seekOrRecover завершается с ошибкой, потому что он не может определить необходимые параметры от исходного потребителя, как это было бы при работе без функции трассировки. Я предполагаю, что эти параметры не могут быть найдены, потому что первоначальный потребитель теперь является полем в TracingKafkaConsumer и, следовательно, функция поиска, которая не работает, не знает, как получить то, что ей нужно, от исходного потребителя.
Мой вопрос в том, как я могу выйти из этой ситуации и обеспечить правильную совместную работу функций TracingKafkaConsumer и RetryableTopic?

Похоже, что открытая трассировка оборачивает ProducerFactory в другую фабрику (которая, предположительно, оборачивает производителей в трассирующих производителей).
java.lang.UnsupportedOperationException: This implementation doesn't support this method
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
Оболочка фабрики-производителя должна реализовать getConfigurationProperties() и вызвать фабрику делегатов, чтобы получить ее свойства.
Я предлагаю вам открыть баг против открытой трассировки, чтобы исправить их обертку, чтобы она реализовывала все необходимые методы путем делегирования реальной фабрике.
Тем не менее, все, что мы пытаемся сделать здесь, это определить настройку тайм-аута отправки для производителя.
Пожалуйста, откройте вопрос здесь: https://github.com/spring-projects/spring-kafka/issues мы можем быть немного более терпимыми, если фабрика-производитель не соблюдает контракт и возвращается к некоторому умолчанию тайм-аут.
РЕДАКТИРОВАТЬ
Вроде уже исправили...
Они исправили это пару лет назад; вы должны использовать старую версию
https://github.com/opentracing-contrib/java-kafka-client/pull/87
РЕДАКТИРОВАТЬ2
Ну, это было исправлено в 2021 году, но похоже, что релиза не было с 2020 года.
Может этот проект умер?
Спасибо, посмотрю на сыщика. было бы полезно, если бы spring-kafka была немного снисходительнее. Я попытаюсь открыть вопрос по ссылке, которую вы предоставили. может это к чему то приведет. еще раз спасибо за помощь
OpenTelemetry — это слияние OpenCensus и OpenTracing в единый проект:
https://opentelemetry.io/docs/migration/opentracing/
https://github.com/open-telemetry/opentelemetry-java
Как сейчас написано, ваш ответ неясен. Пожалуйста, отредактируйте , чтобы добавить дополнительные сведения, которые помогут другим понять, как это отвечает на заданный вопрос. Вы можете найти больше информации о том, как писать хорошие ответы в справочном центре.
Спасибо за ответ. Я видел эту проблему и оставил там комментарий о версии, которую можно было бы использовать, но пока нет ответа. В случае, если проект мертв, это означает, что для исправления этого мне нужно либо 1) разветвить код iib и выпустить версию самостоятельно, либо 2) переключиться на другую библиотеку трассировки. Во втором случае, есть ли библиотека трассировки, которую Spring предоставляет, которая определенно работает с RetryTopic?