Проблема с кодировкой исходного соединителя Kafka | Oracle JDBC-коннектор

Я пытаюсь подключиться к серверу Oracle с помощью соединителя JDBC Kafka Connect, вот моя конфигурация соединителя:

CREATE SOURCE CONNECTOR hsr_source_connector2
WITH (
  'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newuser_',
  'characterEncoding'='UTF-8',
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractInt.field' = 'USER_ID'
);

Тема создается и также считывает данные, но данные имеют нежелательный характер.

rowtime: 2024/04/18 17:08:51.028 Z, key: VN185082, value: ☺►VN185082►VN185082♠FSC☻☻W
rowtime: 2024/04/18 17:08:51.028 Z, key: MD250626, value: ☺►MD250626►MD250626♠SME☻☻R

Определение потока:

CREATE STREAM hsr_users
 (
 user_id varchar,
 user_name varchar,
 pswd varchar,
 user_role varchar,
 access_level varchar
 )WITH (kafka_topic='newuser_', value_format='JSON', partitions=1);

Можете ли вы подсказать, как применить кодировку и получить для этого подходящую документацию. Ссылаясь на этот документ https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

Заранее спасибо!

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
58
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Получил ответ, это можно исправить, добавив эти свойства:

CREATE SOURCE CONNECTOR hsr_source_connector3
WITH (
'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newEncode_',
  'characterEncoding' = 'AL32UTF8', -- Updated: Added charset parameter
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter'= 'org.apache.kafka.connect.json.JsonConverter',
  'transforms.extractInt.field' = 'USER_ID',
  'key.converter.schemas.enable'='false',
  'value.converter.schemas.enable'='false',
  'errors.tolerance' = 'all',
  'errors.log.enable' = 'true',
  'errors.log.include.messages' = 'true'
);

Это даст данные в следующем формате:

Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2024/04/19 11:08:19.755 Z, key: "HG185035", value: {"USER_ID":"HG185035","USER_NAME":"HG185035","PSWD":null,"USER_ROLE":"SME","ACCESS_LEVEL":"R"}

По сути, мне не хватало свойств key.convertor и value.convertor. Я получил правильные свойства, настроенные из слитной документации: https://www.confluent.io/en-gb/blog/kafka-connect-deep-dive-converters-serialization-explained/

Другие вопросы по теме