Spring Cloud Streams — несколько динамических направлений для источников и приемников

В моей системе был запрос на изменение, которая в настоящее время прослушивает несколько каналов и также отправляет сообщения на несколько каналов, но теперь имена получателей будут в базе данных и изменятся в любое время. Мне трудно поверить, что я первый, кто столкнулся с этим, но я вижу ограниченную информацию.

Все, что я нашел, это эти 2...
Назначение динамического приемника: https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router, но как это будет работать для активного прослушивания этих каналов, как это делает @StreamListener?

Цели динамического источника: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, который делает это

@Bean
    @ServiceActivator(inputChannel = "sourceChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }

Но что это за "payload.id"? И где там указаны пункты назначения??

Если это пользовательское приложение, вы можете использовать стратегии, использованные в этом примере, в качестве схемы решения. github.com/spring-cloud/spring-cloud-stream-samples/tree/mas‌​ter/…

sobychacko 07.03.2019 01:48

Я видел этот пример, но не увидел, как здесь передается пункт назначения. @Bean @ServiceActivator(inputChannel = "sourceChannel") я бы")); router.setDefaultOutputChannelName («выход по умолчанию»); router.setChannelResolver(преобразователь); обратный маршрутизатор; }

CCC 07.03.2019 02:26

Не размещайте код в комментариях. Это трудно читать. Вместо этого отредактируйте вопрос и прокомментируйте ответ, который вы сделали.

Gary Russell 07.03.2019 05:38

У вас есть опыт с чем-то подобным?? Или знаете кого-то, кто делает?

CCC 13.03.2019 16:47
4
4
5 130
1

Ответы 1

Не стесняйтесь улучшать мой ответ, я надеюсь, что это поможет другим.

Теперь код (он работал в моем отладчике). Это пример, не готовый к производству!

Вот как отправить сообщение динамическому адресату

import org.springframework.messaging.MessageChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;


@Service
@EnableBinding
public class MessageSenderService {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Transactional
    public void sendMessage(final String topicName, final String payload) {
        final MessageChannel messageChannel = resolver.resolveDestination(topicName);
        messageChannel.send(new GenericMessage<String>(payload));
    }
}

И конфигурация для Spring Cloud Stream.

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

я нашел здесь https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination Весной будет работать Cloud Stream версии 2+. Я использую 2.1.2

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

Вот как можно использовать сообщение из динамического адресата

https://stackoverflow.com/a/56148190/4587961

Конфигурация

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Потребитель Java.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());

        final String topic = message.getHeaders().get("kafka_receivedTopic");

        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

но как он получает сообщения от динамических адресатов, если адресаты уже указаны в свойстве input.destinations?

CCC 13.06.2019 18:52

Под динамичный я имел в виду, что вы не создаете интерфейсы с @EnableBinding. Если вы хотите пойти дальше, изучите, как инициализировать Spring bean-компоненты канала сообщений. Имейте в виду, kafka может не разрешать создавать темы динамически (это зависит от конфигурации kafka)

Yan Khonski 13.06.2019 23:03

Другие вопросы по теме