Spring 6: Spring Cloud Stream Kafka — замена @EnableBinding

Я читал «Весенние микросервисы в действии (2021)», потому что хотел освежить в памяти микросервисы.

Теперь с Spring Boot 3 кое-что изменилось. В книге был представлен простой пример того, как отправлять сообщения в тему и как использовать сообщения в тему.

Проблема в том, что представленные примеры просто не работают с Spring Boot 3. Отправка сообщений из проекта Spring Boot 2 работает. Базовый проект можно найти здесь:

https://github.com/ihuaylupo/manning-smia/tree/master/chapter10

Пример 1 (организация-сервис):

Рассмотрим этот конфиг:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=kafka #kafka is used as a network alias in docker-compose
spring.cloud.stream.kafka.binder.brokers=kafka

И этот компонент (класс), который может быть введен в службу в этом проекте

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

Этот код отправляет сообщение в тему (назначение) orgChangeTopic. Насколько я понимаю, при первом создании сообщения тема создается.

Вопрос 1: Как мне сделать эту Spring Boot 3? Config-Wise и "Code-Wise"?


Пример 2:

Рассмотрим этот конфиг:

spring.cloud.stream.bindings.input.destination=orgChangeTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=licensingGroup
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka

И этот код:

@SpringBootApplication
@RefreshScope
@EnableDiscoveryClient
@EnableFeignClients
@EnableEurekaClient
@EnableBinding(Sink.class)
public class LicenseServiceApplication {


    public static void main(String[] args) {
        SpringApplication.run(LicenseServiceApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void loggerSink(OrganizationChangeModel orgChange) {
        log.info("Received an {} event for organization id {}",
                orgChange.getAction(), orgChange.getOrganizationId());
    }

Предполагается, что этот метод должен срабатывать всякий раз, когда сообщение запускается в orgChangeTopic, мы хотим, чтобы метод loggerSink срабатывал.

Как мне это сделать в Spring Boot 3?

0
0
53
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В Spring Cloud Stream 4.0.0 (версия, используемая, если вы используете Boot 3) удалены некоторые вещи, такие как EnableBinding,StreamListener, и т. д. Мы устарели от них раньше в 3.x и, наконец, удалили их в версии 4.0.0. Модель программирования на основе аннотаций удалена в пользу стиля функционального программирования, включенного в проект Spring Cloud Function. По сути, вы выражаете свою бизнес-логику как java.util.function.Funciton|Consumer|Supplier и т. д. для процессора, приемника и источника соответственно. Для ситуаций со специальными источниками, как в вашем первом примере, Spring Cloud Stream предоставляет StreamBridge API для пользовательских отправок.

Ваш пример №1 можно переписать так:

@Component
public class SimpleSourceBean {
    
    @Autowired
    StreamBridge streamBridge

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        streamBridge.send("output-out-0", MessageBuilder.withPayload(change).build());
    }
}

Конфигурация

spring.cloud.stream.bindings.output-out-0.destination=orgChangeTopic
spring.cloud.stream.kafka.binder.brokers=kafka

Просто чтобы вы знали, вам больше не нужно это свойство zkNode. Ни тип контента, поскольку фреймворк автоматически преобразует его для вас.

StreamBridge send принимает имя привязки и полезную нагрузку. Имя привязки может быть любым, но из соображений согласованности мы использовали здесь output-out-0. Прочтите справочные документы, чтобы узнать больше о причинах этого имени привязки.

Если у вас есть простой источник, который работает по таймеру, вы можете выразить это просто как поставщик, как показано ниже (вместо использования StreamBrdige).

@Bean
public Supplier<OrganizationChangeModel> ouput() {
  return () -> {
    // return the payload
  };
}

spring.cloud.function.definition=output
spring.cloud.bindings.output-out-0.destination=...

Пример #2

@Bean
public Consumer<OrganizationChangeModel> loggerSink() {
 return model -> {        
   log.info("Received an {} event for organization id {}",
                orgChange.getAction(), orgChange.getOrganizationId());
   };
}

Конфигурация:

spring.cloud.function.definition=loggerSink
spring.cloud.stream.bindings.loggerSink-in-0.destination=orgChangeTopic
spring.cloud.stream.bindings.loggerSinnk-in-0.group=licensingGroup
spring.cloud.stream.kafka.binder.brokers=kafka

Если вы хотите, чтобы имена привязки ввода/вывода были именно input или output, а не с in-0, out-0 и т. д., есть способы сделать это. Подробности для этого находятся в справочных документах.

Ты парень с репозиторием gitlab, ха-ха. Я провел хорошее время, пытаясь разобраться с репозиториями потокового облака. Большое спасибо!

Yll 19.01.2023 18:04

Другие вопросы по теме