Я сохраняю сообщения из темы Kafka в хранилище KeyValueStore, чтобы позже их можно было запросить. Я создаю KTable следующим образом:
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {
Я настроил потребителя в своем application.yml следующим образом:
ОБНОВЛЕНО пакет де/сериализатора
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer
Однако, когда я читаю из KeyValueStore, ключи возвращаются правильно как строки, но возвращаемые значения представляют собой массивы байтов, а не MyMessage. По какой-то причине мой пользовательский десериализатор не используется. Я попытался сам десериализовать сообщение, но мой десериализатор рухнул с исключением. Я поставил точку останова на свой сериализатор, и он никогда не вызывается. Мне ясно, что ни мой сериализатор, ни десериализатор не используются.
Какую конфигурацию мне не хватает, чтобы использовать де/сериализатор моего пользовательского значения? Должны ли де/сериализаторы находиться в определенном пакете, чтобы их можно было найти?
Попробуйте поместить полное имя пакета перед MyMessageDeserializer.
у меня полное имя пакета все равно не работало




В application.yml использовались неправильные ключи конфигурации. Вместо десериализатора ключа: должен быть keySerde: и вместо десериализатора значения: должен быть valueSerde. Ниже приведена правильная конфигурация:
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: com.me.MyMessageSerde
Вы пытались написать полное имя пакета перед своим классом? Или он лежит в корне?