Сбой десериализации сообщения Spring kafka, поскольку для типа содержимого установлено значение application/json

Я отправляю сообщение Кафке следующим образом:

private KafkaTemplate<String, MyMessage > kafkaTemplate;

public void sendMessage(MyMessage data) {

    Message<MyMessage > message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC,topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
            .build();
    kafkaTemplate.send(message);

У меня есть собственный де/сериализатор для MyMessage, ключ представляет собой простую строку

Я пытаюсь обработать сообщение следующим образом:

@StreamListener
    public void process(@Input("input") KStream<String,MyMessage> myStream){
            final Serde<String> stringSerde = Serdes.String();
            final MyMessageSerde myMessageSerde = new MyMessageSerde();
            myStream
                    .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                    .aggregate(ArrayList::new,
                            (newKey,val,agg) -> {
                                agg.add(val);
                                return agg;
                            },
                            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                    .withKeySerde(stringSerde)
                                    .withValueSerde(new ArrayListSerde(myMessageSerde)));

    ...
    interface MyStreamProcessor {
        @Input("input")
        KStream<?, ?> input();
    }

Это не удается с ошибкой десериализации, и после отладки я вижу, что причина в том, что заголовок сообщения типа содержимого установлен на application/json, и поэтому он пытается десериализовать сообщение как JSON. Как устанавливается заголовок типа содержимого и как я могу его переопределить? Я не отправляю сообщения в виде JSON, а скорее в виде byte[], как обычные сообщения Kafka, поэтому я хочу, чтобы использовался мой пользовательский де/сериализатор.

Что еще более странно, так это то, что если я поменяю это на KTable, он будет работать нормально.

@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
        final Serde<String> stringSerde = Serdes.String();
        final MyMessageSerde myMessageSerde = new MyMessageSerde();
        KStream<String,MyMessage> myStream = myTable.toStream();
        myStream
                .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                .aggregate(ArrayList::new,
                        (newKey,val,agg) -> {
                            agg.add(val);
                            return agg;
                        },
                        Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                .withKeySerde(stringSerde)
                                .withValueSerde(new ArrayListSerde(myMessageSerde)));

...
interface MyStreamProcessor {
    @Input("input")
    KTable<?, ?> input();
}

Почему с KTable сообщение десериализуется правильно? В чем разница в поведении между KStream и KTable?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
0
1 375
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Это работает, потому что вы указываете свою пользовательскую (де)сериализацию в методе .group. Взгляните на часть документации это и это, чтобы узнать, как указать пользовательское значение serde.

Я добавил в свой файл application.yml. Я все еще получаю то же исключение, когда он пытается разобрать JSON. spring.cloud.stream.bindings.input.nativeDecoding: true spring.cloud.stream.kafka.streams.binder.configuration.defau‌​lt.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.kafka.streams.binder.configuration.defau‌​lt.value.serde: com.MyMessageSerde

mbluke 17.07.2019 09:45

Пользовательская десериализация указывается в групповом методе независимо от того, начинается ли она как KTable или KStream. Почему он работает, когда запускается как KTable, и не работает, когда запускается как KStream?

mbluke 17.07.2019 10:06
Ответ принят как подходящий

Я заставил это работать, выполнив следующие действия:

  1. установите заголовок типа входного контента в моем файле application.yml следующим образом: spring.cloud.stream.bindings.input.content-type: application/mymessage

  2. Создайте CustomMessageConverter, который расширяет AbstractMessageConverter для преобразования из byte[] в MyMessage. Он также определяет новый MimeType

public MyMappingConverter() { super(new MimeType("application", "mymessage")); }

  1. Создайте bean-компонент MessageConverter

@Bean @StreamMessageConverter public MessageConverter myMessageConverter() { return new MyMessageConverter(); }

Я до сих пор понятия не имею, почему мне пришлось делать все это для KStream, а не для KTable, если кто-нибудь может объяснить, что это будет оценено.

Другие вопросы по теме