Как установить значение de/serializer для Kafka KeyValueStore?

Я сохраняю сообщения из темы Kafka в хранилище KeyValueStore, чтобы позже их можно было запросить. Я создаю KTable следующим образом:

@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {

Я настроил потребителя в своем application.yml следующим образом:

ОБНОВЛЕНО пакет де/сериализатора

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer

Однако, когда я читаю из KeyValueStore, ключи возвращаются правильно как строки, но возвращаемые значения представляют собой массивы байтов, а не MyMessage. По какой-то причине мой пользовательский десериализатор не используется. Я попытался сам десериализовать сообщение, но мой десериализатор рухнул с исключением. Я поставил точку останова на свой сериализатор, и он никогда не вызывается. Мне ясно, что ни мой сериализатор, ни десериализатор не используются.

Какую конфигурацию мне не хватает, чтобы использовать де/сериализатор моего пользовательского значения? Должны ли де/сериализаторы находиться в определенном пакете, чтобы их можно было найти?

Вы пытались написать полное имя пакета перед своим классом? Или он лежит в корне?

Manuel 03.07.2019 10:44

Попробуйте поместить полное имя пакета перед MyMessageDeserializer.

Bipin Thakur 03.07.2019 10:57

у меня полное имя пакета все равно не работало

mbluke 03.07.2019 10:58
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
3
400
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В application.yml использовались неправильные ключи конфигурации. Вместо десериализатора ключа: должен быть keySerde: и вместо десериализатора значения: должен быть valueSerde. Ниже приведена правильная конфигурация:

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde

Другие вопросы по теме

Похожие вопросы

Дополнительные кавычки вокруг строковых значений JSON, проанализированных с использованием потоков javax.json.stream.JsonParser и Java
`gradle jar`, похоже, не добавляет зависимости в файл jar
Исключение, не управляемое объектом, при вызове refresh() из планировщика Spring
Как объединить свойства двух объектов в Java? (свойства определяемого пользователем класса и значения ключей карты)
Невозможно привязать текстовое представление к статическому методу из публично не создаваемого конечного класса
Как получить json из firebase, у которого есть вложенный дочерний элемент?
Как я могу дождаться ожидаемого значения на странице корзины magento с помощью селена
Структура модели Nosql
Синхронизация доступа к одним и тем же файлам, одновременный доступ к разным файлам
Переведите числовое значение в соответствующую метку, используя средство форматирования ячеек из LinkedHashMap