Я новичок, и это мой первый вопрос. Я надеюсь, что это не дубликат, я не нашел ответа на свой вопрос. В основном я следовал руководствам (Medium/Youtube). Я также пытался использовать официальную документацию, но kafka для меня новинка, и в ней нужно многое понять.
Я пытаюсь подключиться к брокеру kafka, над которым я не контролирую, и передать данные в таблицу снежинок. У меня есть файлы подключения SSL, тема и:.
Настраивать:
Что работает
Я могу потреблять данные, запустив:
kafka/bin/kafka-console-consumer.sh --consumer.config kafka/config/connect-custom.properties --topic <my_topic> --bootstrap-server <server>:<port>
Я также могу создать своего собственного производителя и запустить следующее со стандартной конфигурацией на локальном хосте.
kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/connect-snowflake-kafka-connector.properties
Что не работает:
Теперь я хочу получить производственные данные в Snowflake и запустить следующее:
kafka/bin/connect-standalone.sh kafka/config/connect-**custom**.properties kafka/config/connect-snowflake-kafka-connector.properties
Здесь я получаю следующую информацию/предупреждения, и больше ничего не происходит:
[2023-04-11 14:54:06,975] INFO [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935)
[2023-04-11 14:54:06,975] INFO [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node -1 being disconnected (elapsed time since creation: 79ms, elapsed time since send: 79ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:341)
[2023-04-11 14:54:06,975] WARN [snowflakesink|task-0] [Consumer clientId=connector-consumer-snowflakesink-0, groupId=connect-snowflakesink] Bootstrap broker <server>:<port> (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1063)
Файлы:
подключение-custom.properties:
bootstrap.servers=<server>:<port>
security.protocol=SSL
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=***
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=***
ssl.key.password=***
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
Я попробовал другие версии Java и Kafka, что привело к тем же предупреждающим сообщениям. Также попытался включить путь к плагину. Я предполагаю, что есть проблема с файлом свойств.
Решение:
Включите информацию о соединении SSL для потребителя:
bootstrap.servers=<server>:<port>
security.protocol=SSL
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=***
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=***
ssl.key.password=***
consumer.security.protocol=SSL
consumer.ssl.truststore.type=JKS
consumer.ssl.keystore.type=JKS
consumer.ssl.truststore.location=/path/to/truststore
consumer.ssl.truststore.password=***
consumer.ssl.keystore.location=/path/to/keystore
consumer.ssl.keystore.password=***
consumer.ssl.key.password=***
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
Документация Confluent охватывает SSL с Connect... По сути, вам нужно несколько раз повторить настройки SSL для самого Connect, а затем для фактического потребительского клиента Kafka со свойствами с префиксом consumer.
.
@OneCricketeer, спасибо! Это решило проблему! Видимо, я искал не ту документацию. Как я могу отметить ваш комментарий как правильный ответ?
@Sergeu спасибо за подсказку. Как-то я не подумал о дополнительных лог-файлах. Я буду использовать их для моей следующей проблемы!
Я признаю, что сама документация Kafka скудна.
Документация Confluent гораздо лучше описывает SSL с Connect... По сути, вам нужно несколько раз повторить настройки SSL для самого Connect, а затем для реального потребительского клиента Kafka со свойствами с префиксом consumer.
(и admin.
).
https://docs.confluent.io/platform/current/connect/security.html
Можете ли вы включить ведение журнала на коннекторе Kafka и JDBC и предоставить некоторую информацию оттуда?