В чем разница между реализацией Deserializer и Serde в Kafka Consumer API?

Я пытаюсь смоделировать проект kafka-clickstream-enrich kafka-stream Гвен (Чен) Шапиры на GitHub (https://github.com/onurtokat/kafka-clickstream-enrich). Когда я использую тему, используя потребительский класс с помощью десериализаторов, я сталкиваюсь с ошибкой. Настраиваемый класс Serde имеет сериализатор и десериализатор. Но я пытаюсь понять, почему для десериализатора используется пользовательский serde, тогда потребительский API выдает ошибку, поскольку это не экземпляр org.apache.kafka.common.serialization.Deserializer

Тему можно использовать с помощью KTable с сериализатором Serdes.Integer() и новым десериализатором ProfileSerde(), как показано ниже.

KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
                Consumed.with(Serdes.Integer(), new ProfileSerde()),
                Materialized.as("profile-store"));

Индивидуальный Serde определяется как;

static public final class ProfileSerde extends WrapperSerde<UserProfile> {
        public ProfileSerde() {
            super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
        }
    }

И общий Serde настраивается, как показано ниже;

package com.onurtokat.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;


public class WrapperSerde<T> implements Serde<T> {

    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

Мой потребитель настолько прост, что его можно увидеть ниже;

package com.onurtokat.consumers;

import com.onurtokat.ClickstreamEnrichment;
import com.onurtokat.Constants;
import com.onurtokat.model.UserProfile;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumeProfileData {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClickstreamEnrichment.ProfileSerde.class);

        KafkaConsumer<Integer, UserProfile> consumerProfileTopic = new KafkaConsumer<>(config);
        consumerProfileTopic.subscribe(Arrays.asList(Constants.USER_PROFILE_TOPIC));
        while (true) {
            ConsumerRecords<Integer, UserProfile> records = consumerProfileTopic.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, UserProfile> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

Ошибка, когда я пытаюсь использовать тему с моим потребителем:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at com.onurtokat.consumers.ConsumeProfileData.main(ConsumeProfileData.java:25)
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
2 406
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Кажется, вы неправильно поняли:

The topic can be consumed using KTable with Serdes.Integer() Serializer and new ProfileSerde() Deserializer like below.

вы должны предоставить Consumed.with() KeySerde и ValueSerde.

Относительно исключения:

это довольно ясно - вы должны установить реализацию Десериализатор (не Серде)

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, //here);

Я полагаю, вы должны получить десериализацию таким образом Clickstream Enrichment.Profile Serde.deserializer()

dmkvl 24.05.2019 17:34

ты прав. Я должен реализовать десериализатор вместо использования serde. Спасибо за ваш ответ.

Onur Tokat 04.06.2019 14:38
Ответ принят как подходящий

Разница в следующем:

  • Сердес используются API Kafka Streams (он же Kafka Streams). Serde — это оболочка для пары (1) сериализатор и (2) десериализатор для одного и того же типа данных — см. следующие два пункта. То есть у Serde<T> есть Serializer<T> и Deserializer<T>. Первый фрагмент кода, который вы опубликовали (например, с KTable), является фрагментом кода Kafka Streams, поэтому ему нужен Serde. Kafka Streams нуждается в Serde, потому что он создает сообщения (для чего ему нужен Serializer) и читает сообщения (для чего ему нужен Deserializer).
  • Десериализаторы используются потребительским API Kafka (также известным как потребительский клиент) для сообщений чтение. Ваш последний фрагмент кода (например, с KafkaConsumer) использует потребительский клиент и поэтому нуждается в Deserializer, а не Serde.
  • Сериализаторы используются API производителя Kafka (он же клиент производителя) для сообщений пишу.

Касательно:

Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more

Код вашего потребительского клиента Kafka получил Serde вместо ожидаемого Deserializer.

Спасибо, Майкл Г. Нолл. Хорошее объяснение.

Onur Tokat 04.06.2019 14:37

Действительно отличное объяснение.

Tatenda Zifudzi 07.07.2021 22:37

Другие вопросы по теме