У меня есть приложение Spring Boot Kafka. Мои брокеры перерабатываются каждые несколько дней. Старые брокеры деинициализируются, а новые брокеры инициализируются.
У меня есть планировщик, который проверяет брокеров каждые несколько часов. Я хотел бы убедиться, что как только у нас появятся новые брокеры, мы должны перезагрузить все bean-компоненты, связанные со Spring Kafka. Очень похоже на KafkaAutoConfiguration, за исключением того, что мне нужен триггер при изменении значения брокера и программная загрузка автоматической конфигурации.
Как программно вызвать автоматическую настройку всякий раз, когда старые брокеры заменяются новыми?




Ваши требования звучат как Сервер конфигурации в Spring Cloud: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 с его функцией @RefreshScope: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.
Итак, вам нужно указать свои собственные bean-компоненты и пометить их этой аннотацией:
@Bean
@RefreshScope
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@RefreshScope
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
Эти два bean-компонента полагаются на свойства конфигурации для подключения к брокеру Apache Kafka, и этого действительно достаточно, чтобы их можно было обновить. Всякий раз, когда происходит ContextRefreshedEvent, эти bean-компоненты будут повторно инициализированы с новыми свойствами конфигурации.
Я думаю, что потребители ConsumerFactory (MessageListenerContainer и KafkaListenerEndpointRegistry) также должны быть перезапущены в этом событии. Дело в том, что MessageListenerContainer запускает долгоживущий процесс и поэтому кэширует KafkaConsumer экземпляр для poll целей.
Всех потребителей ProducerFactory не нужно перезапускать. Даже если KafkaProducer кэшируется в DefaultKafkaProducerFactory, он будет повторно инициализирован во время фазы @RefreshScope.
ОБНОВИТЬ
I don’t use config server. I get the new hosts from consul catalog service.
Правильно, я не говорил, что вы используете Config Server. Это просто выглядит для меня похожим образом. Итак, с большой высоты я бы действительно взглянул на реализацию Config Client для вашего решения каталога Consul.
Тем не менее, вы все еще можете испустить RefreshEvent, который вызовет перезагрузку всех ваших @RefreshScope'd bean-компонентов. Для этого вам нужно реализовать ApplicationEventPublisherAware и генерировать это событие всякий раз, когда у вас есть обновление от Consul. Помните: контейнеры прослушивателя Kafka необходимо перезапустить. Для этого вы можете прослушивать RefreshScopeRefreshedEvent, так как вы действительно заинтересованы в перезапуске только тогда, когда все @RefreshScope обновлены.
Подробнее о область обновления: https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7
Спасибо Артем. Я не использую конфигурационный сервер. Я получаю новые хосты от службы каталогов консула. Можно ли программно вызвать событие обновления контекста из планировщика? Я уже использую RefreshScope в своем проекте. Как мы отличаем этот RefreshScope от других?
Пожалуйста, смотрите ОБНОВЛЕНИЕ в моем ответе.
Еще раз спасибо - Итак, я наконец-то начал работать над этим - Что касается этого Тем не менее, вы все равно можете сгенерировать RefreshEvent, который вызовет перезагрузку всех ваших bean-компонентов @RefreshScope., как Refreshscope узнает, что нужно обновить для фабрики? Изменился только список брокеров, то есть как new RefreshEvent(this, brokers, "new brokers") переводится как «новый завод»? Мне не хватает этой ссылки.
Я думаю, вам нужно начать новый поток SO. В этом старом уже как-то потерялось - больше года назад...
Посмотрите, поможет ли что-нибудь из этого? stackoverflow.com/questions/51218086/…, oodlestechnologies.com/blogs/…, tothenew.com/blog/…, stackoverflow.com/questions/27998502/…