Spring Kafka воспроизводит старые сообщения

У меня есть требование воспроизвести старые смещения Kafka в случае возникновения каких-либо проблем. Есть ли в Spring Kafka какой-нибудь API для воспроизведения старого смещения темы?

0
0
1 353
1

Ответы 1

Да, есть. Взгляните на ConsumerSeekAware. Использование описано здесь. А именно об этом способе:

void registerSeekCallback(ConsumerSeekCallback callback);

в документации написано:

called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

Сам обратный вызов задокументирован здесь.

Кроме того, в более новых версиях (начиная с 2.0) вы можете использовать ConsumerAwareRebalanceListrener для выполнения поиска на Consumer при назначении разделов.

Gary Russell 01.11.2018 21:08

Спасибо Лачезар. Спасибо, Гэри.

Arun Kumar 02.11.2018 11:31

Я попытался реализовать consumerSeekAware и зарегистрировать consumerseekcallback с помощью threadlocal. Он работает в классе слушателя. Но я хочу вызвать метод поиска из другого класса, например запуск из REST API. Я хотел бы установить смещение темы и раздела с помощью некоторого REST API, чтобы у меня был контроль над воспроизведением старого смещения всякий раз, когда это необходимо. Любая помощь в том, как я могу этого добиться?

Arun Kumar 13.11.2018 17:13

Другие вопросы по теме