Kafka Consumer: запись для подключения не найдена

Я пытаюсь проверить потребителя kafka, используя данные из темы в удаленном кластере Kafka. Я получаю следующую ошибку при использовании kafka-console-consumer.sh:

 ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Вот команда, которую я использую:

./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP

Вот файл ./config/consumer.properties:

bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}

# consumer group id
group.id=MYGROUP

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest

#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234

Вы хоть представляете, в чем проблема?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
18
0
17 872
6
Перейти к ответу Данный вопрос помечен как решенный

Ответы 6

Кажется, что свойство прослушивателя кластера Kafka не настроено в server.properties.

В удаленном кластере kafka это свойство должно быть раскомментировано с правильным именем хоста.

listeners=PLAINTEXT://0.0.0.0:9092

Кластер Kafka был в порядке. Люди могли использовать его с той же машины. проблема была в том, что я не мог получить ответ обратно.

Nooshin 01.02.2019 13:07

Вы уверены, что удаленная кафка работает? Я бы предложил запустить nmap -p PORT HOST, чтобы убедиться, что порт открыт (если он не настроен по-другому, порт должен быть 9092). Если это нормально, вы можете использовать kafkacat, который упрощает задачу. Создайте потребителя с kafkacat -b HOST:PORT -t YOUR_TOPIC -C -o beginning или создайте производителя с kafkacat -b HOST:PORT -t YOUR_TOPIC -P

Ответ принят как подходящий

Я нашел проблему. В конце концов, это была проблема с DNS. Я обращался к брокерам Kafka по IP-адресам, но брокер отвечает DNS-именем. После установки DNS-имен на стороне потребителя он снова начал работать.

Не могли бы вы подробнее объяснить, как вы это сделали? У меня похожая проблема.

Abigail Fox 09.05.2019 15:13

Вы можете настроить локальное разрешение dns в файле hosts. Например, если я использую macOS, я добавлю строку в /etc/hosts «your_kafka_host_name 1.2.3.4». То же самое для других операционных систем.

Alan42 20.06.2019 09:50

У меня была эта проблема (с потребителями и производителями) при запуске Kafka и Zookeeper в качестве контейнеров Docker.

Решение состояло в том, чтобы установить advertised.listeners в файле config/server.properties брокеров Kafka, чтобы он содержал IP-адрес контейнера, например.

advertised.listeners=PLAINTEXT://172.15.0.8:9092

См. https://github.com/maxant/kafkaplayground/blob/master/start-kafka.sh пример сценария, используемого для запуска Kafka внутри контейнера после правильной настройки файла свойств.

Вы можете сделать это, если у вас есть доступ к основной Kafka, но если вы только потребитель или производитель и не имеете доступа к кластеру Kafka, лучше установить его на свой DNS.

Nooshin 04.03.2019 08:42

Точно так же вы можете использовать имя контейнера вместо IP-адреса. Затем добавьте псевдоним к имени контейнера в файле hosts. Имеет тот же эффект, но не сломается при использовании контейнера с другим IP-адресом.

ThetaSinner 05.04.2019 11:16

В моем случае я получил это, пытаясь подключиться к моему контейнеру Kafka, мне пришлось передать следующее:

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Надеюсь, это поможет кому-то

В моем случае не удалось найти идентификатор брокера (2147475658), упомянутый по ошибке.

No entry found for connection 2147475658

Вы можете создать брокера с идентификатором 2147475658, установив свойство Broker.id в файле server.properties. Создайте отдельные файлы server.properties для всех брокеров.

Или, если у вас есть хотя бы один живой брокер, вы можете удалить/удалить брокера, который выдает ошибку.

Ссылка на документацию: https://kafka.apache.org/documentation/#quickstart_multibroker

Другие вопросы по теме