Kafka connect автономная проблема: отключение брокера

Я новичок, и это мой первый вопрос. Я надеюсь, что это не дубликат, я не нашел ответа на свой вопрос. В основном я следовал руководствам (Medium/Youtube). Я также пытался использовать официальную документацию, но kafka для меня новинка, и в ней нужно многое понять.

Я пытаюсь подключиться к брокеру kafka, над которым я не контролирую, и передать данные в таблицу снежинок. У меня есть файлы подключения SSL, тема и:.

Настраивать:

  • убунту 20.04
  • Java openjdk 1.8.0362
  • Кафка 3.2.1
  • снежинка-кафка-коннектор-1.9.1

Что работает

Я могу потреблять данные, запустив:

kafka/bin/kafka-console-consumer.sh --consumer.config kafka/config/connect-custom.properties --topic <my_topic> --bootstrap-server <server>:<port>

Я также могу создать своего собственного производителя и запустить следующее со стандартной конфигурацией на локальном хосте.

kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/connect-snowflake-kafka-connector.properties 

Что не работает:

Теперь я хочу получить производственные данные в Snowflake и запустить следующее:

kafka/bin/connect-standalone.sh kafka/config/connect-**custom**.properties kafka/config/connect-snowflake-kafka-connector.properties 

Здесь я получаю следующую информацию/предупреждения, и больше ничего не происходит:

[2023-04-11 14:54:06,975] INFO [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935)
[2023-04-11 14:54:06,975] INFO [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node -1 being disconnected (elapsed time since creation: 79ms, elapsed time since send: 79ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:341)
[2023-04-11 14:54:06,975] WARN [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Bootstrap broker <server>:<port> (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1063)

Файлы:

подключение-custom.properties:

bootstrap.servers=<server>:<port>

security.protocol=SSL
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=***
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=***
ssl.key.password=***
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Я попробовал другие версии Java и Kafka, что привело к тем же предупреждающим сообщениям. Также попытался включить путь к плагину. Я предполагаю, что есть проблема с файлом свойств.

Решение:

Включите информацию о соединении SSL для потребителя:

bootstrap.servers=<server>:<port>

security.protocol=SSL
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=***
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=***
ssl.key.password=***

consumer.security.protocol=SSL
consumer.ssl.truststore.type=JKS
consumer.ssl.keystore.type=JKS
consumer.ssl.truststore.location=/path/to/truststore
consumer.ssl.truststore.password=***
consumer.ssl.keystore.location=/path/to/keystore
consumer.ssl.keystore.password=***
consumer.ssl.key.password=***

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Можете ли вы включить ведение журнала на коннекторе Kafka и JDBC и предоставить некоторую информацию оттуда?

Sergiu 11.04.2023 16:54

Документация Confluent охватывает SSL с Connect... По сути, вам нужно несколько раз повторить настройки SSL для самого Connect, а затем для фактического потребительского клиента Kafka со свойствами с префиксом consumer..

OneCricketeer 11.04.2023 20:46

@OneCricketeer, спасибо! Это решило проблему! Видимо, я искал не ту документацию. Как я могу отметить ваш комментарий как правильный ответ?

dwed 12.04.2023 08:52

@Sergeu спасибо за подсказку. Как-то я не подумал о дополнительных лог-файлах. Я буду использовать их для моей следующей проблемы!

dwed 12.04.2023 08:52
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
4
77
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я признаю, что сама документация Kafka скудна.

Документация Confluent гораздо лучше описывает SSL с Connect... По сути, вам нужно несколько раз повторить настройки SSL для самого Connect, а затем для реального потребительского клиента Kafka со свойствами с префиксом consumer.admin.).

https://docs.confluent.io/platform/current/connect/security.html

Другие вопросы по теме