Основная цель — объединить две темы Kafka, одну — сжатые медленно движущиеся данные, а другую — быстро движущиеся данные, которые поступают каждую секунду.
Я смог использовать сообщения в простых сценариях, таких как KV (Long, String), используя что-то вроде:
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
Но это не тот подход, когда вам нужно десериализоваться из AVRO. У меня есть KV(STRING, AVRO), который мне нужно использовать.
Я попытался сгенерировать классы Java из схемы AVRO, а затем включить их в «применить», например:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
Но это не казалось правильным подходом.
Есть ли какая-либо документация/примеры, на которые кто-нибудь мог бы мне указать, чтобы я мог понять, как вы будете работать с Kafka AVRO и Beam?
Я обновил свой код:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
}
}
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;
Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}
Но теперь я получаю следующую ошибку
incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >
Я должен импортировать неправильные сериализаторы?




Вы можете использовать KafkaAvroDeserializer следующим образом:
PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
Где Мои занятия — это схема Avro, сгенерированная классом POJO.
Убедитесь, что ваш класс POJO имеет аннотацию AvroCoder, как показано в примере ниже:
@DefaultCoder(AvroCoder.class)
public class MyClass{
String name;
String age;
MyClass(){}
MyClass(String n, String a) {
this.name= n;
this.age= a;
}
}
Да, это из пакета Confluent. Какая у вас ошибка? Можете ли вы вставить трассировку стека ошибок?
Извините, я должен был быть более ясным, я получаю ошибку компиляции, а именно: Ошибка: (47, 69) java: несовместимые типы: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> не может быть преобразован в java.lang.Class<? расширяет org.apache.kafka.common.serialization.Deserializer<my_test_cдевушка>>. AvroCode взят из org.apache.beam.sdk.coders.AvroCoder.
Вы должны аннотировать свой класс POJO с помощью AvroCoder (как указано в ответе). Вы сделали это?
спасибо, что ответили мне, я обновил свой пост с предложенными вами изменениями. Но я все еще получаю ту же ошибку, я включил свой импорт, так как предполагаю, что это как-то связано с ними.
Я сталкиваюсь с той же ошибкой. Это решение не работает. Я получил эту ошибку: Ошибка компиляции [ОШИБКА] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17] несовместимые типы: вывод переменная T имеет несовместимые ограничения равенства java.lang.Object,examples.MyClass
Классы Avro POJO обычно генерируются, например, из avro-maven-plugin. Как добавить к ним аннотации?
Замените KafkaIO.<Long, String>read() на KafkaIO.<Long, Object>read().
Если вы посмотрите на реализацию KafkaAvroDeserializer, она реализует Deserializer<Object>:
public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>
Я столкнулся с той же ошибкой, и это также не решает проблему. Я получил эту ошибку: Ошибка компиляции [ОШИБКА] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17] несовместимые типы: вывод переменная T имеет несовместимые ограничения равенства java.lang.Object,examples.MyClass
Я столкнулся с той же проблемой. Нашел решение в этом mail-архиве. http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E
В вашем случае вам нужно определить свой собственный Deserializer<MyClass>, который может простираться от AbstractKafkaAvroDeserializer следующим образом.
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}
@Override
public void close() {} }
Затем укажите свой КафкаАвроДесериализатор как ValueDeserializer.
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
Я обнаружил, что расширение от AbstractKafkaAvroDeserializer здесь не имеет значения, если вы просто реализуете интерфейс, создаете внутреннее поле для конкретного экземпляра KafkaAvroDeserializer и делегируете ему полномочия. В любом случае, вы должны иметь возможность удалить переопределения метода configure и close, если вместо этого вы расширите KafkaAvroDeserialzer
Ответ Йохея хорош, но я также обнаружил, что это работает
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...
где MyCustomClass — код, созданный с помощью инструментов Avro.
Сегодня у меня была аналогичная проблема, и я наткнулся на следующий пример, который решил ее для меня.
недостающей частью для меня был (Class)KafkaAvroDeserializer
KafkaIO.<String, MyClass>read()
.withBootstrapServers("kafka:9092")
.withTopic("dbserver1.inventory.customers")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
.class уже должен возвращать объект класса, поэтому приведение не кажется необходимым...
Если класс KafkaAvroDeserializer.class из io.confluent.kafka.serializers.KafkaAvroDeserializer? Это то, что я сейчас использую, но выдает ошибку, так как ожидает десериализатор от org.apache.kafka.common.serialization.Deserializer