У меня есть требование воспроизвести старые смещения Kafka в случае возникновения каких-либо проблем. Есть ли в Spring Kafka какой-нибудь API для воспроизведения старого смещения темы?
Да, есть. Взгляните на ConsumerSeekAware. Использование описано здесь. А именно об этом способе:
void registerSeekCallback(ConsumerSeekCallback callback);
в документации написано:
called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.
Сам обратный вызов задокументирован здесь.
Спасибо Лачезар. Спасибо, Гэри.
Я попытался реализовать consumerSeekAware и зарегистрировать consumerseekcallback с помощью threadlocal. Он работает в классе слушателя. Но я хочу вызвать метод поиска из другого класса, например запуск из REST API. Я хотел бы установить смещение темы и раздела с помощью некоторого REST API, чтобы у меня был контроль над воспроизведением старого смещения всякий раз, когда это необходимо. Любая помощь в том, как я могу этого добиться?
Кроме того, в более новых версиях (начиная с 2.0) вы можете использовать
ConsumerAwareRebalanceListrenerдля выполнения поиска наConsumerпри назначении разделов.