Соединителю MSK не удалось найти класс в кластере MSK в режиме KRaft

У меня есть кластер Kafka 3.7.x, использующий режим ZooKeeper, который работает хорошо. Сейчас пробую новый Кафак в режиме KRaft:

Однако для этого кластера Kafka в режиме KRaft не удалось запустить тот же коннектор.

Вот полный журнал ошибок Kakfa Connector.

Обратите внимание, внутри него есть

org.apache.kafka.connect.errors.ConnectException: Failed to find any
class that implements Connector and which name matches
com.snowflake.kafka.connector.SnowflakeSinkConnector, available
connectors are: PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSinkConnector,
name='org.apache.kafka.connect.file.FileStreamSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.file.FileStreamSourceConnector,
name='org.apache.kafka.connect.file.FileStreamSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorCheckpointConnector,
name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector,
name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.mirror.MirrorSourceConnector,
name='org.apache.kafka.connect.mirror.MirrorSourceConnector',
version='1', encodedVersion=1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockConnector,
name='org.apache.kafka.connect.tools.MockConnector', version='2.7.1',
encodedVersion=2.7.1, type=connector, typeName='connector',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSinkConnector,
name='org.apache.kafka.connect.tools.MockSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=sink, typeName='sink',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.MockSourceConnector,
name='org.apache.kafka.connect.tools.MockSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.SchemaSourceConnector,
name='org.apache.kafka.connect.tools.SchemaSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSinkConnector,
name='org.apache.kafka.connect.tools.VerifiableSinkConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'}, PluginDesc{klass=class
org.apache.kafka.connect.tools.VerifiableSourceConnector,
name='org.apache.kafka.connect.tools.VerifiableSourceConnector',
version='2.7.1', encodedVersion=2.7.1, type=source, typeName='source',
location='classpath'} ```

Оба кластера Kafka используют один и тот же плагин, который представляет собой zip-файл с этими jar-файлами. Вы можете увидеть Snowflake-kafka-connector-2.2.2.jar, внутри которого есть com.snowflake.kafka.connector.SnowflakeSinkConnector.

Для этих двух кластеров Kafka соединитель и плагин используют один и тот же код cTerraform, что означает, что все параметры одинаковы. Единственная разница — kafka_version в кластере Kafka: 3.7.x и 3.7.x.kraft.

resource "aws_msk_cluster" "hm_amazon_msk_cluster" {
  cluster_name           = var.amazon_msk_cluster_name
  kafka_version          = "3.7.x.kraft" # <- only this line is different, the other one is `3.7.x`
  number_of_broker_nodes = var.kafka_broker_number
  storage_mode           = "TIERED"
  broker_node_group_info {
    instance_type   = "kafka.m7g.large"
    security_groups = [var.amazon_vpc_security_group_id]
    client_subnets  = var.amazon_vpc_subnet_ids
  }
  logging_info {
    broker_logs {
      s3 {
        enabled = true
        bucket  = var.kafka_broker_log_s3_bucket_name
        prefix  = "brokers"
      }
    }
  }
  encryption_info {
    encryption_at_rest_kms_key_arn = var.aws_kms_key_arn
  }
  client_authentication {
    sasl {
      iam = true
    }
  }
}
resource "aws_s3_object" "hm_amazon_s3_object" {
  bucket = var.s3_bucket_name
  key    = var.s3_key
  source = var.local_file_path
  etag   = filemd5(var.local_file_path)
}
resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  name         = var.amazon_msk_plugin_name
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = var.s3_bucket_arn
      file_key   = var.amazon_msk_plugin_s3_key
    }
  }
}
resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
  name                 = var.amazon_msk_connector_name
  kafkaconnect_version = "2.7.1"
  capacity {
    autoscaling {
      mcu_count        = 1
      min_worker_count = 1
      max_worker_count = 2
      scale_in_policy {
        cpu_utilization_percentage = 40
      }
      scale_out_policy {
        cpu_utilization_percentage = 95
      }
    }
  }
  # https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-properties
  # https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-kafka
  connector_configuration = {
    "connector.class"                  = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
    "tasks.max"                        = 4
    "topics"                           = var.kafka_topic_name
    "buffer.count.records"             = 10000
    "buffer.flush.time"                = 5
    "buffer.size.bytes"                = 20000000
    "snowflake.url.name"               = "xx.snowflakecomputing.com"
    "snowflake.user.name"              = var.snowflake_user_name
    "snowflake.private.key"            = var.snowflake_private_key
    "snowflake.private.key.passphrase" = var.snowflake_private_key_passphrase
    "snowflake.role.name"              = var.snowflake_role_name
    "snowflake.ingestion.method"       = "SNOWPIPE_STREAMING"
    "snowflake.enable.schematization"  = true
    "snowflake.database.name"          = var.snowflake_database_name
    "snowflake.schema.name"            = var.snowflake_schema_name
    "value.converter"                     = "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url" = var.confluent_schema_registry_url
    "errors.log.enable"                   = true
    "errors.tolerance"                    = "all"
    "jmx"                                 = true
  }
  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = var.amazon_msk_cluster_bootstrap_servers
      vpc {
        security_groups = [var.amazon_vpc_security_group_id]
        subnets         = var.amazon_vpc_subnet_ids
      }
    }
  }
  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }
  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }
  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.arn
      revision = aws_mskconnect_custom_plugin.hm_amazon_msk_plugin.latest_revision
    }
  }
  log_delivery {
    worker_log_delivery {
      s3 {
        bucket  = var.msk_log_s3_bucket_name
        prefix  = var.msk_log_s3_key
        enabled = true
      }
    }
  }
  service_execution_role_arn = var.amazon_msk_connector_iam_role_arn
}

Есть ли что-нибудь еще, на что мне нужно обратить внимание, чтобы использовать режим Kafka KRaft? А может ли это быть глюк MSK? Спасибо!

В вашей версии Kafka Connect указана версия 2.7.1, поэтому вы не используете путь к классам «3.7.x»…? Кроме того, не рекомендуется запускать Kafka Connect у брокеров, тогда Kraft не будет иметь значения. Как вы настраиваете параметр plugin.path для получения этих jar-файлов?

OneCricketeer 11.07.2024 00:22

Спасибо @OneCricketeer, хм, это может быть причиной. Да, к сожалению, похоже, что MSK поддерживает только Kafka Connect версии 2.7.1 docs.aws.amazon.com/msk/latest/developerguide/msk-connect.ht‌​ml Даже когда пользователи создают коннектор Kafka в пользовательском интерфейсе, Apache Kafka Версия Connect установлена ​​на 2.7.1 🥲

Hongbo Miao 11.07.2024 00:30

Привет @OneCricketeer. Я добавил свой плагин Kafka и код соединителя Terraform. По сути, плагин указывает на ZIP-файл (банки внутри) в S3. И коннектор указывает на этот плагин.

Hongbo Miao 11.07.2024 00:43

Я сам не настраивал msk Connect, но если предположить, что соединители представляют собой отдельные развертывания, то Kraft не должен ни на что влиять, поскольку он не будет использовать какой-либо путь к классам или конфигурации с Connect. Итак, если на одном кластере работает, а на другом нет, то мне это кажется багом

OneCricketeer 11.07.2024 01:55

Понятно, спасибо @OneCricketeer! Кстати, я могу подтвердить, что даже если я заставлю измениться kafkaconnect_version = "3.7.0", это выкинет Error: creating MSK Connect Connector (DevelopmentTrackerZooKeeperKafkaSinkConnector): BadRequestException: Invalid parameter kafkaConnectVersion: Unsupported KafkaConnectVersion [3.7.0]. Valid values: [2.7.1]. это означает, что MSK поддерживает только версию 2.7.1.

Hongbo Miao 11.07.2024 02:04

Какую папку показывает изображение? Это KAFKA_DIR/plugins/development-Tracker-kafka-sink-plugin.zip_‌​inflated внутри работника Connect?

OneCricketeer 11.07.2024 03:03

@OneCricketeer Да, необработанный файл development-tracker-zookeeper-kafka-sink-plugin.zip находится в S3, он загружается в Kafka по адресу KAFKA_DIR/plugins/development-tracker-kafka-sink-plugin.zip_‌​inflated. (Я немного обновил журнал, tracker в нижнем регистре, я редактировал слово в журнале и случайно сделал T прописным, который на самом деле стал строчным.)

Hongbo Miao 11.07.2024 04:02
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
7
65
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хм, это немного неловко. После добавления depends_on между каждым ресурсом теперь это хорошо работает для Kafka в режиме KRaft. Больше ничего не изменилось.

Я думаю, потому что Terraform по умолчанию применяется одновременно. Возможно, когда запустился коннектор Kafka, ZIP в S3 еще не был полностью загружен.

resource "aws_s3_object" "hm_amazon_s3_object" {
  # ...
}

resource "aws_mskconnect_custom_plugin" "hm_amazon_msk_plugin" {
  # ...
  depends_on = [
    aws_s3_object.hm_amazon_s3_object
  ]
}

resource "aws_mskconnect_connector" "my_amazon_msk_connector" {
  # ...
  depends_on = [
    aws_mskconnect_custom_plugin.hm_amazon_msk_plugin
  ]
}

Другие вопросы по теме

Похожие вопросы

Имя потребительской темы NestJS kafka не определено
Я получаю сообщение об ошибке: org.apache.kafka.common.errors.InvalidReplicationFactorException: коэффициент репликации: 3 больше, чем доступные брокеры: 1
Потоки Kafka правильно обрабатывают сообщения, но выдают исключение десериализации
Какой неустаревший способ установки менеджера транзакций в контейнер прослушивателя - это Spring Kafka 3.2?
Как использовать Kafka в режиме SASL_SSL
Тайм-аут соединения: невозможно подключить монго к дебезию для CDC
Неожиданный запрос Kafka типа METADATA во время установления связи SASL при подключении потребителя к серверу Kafka
@KafkaHandler в потребителе не использует сообщение темы как объектный класс, а только как строку
Потоковая передача Spark + интеграция с Kafka, чтение данных из Kafka каждые 15 минут и сохранение смещения последнего чтения с помощью PySpark
Ошибка CompletionException в KafkaListener при прослушивании событий формата KafkaAvro