У диспетчера нет подписчиков на канал - spring-cloud-stream-kafka

после обновления до Spring Boot 2, Reactor 3.5, kafka-binder 2.0.0 RELEASE и kafka-client 1.0.1 один из модулей не работает. Я потратил на это 5 дней и прочитал, наверное, все связанные темы, но не могу найти причину такого поведения.

Основной класс:

@Slf4j
@EnableI18N
@EnableSideBar
@ComponentScan
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class View
{
    public static void main(String[] args)
    {
        new SpringApplicationBuilder(View.class)
            .profiles("production")
            .bannerMode(Banner.Mode.OFF)
            .headless(true)
            .application()
            .run(args);

        log.info("\nhttp://localhost:8083/\n");
    }
}

Маркер конфигурации:

@Configuration
@Profile("production")
@EnableBinding(OffersChannel.class)
class ProductionOffersConfiguration
{
}

Интерфейс канала:

public interface OffersChannel
{
    String OFFERS_OBTAIN = "offersObtain";
    String OFFERS_OBTAIN_REQUEST= "offersObtainRequest";

    @Input(OFFERS_OBTAIN)
    SubscribableChannel offersChannel();

    @Output(OFFERS_OBTAIN_REQUEST)
    MessageChannel offersRequestChannel();
}

И класс AdminUI, что важно перед обновлением зависимостей, когда этот класс был инициализирован, я смог увидеть журнал в консоли, в котором говорится, что я подписался на канал, теперь ничего не происходит:

@Slf4j
@Push
@Theme("${view.default-theme}")
@SpringUI(path = WebsiteMapping.ADMIN)
@RequiredArgsConstructor
public class AdminUI extends UI
{
    MessageChannel offersObtainRequest;
    Grid<Ad> adGrid = createAdGrid();
    private List<Ad> ads = new LinkedList<>();
    ConnectionService connectionService;

    @Override
    @SneakyThrows
    protected void init(VaadinRequest vaadinRequest)
    {
        setContent(splitPane());
        adGrid.setItems(ads);
    }

    private VerticalSplitPanel splitPane()
    {
        VerticalSplitPanel verticalSplitPanel = new VerticalSplitPanel();
        verticalSplitPanel.setSplitPosition(10, Unit.PERCENTAGE);
        verticalSplitPanel.setFirstComponent(buttonsLayout());
        verticalSplitPanel.setSecondComponent(adGrid);

        return verticalSplitPanel;
    }

    private Layout buttonsLayout()
    {
        HorizontalLayout layout = new HorizontalLayout();
        layout.setMargin(true);

        layout.addComponent(requestMoreOffersButton());

        ThemeSelectorComboBox themeSelectorComboBox = new ThemeSelectorComboBox();
        layout.addComponent(themeSelectorComboBox);
        layout.setComponentAlignment(themeSelectorComboBox, Alignment.MIDDLE_RIGHT);

        return layout;
    }

    @PostConstruct
    private void createGridProperties()
    {
        adGrid.setSizeFull();
        adGrid.addColumn(Ad::getTitle).setCaption("Title");
        adGrid.addColumn(Ad::getLocation).setCaption("Location");
        adGrid.addColumn(Ad::getHref).setCaption("Href");
    }

    @StreamListener
    public void fetchAdsFrom(@Input(OffersChannel.OFFERS_OBTAIN) Flux<Ad> fluxAd)
    {
        fluxAd.subscribe(this::displayOfferInGrid);
    }

    private void displayOfferInGrid(Ad ad)
    {
        ads.add(ad);
        adGrid.setItems(ads);
    }

    private Button requestMoreOffersButton()
    {
        return new Button("Request 10 more offers", this::requestMoreOffers);
    }

    private Button startServiceButton(String caption, String url, String message)
    {
        return new Button(caption, buttonClickedEvent -> startService(url, message));
    }

    private void startService(String url, String message)
    {
        ConnectionRequest startProviderRequest = ConnectionRequest
            .builder()
            .url(url)
            .build();

        connectionService
            .getForHtml(startProviderRequest)
            .thenAccept(serviceStarted -> Notification.show(message));
    }

    private void requestMoreOffers(Event event)

    {
        offersObtainRequest.send(new GenericMessage(new AdBroadcastRequest(ads.size(),10)));
    }

    private Grid<Ad> createAdGrid()
    {
        return new Grid<>();
    }
}

Application.yml:

spring:
    cloud:
        stream:
            bindings:
                offersObtainRequest:
                   destination: adsBroadcastRequestes
                   binder: kafka
                   group: adsBroadcastRequestsProducer
                offersObtain:
                   destination: adsBroadcast
                   binder: kafka
                   group: adsConsumer

трассировки стека:

2018-04-07 11:27:12.010 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=0
2018-04-07 11:27:12.010 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers = {kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:12.010 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 1000
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=1
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=1
2018-04-07 11:27:13.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers = {kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:13.011 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 2000
2018-04-07 11:27:13.909 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Sending Heartbeat request to coordinator Kacper-PC:9092 (id: 2147483647 rack: null)
2018-04-07 11:27:14.138 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Received successful Heartbeat response
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=2
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=2
2018-04-07 11:27:15.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers = {kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=3
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry failed last attempt: count=3
2018-04-07 11:27:15.012 DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer  : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[227], headers = {kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.offersObtain'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[227], headers = {kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    ... 23 common frames omitted
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
0
6 826
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Ответ принят как подходящий

Что ж, там много чего происходит. Я имею в виду все, что не относится к Spring-Cloud-Stream. . . усложняет отслеживание. В любом случае ваш @StreamListener не определен внутри каких-либо классов управляемой конфигурации Spring, поэтому он не обрабатывается. Вы можете переместить его в ProductionOffersConfiguration или View или в любые другие классы управляемой конфигурации Spring.

Кроме того, рассмотрите возможность прохождения этого краткого руководства https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start (5 мин), чтобы лучше понять механику Spring-Cloud-Stream.

ох, это работает! Так что с Ваадином должно быть что-то не так, большое спасибо! :)

minizibi 07.04.2018 14:28

в моем случае я объявил интерфейс весна-облако-пар с каналом @Вход

 @Input("in")
 SubscribableChannel inbound();

но я не использовал @StreamListener для обработки этой цели, поэтому в результате я получил такую ​​ошибку

MessageDeliveryException: Dispatcher has no subscribers for channel

Решение: Я удалил объявление @Input и оставил только @Output ("out").

К вашему сведению: мой ввод настроен в другом отдельном сервисе.

Вы можете использовать аннотацию @StreamListener, но с целевым параметром внутри аннотации.

@StreamListener(target = OffersChannel.OFFERS_OBTAIN)
public void fetchAdsFrom(@Payload Flux<Ad> fluxAd, @Header final String someHeader)
{
    fluxAd.subscribe(this::displayOfferInGrid);
}

У меня сработала аналогичная настройка, как у вас (вместо Flux я получаю JSON - String).

Возможно, вам придется написать свой собственный конвертер для сообщения Flux, я не уверен.

Я потратил часы, пытаясь определить реальную причину этих ошибок «Диспетчер не имеет подписчиков».

В моем случае проблема заключалась в том, что Kafka сохранял некоторый исторический идентификатор узла (1001), в то время как все темы использовали 1003 в качестве идентификатора лидера, и поэтому сообщения не могли быть отправлены. Пришлось стереть все это вручную, чтобы все заработало.

Другие вопросы по теме