Spring облачный поток Kafka Не удалось создать привязку производителя

У меня есть проект с весенней загрузкой, в котором есть зависимости spring-cloud-starter-stream-kafka и spring-cloud-stream версии 2.1.2. Он выступает в роли продюсера. При запуске постоянно предупреждает:

2019-05-06 15:00:25.136 WARN 28116 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-2] Connection to node -1 could not be established. Broker may not be available.

и выдать ошибку:

2019-05-06 15:01:10.280 ERROR 28116 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$2(BindingService.java:290) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    ... 14 common frames omitted

Это мой класс начальной загрузки:

@EnableEurekaClient
@SpringBootApplication
@EnableHystrixDashboard
@EnableCircuitBreaker
@EnableBinding(Channels.class)
public class Service1Application {

    public static void main(String[] args) {
        SpringApplication.run(Service1Application.class, args);
    }

}

Это класс Channels:

public interface Channels {
    String outputChannel = "myOutputChannel";

    @Output(outputChannel)
    MessageChannel myOutputChannel();
}

Это application.properties:

eureka.instance.prefer-ip-address=true
eureka.client.register-with-eureka=true
eureka.client.fetch-registry=true
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
spring.application.name=service1
server.port=8081

spring.cloud.stream.bindings.myOutputChannel.destination=myTopic
spring.cloud.stream.bindings.myOutputChannel.content-type=application/json
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
0
0
8 160
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это потому, что вы не используете Kafka на локальной машине. Если вы хотите указать другое свойство Kafka Brokers, вы можете сделать это в application.properties следующим образом:

spring.cloud.stream.kafka.binder.brokers=host1,host2
spring.cloud.stream.kafka.binder.defaultBrokerPort=port

Значения по умолчанию для двух вышеуказанных свойств — localhost и 9092, и если вы не используете кластер Kafka на localhost:9092, ваше приложение завершится с ошибкой.

Другие вопросы по теме