Поскольку Spring Cloud Stream не имеет аннотации для отправки нового сообщения в поток (@SendTo работает только при объявлении @StreamListener), я попытался использовать для этой цели аннотацию Spring Integration, то есть @Publisher.
Поскольку @Publisher принимает канал, а аннотации @EnableBinding Spring Cloud Stream могут связывать выходной канал с помощью аннотации @Output, я попытался смешать их следующим образом:
@EnableBinding(MessageSource.class)
@Service
public class ExampleService {
@Publisher(channel = MessageSource.OUTPUT)
public String sendMessage(String message){
return message;
}
}
Также я объявил аннотацию @EnablePublisher в файле конфигурации:
@SpringBootApplication
@EnablePublisher("")
public class ExampleApplication {
public static void main(String[] args){
SpringApplication.run(ExampleApplication.class, args);
}
}
Мой тест:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {
@Autowired
private ExampleService exampleService;
@Test
public void testQueue(){
exampleService.queue("Hi!");
System.out.println("Ready!");
}
}
Но я получаю следующую ошибку:
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'com.example.ExampleServiceTest': Unsatisfied dependency expressed through field 'exampleService'; nested exception is
org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'exampleService' is expected to be of type 'com.example.ExampleService' but was actually of type 'com.sun.proxy.$Proxy86'
Проблема в том, что bean-компонент ExampleService не может быть введен.
Кто-нибудь знает, как я могу заставить эту работу?
Спасибо!
Поскольку вы используете аннотацию @Publisher в вашем ExampleService, она проксируется для этого материала публикации.
Единственный способ решить эту проблему - открыть интерфейс для вашего ExampleService и уже внедрить его в свой тестовый класс:
public interface ExampleServiceInterface {
String sendMessage(String message);
}
...
public class ExampleService implements ExampleServiceInterface {
...
@Autowired
private ExampleServiceInterface exampleService;
С другой стороны, похоже, что ваш ExampleService.sendMessage() ничего не делает с сообщением, поэтому вы можете подумать об использовании @MessagingGateway на каком-либо интерфейсе вместо этого: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway
Что ж, есть способ, например, объявить свой собственный статический bean-компонент PublisherAnnotationBeanPostProcessor с помощью setProxyTargetClass(true) вместо @EnablePublisher. Но я действительно согласен с вашим, что нам нужно добавить такой атрибут proxyTargetClass в аннотацию @EnablePublsiher. Пожалуйста, поднимите вопрос, и ваш вклад приветствуется: github.com/spring-projects/spring-integration/issues!
Вопрос по теме: github.com/spring-projects/spring-integration/issues/2695
Привет, я поднял здесь проблему: github.com/spring-projects/spring-integration/issues/2695, но я застрял с вашим временным решением. Могу ли я повторно использовать реализацию PublisherAnnotationBeanPostProcessor и изменить только этот флаг? Я могу добавить еще один вопрос в Stack overflow, если хотите.
Что ж, это была моя точка зрения. Вы создаете компонент для PublisherAnnotationBeanPostProcessor и устанавливаете его setProxyTargetClass() на true.
Пока аннотация не исправлена, я добавил работу над здесь.
Почему бы просто не отправить сообщение в поток вручную, как показано ниже.
@Component
@Configuration
@EnableBinding(Processor.class)
public class Sender {
@Autowired
private Processor processor;
public void send(String message) {
processor.output().send(MessageBuilder.withPayload(message).build());
}
}
Можете проверить через тестер.
@SpringBootTest
public class SenderTest {
@Autowired
private MessageCollector messageCollector;
@Autowired
private Processor processor;
@Autowired
private Sender sender;
@SuppressWarnings("unchecked")
@Test
public void testSend() throws Exception{
sender.send("Hi!");
Message<String> message = (Message<String>) this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
String messageData = message.getPayload().toString();
System.out.println(messageData);
}
}
Вы должны увидеть "Привет!" в консоли.
Несколько вещей: 1) Но, например, при использовании аннотации Cacheable в методе службы, которая также создает прокси из службы, но я могу внедрить эту прокси-службу без каких-либо проблем с объявлением EnableCaching (proxyTargetClass = true). Может есть что-то вроде EnableIntegration (proxyTargetClass = true). Очень неприятно дублировать все классы обслуживания с помощью Publisher 2) Я читал о MessageGateway, и было интересно иметь инъекционный шлюз, но мне нравится идея Publisher, потому что она работает как аспект. В противном случае я должен явно вызвать gateway.sendMessage ()