Как настроить два экземпляра Kafka StreamsBuilderFactoryBean при весенней загрузке

Используя spring-boot-2.1.3, spring-kafka-2.2.4, я хочу иметь две конфигурации потоков (например, иметь разные application.id или подключаться к другому кластеру и т. д.). Итак, я определил конфигурацию первого потока в значительной степени в соответствии с документами, затем добавил второй с другим именем и второй StreamsBuilderFactoryBean (тоже с другим именем):

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfig")
public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfigStreamBuilder")
public StreamsBuilderFactoryBean myAppStreamBuilder(
        @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
    return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
}

Однако, когда я пытаюсь запустить приложение, я получаю:

Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration required a single bean, but 2 were found: - &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] - &myKappConfigStreamBuilder: defined by method 'myAppStreamBuilder' in class path resource [com/teramedica/kafakaex001web/KafkaConfig.class]

потому что код в автоконфигурации весенней загрузки делает:

@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
        StreamsBuilderFactoryBean factoryBean) {
    return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

Если не считать полной замены KafkaStreamsAnnotationDrivenConfiguration, как мне определить более одного StreamsBuilderFactoryBean. Или, наоборот, как я могу изменить свойства для данного потока?

как я вижу, вы настроили свой StreamsBuilderFactoryBean, а один был автонастроен внутри KafkaStreamsDefaultConfiguration. в случае, если вам нужно определить два компоновщика потоков самостоятельно, может быть, вам не нужен bean-компонент из автоконфигурации? так что просто исключите такую ​​автонастройку или удалите @EnableKafkaStreams

Vasyl Sarzhynskyi 05.03.2019 21:18

Вам просто нужно отметить один как @Primary; Однако загрузка, вероятно, должна быть немного более снисходительной.

Gary Russell 05.03.2019 21:20

Тот, который должен быть помечен @Primary, определен в KafkaStreamsDefaultConfiguration и является кодом Spring, а не моим, поэтому я не могу его пометить (по крайней мере, не с помощью аннотации к самому классу). В качестве альтернативы можно квалифицировать StreamsBuilderFactoryBean, используемый в KafkaStreamsAnnotationDrivenConfiguration#kafkaStreamsFactor‌​yBeanConfigurer. Но опять же, не мой код. Я мог бы удалить EnableKafkaStreams, но тогда я дублирую существующий код, когда на самом деле просто хочу добавить. Похоже, я должен иметь возможность определить более одной конфигурации, не начиная с нуля.

mconner 05.03.2019 22:49

Извините - я не получаю уведомления о комментариях на ваш вопрос, только мой ответ, поэтому я этого не видел. Смотрите мой ответ там; Я согласен, что ботинок не должен блевать в этой ситуации.

Gary Russell 07.03.2019 15:45
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
3
4
1 165
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Отметьте одну фабричную фасоль знаком @Primary.

Есть ли способ пометить bean-компонент @Primary, определенный в существующем коде? Я хотел бы отметить KafkaStreamsDefaultConfiguration.defaultKafkaStreamsBuilder (весенний код) как основной. Хотя я думаю, что мог бы дублировать то, что он делает, пометить это как первичный, а затем добавить еще один. Тогда исходный компонент просто не будет использоваться, что немного неуклюже. Мне интересно, может быть, в конце концов, было бы чище отключить автоматическую настройку и сделать это самостоятельно, как предложил @VasiliySarzhynskyi.

mconner 07.03.2019 15:36

Вам нужно будет переопределить их bean. >bean would just be unused нет, его вообще не будет, его заменит ваше определение бина. Я предлагаю вам создать проблему против Boot, чтобы попросить их более элегантно обработать ваш вариант использования.

Gary Russell 07.03.2019 15:43

> Вам нужно будет переопределить их bean-компонент. > bean будет просто неиспользованным, нет, его вообще не будет, он будет заменен вашим определением bean. Мне также нужно удалить EnableKafkaStreams, так как нет условного выражения. Но я вижу, что это единственная фасоль там, в любом случае,

mconner 07.03.2019 16:03

Просто столкнулся с той же проблемой и попытался исключить KafkaStreamsAnnotationDrivenConfiguration, но потерпел неудачу с сообщением «Следующие классы не могут быть исключены, поскольку они не являются классами автоматической настройки: - org.springframework.boot.autoconfigure.kafka. ожидал? KafkaStreamsAnnotationDrivenConfiguration относится к области пакета.

Vassilis 26.03.2019 13:57

Не задавайте новые вопросы в комментариях; если исключение не работает, это может быть ошибка при загрузке; задайте новый вопрос с более подробной информацией или, если вы считаете, что это ошибка, откройте проблему с Boot.

Gary Russell 26.03.2019 14:07

@Gary Russell Не могли бы вы рассказать о решении? В моем случае это не работает. Я переопределяю DEFAULT_STREAMS_BUILDER_BEAN_NAME как @Primary и все равно получаю Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnn‌​otationDrivenConfigu‌​ration required a single bean, but 2 were found - &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource [com/.../MyConfig.class] - &snapshotKafkaStreamsBuilder: defined by method 'snapshotKafkaStreamsBuilder' in class path resource [com/../MyConfig.class]

Vassilis 26.03.2019 14:42

Пожалуйста, задайте новый вопрос и покажите свою конфигурацию; код и т. д. слишком сложно читать в комментариях.

Gary Russell 26.03.2019 15:23

Извините за беспорядок.. вот он: stackoverflow.com/questions/55359604/…

Vassilis 26.03.2019 15:30

Другие вопросы по теме