У меня есть кластер Kafka 3.7.x, использующий режим ZooKeeper, который работает хорошо. Сейчас пробую новый Кафак в режиме KRaft:
Однако для этого кластера Kafka в режиме KRaft не удалось запустить тот же коннектор.
Вот полный журнал ошибок Kakfa Connector.
Обратите внимание, внутри него есть
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.snowflake.kafka.connector.SnowflakeSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.7.1', encodedVersion=2.7.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source', location='classpath'} ```
Оба кластера Kafka используют один и тот же плагин, который представляет собой zip-файл с этими jar-файлами. Вы можете увидеть Snowflake-kafka-connector-2.2.2.jar, внутри которого есть com.snowflake.kafka.connector.SnowflakeSinkConnector.
Для этих двух кластеров Kafka соединитель и плагин используют один и тот же код cTerraform, что означает, что все параметры одинаковы.
Единственная разница — kafka_version в кластере Kafka: 3.7.x и 3.7.x.kraft.
resource "aws_msk_cluster" "hm_amazon_msk_cluster" {
cluster_name = var.amazon_msk_cluster_name
kafka_version = "3.7.x.kraft" # <- only this line is different, the other one is `3.7.x`
number_of_broker_nodes = var.kafka_broker_number
storage_mode = "TIERED"
broker_node_group_info {
instance_type = "kafka.m7g.large"
security_groups = [var.amazon_vpc_security_group_id]
client_subnets = var.amazon_vpc_subnet_ids
}
logging_info {
broker_logs {
s3 {
enabled = true
bucket = var.kafka_broker_log_s3_bucket_name
prefix = "brokers"
}
}
}
encryption_info {
encryption_at_rest_kms_key_arn = var.aws_kms_key_arn
}
client_authentication {
sasl {
iam = true
}
}
}
resource "aws_s3_object" "hm_amazon_s3_object" {
bucket = var.s3_bucket_name
key = var.s3_key
source = var.local_file_path
etag = filemd5(var.local_file_path)
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
name = var.amazon_msk_plugin_name
content_type = "ZIP"
location {
s3 {
bucket_arn = var.s3_bucket_arn
file_key = var.amazon_msk_plugin_s3_key
}
}
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
name = var.amazon_msk_connector_name
kafkaconnect_version = "2.7.1"
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 40
}
scale_out_policy {
cpu_utilization_percentage = 95
}
}
}
# https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-properties
# https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka
connector_configuration = {
"connector.class" = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
"tasks.max" = 4
"topics" = var.kafka_topic_name
"buffer.count.records" = 10000
"buffer.flush.time" = 5
"buffer.size.bytes" = 20000000
"snowflake.url.name" = "xx.snowflakecomputing.com"
"snowflake.user.name" = var.snowflake_user_name
"snowflake.private.key" = var.snowflake_private_key
"snowflake.private.key.passphrase" = var.snowflake_private_key_passphrase
"snowflake.role.name" = var.snowflake_role_name
"snowflake.ingestion.method" = "SNOWPIPE_STREAMING"
"snowflake.enable.schematization" = true
"snowflake.database.name" = var.snowflake_database_name
"snowflake.schema.name" = var.snowflake_schema_name
"value.converter" = "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" = var.confluent_schema_registry_url
"errors.log.enable" = true
"errors.tolerance" = "all"
"jmx" = true
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = var.amazon_msk_cluster_bootstrap_servers
vpc {
security_groups = [var.amazon_vpc_security_group_id]
subnets = var.amazon_vpc_subnet_ids
}
}
}
kafka_cluster_client_authentication {
authentication_type = "IAM"
}
kafka_cluster_encryption_in_transit {
encryption_type = "TLS"
}
plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn
revision = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.latest_revision
}
}
log_delivery {
worker_log_delivery {
s3 {
bucket = var.msk_log_s3_bucket_name
prefix = var.msk_log_s3_key
enabled = true
}
}
}
service_execution_role_arn = var.amazon_msk_connector_iam_role_arn
}
Есть ли что-нибудь еще, на что мне нужно обратить внимание, чтобы использовать режим Kafka KRaft? А может ли это быть глюк MSK? Спасибо!
Спасибо @OneCricketeer, хм, это может быть причиной. Да, к сожалению, похоже, что MSK поддерживает только Kafka Connect версии 2.7.1 docs.aws.amazon.com/msk/latest/developerguide/msk-connect.html Даже когда пользователи создают коннектор Kafka в пользовательском интерфейсе, Apache Kafka Версия Connect установлена на 2.7.1 🥲
Привет @OneCricketeer. Я добавил свой плагин Kafka и код соединителя Terraform. По сути, плагин указывает на ZIP-файл (банки внутри) в S3. И коннектор указывает на этот плагин.
Я сам не настраивал msk Connect, но если предположить, что соединители представляют собой отдельные развертывания, то Kraft не должен ни на что влиять, поскольку он не будет использовать какой-либо путь к классам или конфигурации с Connect. Итак, если на одном кластере работает, а на другом нет, то мне это кажется багом
Понятно, спасибо @OneCricketeer! Кстати, я могу подтвердить, что даже если я заставлю измениться kafkaconnect_version = "3.7.0", это выкинет Error: creating MSK Connect Connector (DevelopmentTrackerZooKeeperKafkaSinkConnector): BadRequestException: Invalid parameter kafkaConnectVersion: Unsupported KafkaConnectVersion [3.7.0]. Valid values: [2.7.1]. это означает, что MSK поддерживает только версию 2.7.1.
Какую папку показывает изображение? Это KAFKA_DIR/plugins/development-Tracker-kafka-sink-plugin.zip_inflated внутри работника Connect?
@OneCricketeer Да, необработанный файл development-tracker-zookeeper-kafka-sink-plugin.zip находится в S3, он загружается в Kafka по адресу KAFKA_DIR/plugins/development-tracker-kafka-sink-plugin.zip_inflated. (Я немного обновил журнал, tracker в нижнем регистре, я редактировал слово в журнале и случайно сделал T прописным, который на самом деле стал строчным.)

Хм, это немного неловко. После добавления depends_on между каждым ресурсом теперь это хорошо работает для Kafka в режиме KRaft. Больше ничего не изменилось.
Я думаю, потому что Terraform по умолчанию применяется одновременно. Возможно, когда запустился коннектор Kafka, ZIP в S3 еще не был полностью загружен.
resource "aws_s3_object" "hm_amazon_s3_object" {
# ...
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
# ...
depends_on = [
aws_s3_object.hm_amazon_s3_object
]
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
# ...
depends_on = [
aws_mskconnect_custom_plugin.hm_amazon_msk_plugin
]
}
В вашей версии Kafka Connect указана версия 2.7.1, поэтому вы не используете путь к классам «3.7.x»…? Кроме того, не рекомендуется запускать Kafka Connect у брокеров, тогда Kraft не будет иметь значения. Как вы настраиваете параметр plugin.path для получения этих jar-файлов?