У меня есть приложение весенней загрузки, которому необходимо подключить N кластеров Kafka. на основе некоторого условия шаблон Kafka необходимо переключить и отправить сообщение
Я видел некоторые решения для создания отдельных bean-компонентов шаблона Kafka, но в моем случае количество кластеров изменится во время развертывания.
бывший:
@Bean(name = "cluster1")
public KafkaTemplate<String, String> kafkaTemplatesample1() {
return new KafkaTemplate<>(devProducerFactory1());
}
@Bean(name = "cluster2")
public KafkaTemplate<String, String> kafkaTemplatesample2() {
return new KafkaTemplate<>(devProducerFactory2());
}
есть ли другое решение для этого? если вы можете поделиться примером кода, это очень ценно




Предположим, что каждый кластер можно описать следующими атрибутами:
@Getter
@Setter
public class KafkaCluster {
private String beanName;
private List<String> bootstrapServers;
}
Например, в application.properties определены два кластера:
kafka.clusters[0].bean-name=cluster1
kafka.clusters[0].bootstrap-servers=CLUSTER_1_URL
kafka.clusters[1].bean-name=cluster2
kafka.clusters[1].bootstrap-servers=CLUSTER_2_URL
Эти свойства необходимы перед созданием экземпляров bean-компонентов для регистрации определений bean-компонентов, что делает KafkaTemplate непригодным для этого случая. Вместо этого для их программной привязки используется Binder API.
@ConfigurationProperties определения бинов могут быть зарегистрированы в реализации интерфейса BeanDefinitionRegistryPostProcessor.
public class KafkaTemplateDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor {
private final List<KafkaCluster> clusters;
public KafkaTemplateDefinitionRegistrar(Environment environment) {
clusters= Binder.get(environment)
.bind("kafka.clusters", Bindable.listOf(KafkaCluster.class))
.orElseThrow(IllegalStateException::new);
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
clusters.forEach(cluster -> {
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(KafkaTemplate.class);
beanDefinition.setInstanceSupplier(() -> kafkaTemplate(cluster));
registry.registerBeanDefinition(cluster.getBeanName(), beanDefinition);
});
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
public ProducerFactory<String, String> producerFactory(KafkaCluster kafkaCluster) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaCluster.getBootstrapServers());
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
public KafkaTemplate<String, String> kafkaTemplate(KafkaCluster kafkaCluster) {
return new KafkaTemplate<>(producerFactory(kafkaCluster));
}
}
Класс конфигурации для bean-компонента KafkaTemplate:
@Configuration
public class KafkaTemplateDefinitionRegistrarConfiguration {
@Bean
public static KafkaTemplateDefinitionRegistrar beanDefinitionRegistrar(Environment environment) {
return new KafkaTemplateDefinitionRegistrar(environment);
}
}
Кроме того, исключите KafkaTemplateDefinitionRegistrar в основном классе, чтобы предотвратить создание bean-компонента KafkaAutoConfiguration по умолчанию. Это, вероятно, не лучший способ, потому что все остальные KafkaTemplate bean-компоненты в этом случае не создаются.
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
Наконец, ниже приведен простой тест, который доказывает существование двух бобов KafkaAutoConfiguration.
@SpringBootTest
class SpringBootApplicationTest {
@Autowired
List<KafkaTemplate<String,String>> kafkaTemplates;
@Test
void kafkaTemplatesSizeTest() {
Assertions.assertEquals(kafkaTemplates.size(), 2);
}
}