Потребитель kafka avro: mysql decimal в java decimal

Я пытаюсь использовать записи из таблицы MySQL, которая содержит 3 столбца (Axis, Price, lastname) с их типами данных (int, decimal(14,4), varchar(50)) соответственно.

Я вставил одну запись со следующими данными (1, 5.0000, John).

Следующий код Java (который использует записи AVRO из раздела, созданного соединителем MySQL на платформе Confluent) считывает десятичный столбец: Price как тип java.nio.HeapByteBuffer, поэтому я не могу получить значение столбца, когда получаю Это.

Есть ли способ извлечь или преобразовать полученные данные в десятичный или двойной тип данных Java?

Вот файл свойств MySQL Connector: -

{
  "name": "mysql-source",
  "config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://localhost:8081",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "incrementing.column.name": "Axis",
   "tasks.max": "1",
   "table.whitelist": "ticket",
   "mode": "incrementing",
   "topic.prefix": "mysql-",
   "name": "mysql-source",
   "validate.non.null": "false",
   "connection.url": "jdbc:mysql://localhost:3306/ticket? 
   user=user&password=password"
   }
}

Вот код: -

    public static void main(String[] args) throws InterruptedException, 
     IOException {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");


        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
        "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "sql-ticket";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("value = %s \n", record.value().get("Price"));
    }
  }
} finally {
  consumer.close();
}

}

enter image description here

Ваш потребительский код использует старый, устаревший Consumer API ... Откуда вы взяли этот код?

OneCricketeer 14.09.2018 00:04

@ cricket_007 Я использую confluent 4.0.0 и пример java в этой версии docs.confluent.io/4.0.0/schema-registry/docs/…, если я использую последнюю версию, решит ли это проблему?

Mahmoud Elbably 14.09.2018 07:53

Что ж, этот раздел документации устарел ... Я могу сообщить о проблеме и поработать над ее обновлением, но, пожалуйста, посмотрите это. В частности, Zookeeper больше не нужен клиенту для взаимодействия с Kafka confluent.io/blog/….

OneCricketeer 14.09.2018 08:46

В противном случае попробуйте avroRecord.get("Price").getFloat())

OneCricketeer 14.09.2018 08:50

@ cricket_007, хорошо, теперь я использую confluent 5.0.0, и я обновил код из этого документа docs.confluent.io/current/schema-registry/docs/…, но получаю те же результаты, также говорится, что нет метода getFloat()

Mahmoud Elbably 14.09.2018 11:27

Я получил этот метод из javadoc heapbytebuffer

OneCricketeer 14.09.2018 20:34

Это может помочь stackoverflow.com/questions/26676733/…

OneCricketeer 14.09.2018 20:41

@ cricket_007 пробовал это, но вывод бесполезен, какая-то случайная строка. есть ли другой способ обойти это? эта проблема очень раздражает, и только этот тип данных (десятичный) останавливает весь процесс!

Mahmoud Elbably 15.09.2018 02:28

1) Я не использовал исходный соединитель JDBC, поэтому, к сожалению, не знаю решения для вас. Должен быть какой-то метод HeapByteBuffer, который принимает байты в нужный вам тип. 2) Я обычно использую типы avro SpecificRecord, созданные с помощью плагина Avro Maven, а не обычные GenericRecords.

OneCricketeer 15.09.2018 08:36
1
9
990
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хорошо, я наконец нашел решение.

Heapbytebuffer необходимо преобразовать в массив byte[], затем я использовал BigInteger, который конструирует значение из созданного байтового массива, затем я создал переменную BigDecimal, которая принимает значение BigInteger, и я установил десятичную точку с movePointLeft(4), которая является шкалой (в моем случае: 4), и все заработало, как ожидалось.

    ByteBuffer buf = (ByteBuffer) record.value().get(("Price"));
    byte[] arr = new byte[buf.remaining()];
    buf.get(arr);
    BigInteger bi =new BigInteger(1,arr);
    BigDecimal bd = new BigDecimal(bi).movePointLeft(4);
    System.out.println(bd);

Вот результаты (слева - результат, справа - MySQL): -

enter image description here

Другие вопросы по теме