Как вы можете не потреблять сообщения в определенной среде?

Я использовал spring-cloud-starter-stream-kafka 2.1.2. Потребители для соответствующей темы были написаны и развернуты в тестовой среде. Затем я использую @conditional для исключения локальных потребителей, и всегда будет появляться следующая ошибка:

@Component
@Slf4j
@Conditional(NotWindowsCondition.class)
public class TestListener {
    @StreamListener(TestSink.INPUT)
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[38], headers = {deliveryAttempt=3, X-B3-ParentSpanId=41e8fe9d35f75986, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=user_huanxin_register, spanTraceId=1c2878998d138740, spanId=34cbd18294b7858d, spanParentSpanId=41e8fe9d35f75986, nativeHeaders = {spanTraceId=[1c2878998d138740], spanId=[8654b6a6c4aa8517], spanParentSpanId=[3df3fa4e69033f5e], spanSampled=[0], X-B3-TraceId=[1c2878998d138740], X-B3-SpanId=[8654b6a6c4aa8517], X-B3-ParentSpanId=[3df3fa4e69033f5e], X-B3-Sampled=[0]}, kafka_offset=74, X-B3-SpanId=34cbd18294b7858d, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@16c73ae4, X-B3-Sampled=0, X-B3-TraceId=1c2878998d138740, id=201aaf99-082b-864f-8129-de7961e03972, kafka_receivedPartitionId=0, spanSampled=0, kafka_receivedTimestamp=1559041573040, contentType=application/json, timestamp=1559041580558}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)

Я пытался найти конфигурацию, чтобы указать потребление или не потребление, но не нашел

Желаемые результаты: позволяет определенной среде использовать или не использовать без сообщений об ошибках.

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
344
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Установите для свойства auto-startup значение false.

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56341052Application {

    public static void main(String[] args) {
        SpringApplication.run(So56341052Application.class, args);
    }

}

@Component
@ConditionalOnProperty("is.enabled")
class Listener {

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}
is.enabled=false

spring.cloud.stream.bindings.input.consumer.auto-startup=${is.enabled}

Это установка для одной темы. Есть ли действительная конфигурация для всех тем? Я пытался использовать spring.cloud.stream.default.bindings или spring.cloud.stream.bindings.default, но, похоже, безрезультатно.

hero-zh 29.05.2019 05:07
spring.cloud.stream.default.consumer.auto-startup=false У меня работает нормально.
Gary Russell 29.05.2019 15:26

Другие вопросы по теме