Невозможно десериализовать данные avro с помощью прямого потока kafka в потоковой передаче искр

Я использовал Kafka Avro Producer для записи данных avro в тему kafka, и теперь я хочу прочитать эти данные с помощью потоковой передачи искр. Я использовал API DirectKafkaStream для чтения данных avro, но он не работает с ошибкой.

Это мой искровый потоковый код, который читает данные avro:

    public static void main(String[] args) throws InterruptedException {

    SparkConf sparkConf = new SparkConf().setAppName("Kafka Streaming").setMaster("local[*]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet(Arrays.asList("test_topic"));
    Map<String, String> kafkaParams = new HashMap();
    //kafkaParams.put("metadata.broker.list", "localhost:9092");

    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
    kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    kafkaParams.put("schema.registry.url", "http://localhost:7788");
    kafkaParams.put("specific.avro.reader", "true");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, Customer> directKafkaStream = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        Customer.class,
        KafkaAvroDecoder.class,
        KafkaAvroDecoder.class,
        kafkaParams,
        topicsSet
    );

    directKafkaStream.print();
    jssc.start();
    jssc.awaitTermination();}

Здесь Customer - это класс моей схемы, который я использовал при записи данных avro в test_topic с помощью Kafka Avro Producer.

Когда я запускаю этот код, он выдает ошибку:

Error:(38, 78) java: no suitable method found for createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<java.lang.String>,java.lang.Class<com.example.Customer>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDecoder>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.util.Map<java.lang.String,java.lang.String>,java.util.Set<java.lang.String>) is not applicable
  (inferred type does not conform to equality constraint(s)
    inferred: java.lang.Object
    equality constraints(s): java.lang.Object,java.lang.String)
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.api.java.JavaStreamingContext,java.lang.Class<K>,java.lang.Class<V>,java.lang.Class<KD>,java.lang.Class<VD>,java.lang.Class<R>,java.util.Map<java.lang.String,java.lang.String>,java.util.Map<kafka.common.TopicAndPartition,java.lang.Long>,org.apache.spark.api.java.function.Function<kafka.message.MessageAndMetadata<K,V>,R>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD,R
    (actual and formal argument lists differ in length))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Set<java.lang.String>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD
    (argument mismatch; org.apache.spark.streaming.api.java.JavaStreamingContext cannot be converted to org.apache.spark.streaming.StreamingContext))
method org.apache.spark.streaming.kafka.KafkaUtils.<K,V,KD,VD,R>createDirectStream(org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map<java.lang.String,java.lang.String>,scala.collection.immutable.Map<kafka.common.TopicAndPartition,java.lang.Object>,scala.Function1<kafka.message.MessageAndMetadata<K,V>,R>,scala.reflect.ClassTag<K>,scala.reflect.ClassTag<V>,scala.reflect.ClassTag<KD>,scala.reflect.ClassTag<VD>,scala.reflect.ClassTag<R>) is not applicable
  (cannot infer type-variable(s) K,V,KD,VD,R
    (actual and formal argument lists differ in length))

Тот же код работает, когда я использую StringDecoder, а не KafkaAvroDecoder, но полученные данные находятся в нечитаемом формате (какие-то сумасшедшие символы). Но я хочу, чтобы данные декодировались при их получении, чтобы я мог понять данные. Пожалуйста, предложите правильный способ сделать это.

Вы пишете этот код в среде IDE? Это похоже на ошибки компиляции ... Эти «сумасшедшие символы» из StringDecoder являются значениями Avro. Непонятно, откуда вы получаете KafkaAvroDecoder, но Confluent не предоставляет никаких классов Spark Decoder для Avro. Взгляните на github.com/AbsaOSS/ABRiS

OneCricketeer 31.10.2018 08:15

Да, я пишу этот код в среде IDE. Он не показывает мне никаких ошибок компиляции. Я использовал KafkaAvroDecoder, потому что думал, что, возможно, данные будут декодированы правильно, но это не работает.

Divya 01.11.2018 07:41

Я бы предложил сначала попробовать вышеуказанное репо, если вы не хотите его использовать, затем попробуйте использовать StringDecoder для ключа, потому что это не Avro, а затем используйте KafkaAvroDeserializer, а не Avro Decoder

OneCricketeer 01.11.2018 15:34
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
3
497
0

Другие вопросы по теме