Сериализатор и десериализатор Kafka Avro

Я хочу реализовать общий сериализатор / десериализатор Kafka на основе Avro. Он должен быть общего назначения без использования объектов. Требуется использовать что-то вроде GenericRecord.get ("myValue").

Это код моего сериализатора

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class AvroSerializer implements Serializer<GenericRecord> {

  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  public byte[] serialize(String topic, GenericRecord data) {
    try {
      byte[] result = null;

      if (data != null) {

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        result = byteArrayOutputStream.toByteArray();
      }
      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

  public void close() {

  }

}

а это десериализатор

import java.util.Arrays;
import java.util.Map;

import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroDeserializer implements Deserializer<GenericRecord> {

  @Override
  public void close() {

  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public GenericRecord deserialize(String topic, byte[] data) {
    try {
      GenericRecord result = null;

      if (data != null) {
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        result = reader.read(null, decoder);
      }
      return result;
     } catch (Exception ex) {
      throw new SerializationException(
      "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }

}

К сожалению, я получаю SerializationException, когда сообщение Kafka десериализуется.

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my.topic-0 at offset 7
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[72, -31, 122, 20, -82, 7, 89, 64]' from topic 'my.topic'
Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:80)
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:49)
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307)
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:128)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:143)
at com.mydomain.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:34)
at com.mydomain.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:1) 

Использование реестра схем - вариант для вас? Если вы используете SR Kafka, Avro Ser / Des - из коробки.

dbustosp 17.08.2018 14:31

Вы используете SchemaRegistry для схем?

Nishu Tayal 17.08.2018 14:51

Нет, я еще не пробовал реестр схем

filip_j 17.08.2018 14:59

Посмотрите на aseigneurin.github.io/2018/08/02/… Там есть еще один блог, который не использует реестр

OneCricketeer 17.08.2018 16:48
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
4
2 040
0

Другие вопросы по теме