Я использую Spring Cloud Stream с Spring Boot. Мое приложение очень простое:
ExampleService.class:
@EnableBinding(Processor1.class)
@Service
public class ExampleService {
@StreamListener(Processor1.INPUT)
@SendTo(Processor1.OUTPUT)
public String dequeue(String message){
System.out.println("New message: " + message);
return message;
}
@SendTo(Processor1.OUTPUT)
public String queue(String message){
return message;
}
}
Procesor1.class:
public interface Processor1 {
String INPUT = "input1";
String OUTPUT = "output1";
@Input(Processor1.INPUT)
SubscribableChannel input1();
@Output(Processor1.OUTPUT)
MessageChannel output1();
}
application.properties:
spring.cloud.stream.bindings.input1.destination=test_input
spring.cloud.stream.bindings.input1.group=test_group
spring.cloud.stream.bindings.input1.binder=binder1
spring.cloud.stream.bindings.output1.destination=test_output
spring.cloud.stream.bindings.output1.binder=binder1
spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host = localhost
Сценарии:
1) Когда я помещаю сообщение в очередь test_input.test_group, сообщение печатается правильно и отправляется на обмен test_output. Так что ExampleService :: dequeue работает хорошо.
2) Когда я вызываю метод ExampleService :: queue (извне класса, в тесте), сообщение никогда не отправляется в обмен test_output.
Я работаю с Spring Boot 2.0.6.RELEASE и Spring Cloud Stream 2.0.2.RELEASE.
Кто-нибудь знает, почему сценарий 2) не работает? Заранее спасибо.
Что заставляет вас поверить, что @SendTo поддерживается сам по себе? @SendTo - это дополнительная аннотация, используемая многими проектами, не только Spring Cloud Stream; насколько я знаю, нет ничего, что будет искать его самостоятельно.
Попробуйте вместо этого аннотацию Spring Integration @Publisher (с @EnablePublisher).
РЕДАКТИРОВАТЬ
Чтобы принудительно использовать CGLIB вместо прокси-сервера JDK, вы можете сделать это ...
@Bean
public static BeanFactoryPostProcessor bfpp() {
return bf -> {
bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
};
}
Имеет ОЧЕНЬ смысл иметь аннотацию Spring Cloud Stream, делающую это, почему бы охватывать только случаи 1) прослушивание сообщения из потока и 2) перенаправление сообщения из одного потока в другой, а не случай создания сообщения в ручей?
Возможно, но зачем дублировать существующие возможности; в конце концов, приложения Spring-Cloud-Stream на основе MessageChannel основаны на Spring Integration Foundation. Большинство проектов Spring основано на существующем фундаменте. Опять же, если есть какая-то документация, которая подводит вас к выводу, что она должна работать, откройте проблему GitHub, чтобы либо исправить документацию, либо убедительно обосновать необходимость ее поддержки.
К вашему сведению, используя Publisher, это головная боль, потому что он не поддерживает прокси, такие как StreamListener (stackoverflow.com/questions/54150939/…)
Я исправил работу, чтобы избежать ранней инициализации bean-компонента.
У меня такая же проблема