У меня есть весеннее загрузочное приложение, которое определяет:
(Я менял суффикс, чтобы избежать возможной путаницы, и создавал темы вручную, потому что в противном случае я получал предупреждение STREAM_TOPIC_IN_ххх=LEADER_NOT_AVAILABLE, и поток не запускался в течение минуты или около того.)
Первый слушатель и поток, кажется, работают, но когда слушатель в STREAM_OUT_TOPIC пытается десериализовать сообщение, я получаю исключение ниже. Я предоставляю серде в потоке с помощью Produced.with. Что мне нужно сделать, чтобы слушатель знал тип для десериализации?
Журнал
11 Mar 2019 14:34:00,194 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
11 Mar 2019 14:34:00,236 INFO [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer] Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367 ERROR [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
Вот конфигурация:
ОТДЫХ (весна mvc):
@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
logger.debug("Sending a Kafka Message");
return gr;
}
Конфигурация Кафки (весна-кафка):
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
stream.peek((k, greeting) -> {
logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
})
.map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
.to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}
приложение.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
streams:
application-id: kafka9000-v0.1
properties: # properties not explicitly handled by KafkaProperties.streams
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: com.teramedica.kafakaex001web.model




См. документация.
Конкретно...
JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.
Это spring.json.value.default.type
Вы также можете установить spring.json.use.type.headers (по умолчанию true), чтобы даже не искать заголовки.
Десериализатор автоматически доверяет пакету типа по умолчанию, поэтому добавлять его туда не нужно.
РЕДАКТИРОВАТЬ
Однако см. также Преобразование сообщений Spring Messaging.
Используйте BytesDeserializer и BytesJsonMessageConverter, и платформа передаст тип параметра метода в качестве цели для преобразования.
Я должен был указать, что хочу Кроме как по умолчанию. Разве не принято хотеть слушать разные типы по разным темам? В любом случае, почему информация о заголовке отсутствует?
Кроме того, я думаю, что нашел решение: у KafkaListener есть поле свойств, которое позволяет указывать свойства, которые заменяют свойства в фабрике-потребителе, поэтому я могу добавить следующее в аннотацию KafkaListener: properties = { ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":com.teramedica.kafakaex001web.GreetingResponseDeserializer" }, где GreetingResponseDeserializer расширяет JsonDeserializer и создает с super(GreetingResponse.class) It кажется, работает. Мне интересно, почему это не поддерживается напрямую в аннотации.
Ах, хорошо, вместо JsonDeserializer используйте StringDeserializer (или BytesDeserializer) с StringJsonMessageConverter (или BytesJsonMessageConverter) (версии Bytes... более эффективны. Затем фреймворк сообщает преобразователю тип, найденный в методе слушателя. Опять же, см. документация,
Если автоконфигурация загрузки найдет конвертер @Bean, он автоматически подключит его к фабрике контейнеров.
Итак, я добавил @Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); } и изменил десериализатор: spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer (я использую ErrorHandlingDeserializer2). Это сработало, но «сломало» слушателя, у которого в качестве параметра был ConsumerRecord<String, Greeting> record, в том смысле, что это не приветствие, а строка json. Что на самом деле имеет смысл, оглядываясь назад, поскольку десериализатор не анализирует json, а messageConverter. В любом случае, спасибо!
Ах. Да, вы можете использовать Message<Greeting> или просто Greeting. Вы можете получить другие поля из ConsumerRecord. Например, "@Header(KafkaHeaders.OFFSET) длинное смещение", если они вам нужны (или message.getHeaders().get(...), если вы выберете первый вариант.
мне кажется, что решение BytesDeserializer, BytesJsonMessageConverter - это решение, которое большинство людей хотели бы использовать, поскольку использование заголовков для определения типа полезно только для производителей Spring или, по крайней мере, Java.
«Отвечая» на мой собственный вопрос в основном для консолидации информации в комментариях к @GaryRussell и от него, но в основном он дал лучший ответ. Вкратце я сделал следующее:
Еще одна вещь: по умолчанию простое добавление messageConverter также добавляет его в автоматически настроенный шаблон kafkaTemplate при использовании автонастройки весенней загрузки. Это не кажется проблемой при вызове kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting), хотя я думаю, что это может быть при использовании send(Message).
Ниже приведена рабочая конфигурация, в которой я получаю сообщения, как и ожидалось, с минимальной конфигурацией.
приложение.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: foo
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
streams:
application-id: kafka9000-v0.1
properties: # properties not explicitly handled by KafkaProperties.streams
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: com.teramedica.kafakaex001web.model
КафкаКонфиг:
@Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); }
...
@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
stream.peek((k, greeting) -> {
logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
})
.map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
.to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
return stream;
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
// logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
}
@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(@Payload Greeting gr,
ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
//logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
logger.info("STREAM_IN_TOPIC Listener: Greeting: {}", gr.getContent());
logger.info("STREAM_IN_TOPIC Listener: From Headers: topic: {}, partition: {}, key: {}", topic, partition,
key);
logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
record.topic(), record.partition(), record.key());
logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
}
@Bean
public KafkaListenerErrorHandler myTopicErrorHandler() {
return (m, e) -> {
logger.error("Got an error {}", e.getMessage());
return "some info about the failure";
};
}
И вывод для сообщения:
13 Mar 2019 09:56:57,884 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message
13 Mar 2019 09:56:57,913 INFO [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer] Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Greeting: Hello, World!
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
вы спасли мой день!
Это не ответ; но может помочь людям, попадающим сюда из поисковых систем.
Если вы столкнулись с этим исключением при запуске приложения KafkaStreams.
Примечание 1. Убедитесь, что вы инициализировали jsonSerde, как описано ниже:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
Примечание 2: самая распространенная ошибка
import org.springframework.kafka.support.serializer.JsonSerde;
new JsonSerde<JsonNode>(); // This is wrong
В моем случае проблема в том, что я уже опубликовал несколько разных типов сообщений в теме Кафки и получил это исключение.
Починить это.
Я создал новую тему и публиковал там сообщения. Затем запустил потребитель по этой теме, и все заработало нормально.
Так что я тоже столкнулся с той же проблемой.
Я исправил это так
Вы должны установить следующее свойство для класса, который вы пытаетесь десериализовать.
spring.json.value.default.type=com.something.model.TransactionEventPayload
Я установил свойства для KafkaListener следующим образом:
@KafkaListener(topics = "topic", groupId = "myGroupId", properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
Это исключение генерируется org.springframework.kafka.support.serializer.JsonDeserializer, который требует, чтобы информация о типе была включена в заголовок специального типа или предоставлена @KafkaListener через spring.json.value.default.type configuration property.
Вот как я решил эту проблему в SpringBoot 2.5.3:
ByteArrayJsonMessageConverter в контекст:import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;
@Configuration
public class JsonMessageConverterConfig {
@Bean
public JsonMessageConverter jsonMessageConverter() {
return new ByteArrayJsonMessageConverter();
}
}
app.kafka.producer.value-serializer и app.kafka.consumer.value-deserializer:app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
TypeId header:spring.kafka.producer.properties.spring.json.add.type.headers=false
Десериализатор автоматически доверяет пакету типа по умолчанию, поэтому нет необходимости добавлять его и туда.