Я отправляю сообщение Кафке следующим образом:
private KafkaTemplate<String, MyMessage > kafkaTemplate;
public void sendMessage(MyMessage data) {
Message<MyMessage > message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC,topic)
.setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
.build();
kafkaTemplate.send(message);
У меня есть собственный де/сериализатор для MyMessage, ключ представляет собой простую строку
Я пытаюсь обработать сообщение следующим образом:
@StreamListener
public void process(@Input("input") KStream<String,MyMessage> myStream){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KStream<?, ?> input();
}
Это не удается с ошибкой десериализации, и после отладки я вижу, что причина в том, что заголовок сообщения типа содержимого установлен на application/json, и поэтому он пытается десериализовать сообщение как JSON. Как устанавливается заголовок типа содержимого и как я могу его переопределить? Я не отправляю сообщения в виде JSON, а скорее в виде byte[], как обычные сообщения Kafka, поэтому я хочу, чтобы использовался мой пользовательский де/сериализатор.
Что еще более странно, так это то, что если я поменяю это на KTable, он будет работать нормально.
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
final Serde<String> stringSerde = Serdes.String();
final MyMessageSerde myMessageSerde = new MyMessageSerde();
KStream<String,MyMessage> myStream = myTable.toStream();
myStream
.groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
.aggregate(ArrayList::new,
(newKey,val,agg) -> {
agg.add(val);
return agg;
},
Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
.withKeySerde(stringSerde)
.withValueSerde(new ArrayListSerde(myMessageSerde)));
...
interface MyStreamProcessor {
@Input("input")
KTable<?, ?> input();
}
Почему с KTable сообщение десериализуется правильно? В чем разница в поведении между KStream и KTable?

Это работает, потому что вы указываете свою пользовательскую (де)сериализацию в методе .group.
Взгляните на часть документации это и это, чтобы узнать, как указать пользовательское значение serde.
Пользовательская десериализация указывается в групповом методе независимо от того, начинается ли она как KTable или KStream. Почему он работает, когда запускается как KTable, и не работает, когда запускается как KStream?
Я заставил это работать, выполнив следующие действия:
установите заголовок типа входного контента в моем файле application.yml следующим образом:
spring.cloud.stream.bindings.input.content-type: application/mymessage
Создайте CustomMessageConverter, который расширяет AbstractMessageConverter для преобразования из byte[] в MyMessage. Он также определяет новый MimeType
public MyMappingConverter() {
super(new MimeType("application", "mymessage"));
}
@Bean
@StreamMessageConverter
public MessageConverter myMessageConverter() {
return new MyMessageConverter();
}
Я до сих пор понятия не имею, почему мне пришлось делать все это для KStream, а не для KTable, если кто-нибудь может объяснить, что это будет оценено.
Я добавил в свой файл application.yml. Я все еще получаю то же исключение, когда он пытается разобрать JSON.
spring.cloud.stream.bindings.input.nativeDecoding: true spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: com.MyMessageSerde