Десериализовать сообщения Kafka AVRO с помощью Apache Beam

Основная цель — объединить две темы Kafka, одну — сжатые медленно движущиеся данные, а другую — быстро движущиеся данные, которые поступают каждую секунду.

Я смог использовать сообщения в простых сценариях, таких как KV (Long, String), используя что-то вроде:

PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, 
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
  
PCollection<String> output = input.apply(Values.<String>create());

Но это не тот подход, когда вам нужно десериализоваться из AVRO. У меня есть KV(STRING, AVRO), который мне нужно использовать.

Я попытался сгенерировать классы Java из схемы AVRO, а затем включить их в «применить», например:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

Но это не казалось правильным подходом.

Есть ли какая-либо документация/примеры, на которые кто-нибудь мог бы мне указать, чтобы я мог понять, как вы будете работать с Kafka AVRO и Beam?

Я обновил свой код:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

public static void main(String[] args) {

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
    );

    p.run();

}
}

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;

Myclass(){}
Myclass(String n, String a) {
    this.name= n;
    this.age= a;
}
}

Но теперь я получаю следующую ошибку

incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >

Я должен импортировать неправильные сериализаторы?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
0
2 303
5

Ответы 5

Вы можете использовать KafkaAvroDeserializer следующим образом:

PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

Где Мои занятия — это схема Avro, сгенерированная классом POJO.

Убедитесь, что ваш класс POJO имеет аннотацию AvroCoder, как показано в примере ниже:

@DefaultCoder(AvroCoder.class)
   public class MyClass{
      String name;
      String age;

      MyClass(){}
      MyClass(String n, String a) {
         this.name= n;
         this.age= a;
      }
  }

Если класс KafkaAvroDeserializer.class из io.confluent.kafka.serializers.KafkaAvroDeserializer? Это то, что я сейчас использую, но выдает ошибку, так как ожидает десериализатор от org.apache.kafka.common.serialization.Deserializer

Chimmy 19.02.2019 09:32

Да, это из пакета Confluent. Какая у вас ошибка? Можете ли вы вставить трассировку стека ошибок?

Nishu Tayal 19.02.2019 09:54

Извините, я должен был быть более ясным, я получаю ошибку компиляции, а именно: Ошибка: (47, 69) java: несовместимые типы: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDese‌​rializer> не может быть преобразован в java.lang.Class<? расширяет org.apache.kafka.common.serialization.Deserializer<my_test_c‌​девушка>>. AvroCode взят из org.apache.beam.sdk.coders.AvroCoder.

Chimmy 19.02.2019 10:42

Вы должны аннотировать свой класс POJO с помощью AvroCoder (как указано в ответе). Вы сделали это?

Nishu Tayal 19.02.2019 11:43

спасибо, что ответили мне, я обновил свой пост с предложенными вами изменениями. Но я все еще получаю ту же ошибку, я включил свой импорт, так как предполагаю, что это как-то связано с ними.

Chimmy 01.03.2019 15:09

Я сталкиваюсь с той же ошибкой. Это решение не работает. Я получил эту ошибку: Ошибка компиляции [ОШИБКА] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/e‌​xamples/MyClassConsu‌​mer.java:[19,17] несовместимые типы: вывод переменная T имеет несовместимые ограничения равенства java.lang.Object,examples.MyClass

Yohei Onishi 30.04.2019 08:01

Классы Avro POJO обычно генерируются, например, из avro-maven-plugin. Как добавить к ним аннотации?

OneCricketeer 20.01.2022 17:30

Замените KafkaIO.<Long, String>read() на KafkaIO.<Long, Object>read().

Если вы посмотрите на реализацию KafkaAvroDeserializer, она реализует Deserializer<Object>:

public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>

Я столкнулся с той же ошибкой, и это также не решает проблему. Я получил эту ошибку: Ошибка компиляции [ОШИБКА] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/e‌​xamples/MyClassConsu‌​mer.java:[19,17] несовместимые типы: вывод переменная T имеет несовместимые ограничения равенства java.lang.Object,examples.MyClass

Yohei Onishi 30.04.2019 08:02

Я столкнулся с той же проблемой. Нашел решение в этом mail-архиве. http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

В вашем случае вам нужно определить свой собственный Deserializer<MyClass>, который может простираться от AbstractKafkaAvroDeserializer следующим образом.

public class MyClassKafkaAvroDeserializer extends
  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
  
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
      configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public MyClass deserialize(String s, byte[] bytes) {
      return (MyClass) this.deserialize(bytes);
  }

  @Override
  public void close() {} }

Затем укажите свой КафкаАвроДесериализатор как ValueDeserializer.

p.apply(KafkaIO.<Long, MyClass>read()
 .withKeyDeserializer(LongDeserializer.class)
 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );

Я обнаружил, что расширение от AbstractKafkaAvroDeserializer здесь не имеет значения, если вы просто реализуете интерфейс, создаете внутреннее поле для конкретного экземпляра KafkaAvroDeserializer и делегируете ему полномочия. В любом случае, вы должны иметь возможность удалить переопределения метода configure и close, если вместо этого вы расширите KafkaAvroDeserialzer

OneCricketeer 20.01.2022 17:26

Ответ Йохея хорош, но я также обнаружил, что это работает

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;

...

public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}

...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...

где MyCustomClass — код, созданный с помощью инструментов Avro.

Сегодня у меня была аналогичная проблема, и я наткнулся на следующий пример, который решил ее для меня.

https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java

недостающей частью для меня был (Class)KafkaAvroDeserializer

KafkaIO.<String, MyClass>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("dbserver1.inventory.customers")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
.class уже должен возвращать объект класса, поэтому приведение не кажется необходимым...
OneCricketeer 20.01.2022 17:29

Другие вопросы по теме