Spring kafka Нет информации о типе в заголовках и не указан тип по умолчанию

У меня есть весеннее загрузочное приложение, которое определяет:

  • контроллер REST, который пишет в тему kafka, STREAM_TOPIC_IN_QQQ
  • KafkaListener, который читает из STREAM_TOPIC_IN_QQQ (groupId="bar") и регистрирует
  • KStream, который просматривает тему и регистрирует ее, преобразует ее в другой тип, а затем записывает в STREAM_TOPIC_OUT_QQQ.
  • другой KafkaListener, который читает из STREAM_TOPIC_OUT_QQQ.

(Я менял суффикс, чтобы избежать возможной путаницы, и создавал темы вручную, потому что в противном случае я получал предупреждение STREAM_TOPIC_IN_ххх=LEADER_NOT_AVAILABLE, и поток не запускался в течение минуты или около того.)

Первый слушатель и поток, кажется, работают, но когда слушатель в STREAM_OUT_TOPIC пытается десериализовать сообщение, я получаю исключение ниже. Я предоставляю серде в потоке с помощью Produced.with. Что мне нужно сделать, чтобы слушатель знал тип для десериализации?

Журнал

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]

Вот конфигурация:

ОТДЫХ (весна mvc):

@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}

Конфигурация Кафки (весна-кафка):

@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
        logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
    })
          .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
          .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}

приложение.yml

spring:
kafka:
  bootstrap-servers:  localhost:9092
  consumer:
    group-id: foo
    auto-offset-reset: latest
    key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    properties:
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    application-id: kafka9000-v0.1
    properties: # properties not explicitly handled by KafkaProperties.streams
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
8
0
34 236
6
Перейти к ответу Данный вопрос помечен как решенный

Ответы 6

Ответ принят как подходящий

См. документация.

Конкретно...

JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

Это spring.json.value.default.type

Вы также можете установить spring.json.use.type.headers (по умолчанию true), чтобы даже не искать заголовки.

Десериализатор автоматически доверяет пакету типа по умолчанию, поэтому добавлять его туда не нужно.

РЕДАКТИРОВАТЬ

Однако см. также Преобразование сообщений Spring Messaging.

Используйте BytesDeserializer и BytesJsonMessageConverter, и платформа передаст тип параметра метода в качестве цели для преобразования.

Десериализатор автоматически доверяет пакету типа по умолчанию, поэтому нет необходимости добавлять его и туда.

Gary Russell 11.03.2019 21:40

Я должен был указать, что хочу Кроме как по умолчанию. Разве не принято хотеть слушать разные типы по разным темам? В любом случае, почему информация о заголовке отсутствует?

mconner 11.03.2019 22:40

Кроме того, я думаю, что нашел решение: у KafkaListener есть поле свойств, которое позволяет указывать свойства, которые заменяют свойства в фабрике-потребителе, поэтому я могу добавить следующее в аннотацию KafkaListener: properties = { ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":com.teramedica.kafakaex001web.GreetingResponseDeserializer‌​" }, где GreetingResponseDeserializer расширяет JsonDeserializer и создает с super(GreetingResponse.class) It кажется, работает. Мне интересно, почему это не поддерживается напрямую в аннотации.

mconner 11.03.2019 22:43

Ах, хорошо, вместо JsonDeserializer используйте StringDeserializer (или BytesDeserializer) с StringJsonMessageConverter (или BytesJsonMessageConverter) (версии Bytes... более эффективны. Затем фреймворк сообщает преобразователю тип, найденный в методе слушателя. Опять же, см. документация,

Gary Russell 11.03.2019 22:53

Если автоконфигурация загрузки найдет конвертер @Bean, он автоматически подключит его к фабрике контейнеров.

Gary Russell 11.03.2019 22:58

Итак, я добавил @Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); } и изменил десериализатор: spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer (я использую ErrorHandlingDeserializer2). Это сработало, но «сломало» слушателя, у которого в качестве параметра был ConsumerRecord<String, Greeting> record, в том смысле, что это не приветствие, а строка json. Что на самом деле имеет смысл, оглядываясь назад, поскольку десериализатор не анализирует json, а messageConverter. В любом случае, спасибо!

mconner 12.03.2019 22:10

Ах. Да, вы можете использовать Message<Greeting> или просто Greeting. Вы можете получить другие поля из ConsumerRecord. Например, "@Header(KafkaHeaders.OFFSET) длинное смещение", если они вам нужны (или message.getHeaders().get(...), если вы выберете первый вариант.

Gary Russell 12.03.2019 22:19

мне кажется, что решение BytesDeserializer, BytesJsonMessageConverter - это решение, которое большинство людей хотели бы использовать, поскольку использование заголовков для определения типа полезно только для производителей Spring или, по крайней мере, Java.

raven 14.08.2021 07:54

«Отвечая» на мой собственный вопрос в основном для консолидации информации в комментариях к @GaryRussell и от него, но в основном он дал лучший ответ. Вкратце я сделал следующее:

  • Установите потребительский десериализатор на StringDeserializer
  • Добавьте bean-компонент messageConverter как StringJsonMessageConverter.
  • В аннотированных методах KafkaListener просто используйте ожидаемый тип для полезной нагрузки.
  • При использовании ConsumerRecord в аннотированном методе KafaListener НЕ ожидайте, что он будет типа Payload. Теперь это будет String (поскольку это делает преобразователь сообщений, а не десериализатор).

Еще одна вещь: по умолчанию простое добавление messageConverter также добавляет его в автоматически настроенный шаблон kafkaTemplate при использовании автонастройки весенней загрузки. Это не кажется проблемой при вызове kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting), хотя я думаю, что это может быть при использовании send(Message).

Ниже приведена рабочая конфигурация, в которой я получаю сообщения, как и ожидалось, с минимальной конфигурацией.

приложение.yml:

  spring:
    kafka:
      bootstrap-servers:  localhost:9092
      consumer:
        group-id: foo
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      streams:
        application-id: kafka9000-v0.1
        properties: # properties not explicitly handled by KafkaProperties.streams
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model

КафкаКонфиг:

        @Bean RecordMessageConverter messageConverter() {  return new StringJsonMessageConverter();  }

...
    @Bean
    public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
        stream.peek((k, greeting) -> {
            logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
        })
              .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
              .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
        return stream;
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
    public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
    //    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
        logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
    public void listenForGreetingResponses(@Payload Greeting gr,
            ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
        //logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
        logger.info("STREAM_IN_TOPIC Listener:   Greeting: {}", gr.getContent());
        logger.info("STREAM_IN_TOPIC Listener:  From Headers: topic: {}, partition: {}, key: {}", topic, partition,
                    key);
        logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
                    record.topic(), record.partition(), record.key());
        logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
    }

    @Bean
    public KafkaListenerErrorHandler myTopicErrorHandler() {
        return (m, e) -> {
            logger.error("Got an error {}", e.getMessage());
            return "some info about the failure";
        };
    }

И вывод для сообщения:

13 Mar 2019 09:56:57,884   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
13 Mar 2019 09:56:57,913   INFO     [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer]   Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Greeting: Hello, World!
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:  From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself

вы спасли мой день!

heroin 28.07.2020 15:18

Это не ответ; но может помочь людям, попадающим сюда из поисковых систем.

Если вы столкнулись с этим исключением при запуске приложения KafkaStreams.

  • Вы зарегистрировали jsonSerde во всех необходимых местах вашего DSL?
  • Вы предоставили jsonSerde при установке государственного хранилища?

Примечание 1. Убедитесь, что вы инициализировали jsonSerde, как описано ниже:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

Примечание 2: самая распространенная ошибка

import org.springframework.kafka.support.serializer.JsonSerde;
new JsonSerde<JsonNode>(); // This is wrong

В моем случае проблема в том, что я уже опубликовал несколько разных типов сообщений в теме Кафки и получил это исключение.

Починить это.

Я создал новую тему и публиковал там сообщения. Затем запустил потребитель по этой теме, и все заработало нормально.

Так что я тоже столкнулся с той же проблемой.

Я исправил это так

Вы должны установить следующее свойство для класса, который вы пытаетесь десериализовать.

spring.json.value.default.type=com.something.model.TransactionEventPayload

Я установил свойства для KafkaListener следующим образом:

@KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
 public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
                       @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {

Это исключение генерируется org.springframework.kafka.support.serializer.JsonDeserializer, который требует, чтобы информация о типе была включена в заголовок специального типа или предоставлена ​​@KafkaListener через spring.json.value.default.type configuration property.
Вот как я решил эту проблему в SpringBoot 2.5.3:

  1. Добавьте ByteArrayJsonMessageConverter в контекст:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;

@Configuration
public class JsonMessageConverterConfig {
    @Bean
    public JsonMessageConverter jsonMessageConverter() {
        return new ByteArrayJsonMessageConverter();
    }
}
  1. Настройте app.kafka.producer.value-serializer и app.kafka.consumer.value-deserializer:
app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
  1. Теперь вы можете отключить сериализацию TypeId header:
spring.kafka.producer.properties.spring.json.add.type.headers=false

Другие вопросы по теме