Как создать N количество шаблонов Kafka динамически во время выполнения - весенняя загрузка

У меня есть приложение весенней загрузки, которому необходимо подключить N кластеров Kafka. на основе некоторого условия шаблон Kafka необходимо переключить и отправить сообщение

Я видел некоторые решения для создания отдельных bean-компонентов шаблона Kafka, но в моем случае количество кластеров изменится во время развертывания.

бывший:

@Bean(name = "cluster1")
public KafkaTemplate<String, String> kafkaTemplatesample1() {
    return new KafkaTemplate<>(devProducerFactory1());
}

@Bean(name = "cluster2")
public KafkaTemplate<String, String> kafkaTemplatesample2() {
    return new KafkaTemplate<>(devProducerFactory2());
}

есть ли другое решение для этого? если вы можете поделиться примером кода, это очень ценно

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
164
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Предположим, что каждый кластер можно описать следующими атрибутами:

@Getter
@Setter
public class KafkaCluster {
  private String beanName;
  private List<String> bootstrapServers;
}

Например, в application.properties определены два кластера:

kafka.clusters[0].bean-name=cluster1
kafka.clusters[0].bootstrap-servers=CLUSTER_1_URL
kafka.clusters[1].bean-name=cluster2
kafka.clusters[1].bootstrap-servers=CLUSTER_2_URL

Эти свойства необходимы перед созданием экземпляров bean-компонентов для регистрации определений bean-компонентов, что делает KafkaTemplate непригодным для этого случая. Вместо этого для их программной привязки используется Binder API.

@ConfigurationProperties определения бинов могут быть зарегистрированы в реализации интерфейса BeanDefinitionRegistryPostProcessor.

public class KafkaTemplateDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor {

  private final List<KafkaCluster> clusters;

  public KafkaTemplateDefinitionRegistrar(Environment environment) {
    clusters= Binder.get(environment)
        .bind("kafka.clusters", Bindable.listOf(KafkaCluster.class))
        .orElseThrow(IllegalStateException::new);
  }

  @Override
  public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    clusters.forEach(cluster -> {
      GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
      beanDefinition.setBeanClass(KafkaTemplate.class);
      beanDefinition.setInstanceSupplier(() -> kafkaTemplate(cluster));
      registry.registerBeanDefinition(cluster.getBeanName(), beanDefinition);
    });
  }

  @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
  }

  public ProducerFactory<String, String> producerFactory(KafkaCluster kafkaCluster) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        kafkaCluster.getBootstrapServers());
    configProps.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    configProps.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  public KafkaTemplate<String, String> kafkaTemplate(KafkaCluster kafkaCluster) {
    return new KafkaTemplate<>(producerFactory(kafkaCluster));
  }
}

Класс конфигурации для bean-компонента KafkaTemplate:

@Configuration
public class KafkaTemplateDefinitionRegistrarConfiguration {
  @Bean
  public static KafkaTemplateDefinitionRegistrar beanDefinitionRegistrar(Environment environment) {
    return new KafkaTemplateDefinitionRegistrar(environment);
  }
}

Кроме того, исключите KafkaTemplateDefinitionRegistrar в основном классе, чтобы предотвратить создание bean-компонента KafkaAutoConfiguration по умолчанию. Это, вероятно, не лучший способ, потому что все остальные KafkaTemplate bean-компоненты в этом случае не создаются.

@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})

Наконец, ниже приведен простой тест, который доказывает существование двух бобов KafkaAutoConfiguration.

@SpringBootTest
class SpringBootApplicationTest {
    @Autowired
    List<KafkaTemplate<String,String>> kafkaTemplates;
    
    @Test
    void kafkaTemplatesSizeTest() {
        Assertions.assertEquals(kafkaTemplates.size(), 2);
    }
}

Для справки: Создайте N бинов с помощью BeanDefinitionRegistryPostProcessor , Spring Boot Динамическое создание бинов из файла свойств

Другие вопросы по теме