Я читал «Весенние микросервисы в действии (2021)», потому что хотел освежить в памяти микросервисы.
Теперь с Spring Boot 3 кое-что изменилось. В книге был представлен простой пример того, как отправлять сообщения в тему и как использовать сообщения в тему.
Проблема в том, что представленные примеры просто не работают с Spring Boot 3. Отправка сообщений из проекта Spring Boot 2 работает. Базовый проект можно найти здесь:
https://github.com/ihuaylupo/manning-smia/tree/master/chapter10
Пример 1 (организация-сервис):
Рассмотрим этот конфиг:
spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=kafka #kafka is used as a network alias in docker-compose
spring.cloud.stream.kafka.binder.brokers=kafka
И этот компонент (класс), который может быть введен в службу в этом проекте
@Component
public class SimpleSourceBean {
private Source source;
private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
@Autowired
public SimpleSourceBean(Source source){
this.source = source;
}
public void publishOrganizationChange(String action, String organizationId){
logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
OrganizationChangeModel change = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action,
organizationId,
UserContext.getCorrelationId());
source.output().send(MessageBuilder.withPayload(change).build());
}
}
Этот код отправляет сообщение в тему (назначение) orgChangeTopic. Насколько я понимаю, при первом создании сообщения тема создается.
Вопрос 1: Как мне сделать эту Spring Boot 3? Config-Wise и "Code-Wise"?
Пример 2:
Рассмотрим этот конфиг:
spring.cloud.stream.bindings.input.destination=orgChangeTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=licensingGroup
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka
И этот код:
@SpringBootApplication
@RefreshScope
@EnableDiscoveryClient
@EnableFeignClients
@EnableEurekaClient
@EnableBinding(Sink.class)
public class LicenseServiceApplication {
public static void main(String[] args) {
SpringApplication.run(LicenseServiceApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChangeModel orgChange) {
log.info("Received an {} event for organization id {}",
orgChange.getAction(), orgChange.getOrganizationId());
}
Предполагается, что этот метод должен срабатывать всякий раз, когда сообщение запускается в orgChangeTopic, мы хотим, чтобы метод loggerSink срабатывал.
Как мне это сделать в Spring Boot 3?
В Spring Cloud Stream 4.0.0 (версия, используемая, если вы используете Boot 3) удалены некоторые вещи, такие как EnableBinding,StreamListener, и т. д. Мы устарели от них раньше в 3.x и, наконец, удалили их в версии 4.0.0. Модель программирования на основе аннотаций удалена в пользу стиля функционального программирования, включенного в проект Spring Cloud Function. По сути, вы выражаете свою бизнес-логику как java.util.function.Funciton|Consumer|Supplier и т. д. для процессора, приемника и источника соответственно. Для ситуаций со специальными источниками, как в вашем первом примере, Spring Cloud Stream предоставляет StreamBridge API для пользовательских отправок.
Ваш пример №1 можно переписать так:
@Component
public class SimpleSourceBean {
@Autowired
StreamBridge streamBridge
public void publishOrganizationChange(String action, String organizationId){
logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
OrganizationChangeModel change = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action,
organizationId,
UserContext.getCorrelationId());
streamBridge.send("output-out-0", MessageBuilder.withPayload(change).build());
}
}
Конфигурация
spring.cloud.stream.bindings.output-out-0.destination=orgChangeTopic
spring.cloud.stream.kafka.binder.brokers=kafka
Просто чтобы вы знали, вам больше не нужно это свойство zkNode. Ни тип контента, поскольку фреймворк автоматически преобразует его для вас.
StreamBridge send принимает имя привязки и полезную нагрузку. Имя привязки может быть любым, но из соображений согласованности мы использовали здесь output-out-0. Прочтите справочные документы, чтобы узнать больше о причинах этого имени привязки.
Если у вас есть простой источник, который работает по таймеру, вы можете выразить это просто как поставщик, как показано ниже (вместо использования StreamBrdige).
@Bean
public Supplier<OrganizationChangeModel> ouput() {
return () -> {
// return the payload
};
}
spring.cloud.function.definition=output
spring.cloud.bindings.output-out-0.destination=...
Пример #2
@Bean
public Consumer<OrganizationChangeModel> loggerSink() {
return model -> {
log.info("Received an {} event for organization id {}",
orgChange.getAction(), orgChange.getOrganizationId());
};
}
Конфигурация:
spring.cloud.function.definition=loggerSink
spring.cloud.stream.bindings.loggerSink-in-0.destination=orgChangeTopic
spring.cloud.stream.bindings.loggerSinnk-in-0.group=licensingGroup
spring.cloud.stream.kafka.binder.brokers=kafka
Если вы хотите, чтобы имена привязки ввода/вывода были именно input или output, а не с in-0, out-0 и т. д., есть способы сделать это. Подробности для этого находятся в справочных документах.
Ты парень с репозиторием gitlab, ха-ха. Я провел хорошее время, пытаясь разобраться с репозиториями потокового облака. Большое спасибо!