Ошибка приемника Kafka "Этот соединитель требует, чтобы записи из Kafka содержали ключи для таблицы Cassandra"

Я пытаюсь синхронизировать все таблицы, прочитанные из Sap, в кассандру, используя kafka вот моя конфигурация кассандры

{
    "name": "cassandra",
    "config": {
        "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
        "tasks.max": "5",
        "topics" :"sap_table1,sap_table2",
        "cassandra.keyspace": "sap",
        "cassandra.compression":"SNAPPY",
        "cassandra.consistency.level":"LOCAL_QUORUM",
        "cassandra.write.mode":"Update",
        "transforms":"prune", 
       "transforms.prune.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.prune.whitelist":"CreatedAt,Id,Text,Source,Truncated",
        "transforms.ValueToKey.fields":"ROWTIME"

    }
}

Я получаю эту ошибку

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584) org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

Все таблицы, созданные с помощью соединителя kafka sap, не имеют ключа, я не знаю, проблема ли в этом

дайте мне знать, если я что-нибудь сделаю

Благодарность

Установка Apache Cassandra на Mac OS
Установка Apache Cassandra на Mac OS
Это краткое руководство по установке Apache Cassandra.
1
0
1 132
2

Ответы 2

"ROWTIME" существует только как концепция KSQL. На самом деле это не поле в вашем значении, поэтому для ключа установлено значение null.

Кроме того, ValueToKey не указан в списке transforms, так что это даже не применяется. Вам также придется добавить "transforms.ValueToKey.type".

Вам придется использовать другой метод преобразования, чтобы установить метку времени записи в качестве ключа сообщения ConnectRecord.

Эта ошибка означает, что ваши данные не сериализованы, поэтому они не в формате json или словаря {'key': 'value'}. если вы читаете свои данные напрямую от брокера в качестве способа устранения неполадок, вы обнаружите, что ваши данные имеют только значения без каких-либо ключей:

используйте эту команду для чтения ваших данных от брокера:

/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic your_topic_name--from-beginning

поэтому лучший способ решить эту проблему - добавить сериализатор в файл конфигурации вашего издателя. попробуйте этот файл в качестве источника коннектора или издателя

name=src-view
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
topic.prefix=test-
connection.url=jdbc:postgresql://127.0.0.1:5434/test?user=testuser&password=testpass
mode=incrementing
incrementing.column.name=id
table.types=table
table.whitelist=table_name
validate.non.null=false
batch.max.rows=10000
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

а ниже - потребитель (ink.conf) для десериализации ваших данных:

name=cas-dest
connector.class=io.confluent.connect.cassandra.CassandraSinkConnector
tasks.max=1
topics=your_topic_name
cassandra.contact.points=127.0.0.1
cassandra.port=9042
cassandra.keyspace=your_keyspace_name
cassandra.write.mode=Update
cassandra.keyspace.create.enabled=true
cassandra.table.manage.enabled=true
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
transforms=createKey
transforms.createKey.fields=id,timestamp
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

измените createKey.fields в соответствии с вашими данными и будьте осторожны, так как это будут ваши ключи раздела, поэтому прочитайте о моделировании данных в cassandra, прежде чем выбирать свои ключи, и он должен существовать в вашем ключе данных.

Другие вопросы по теме