У меня есть топология потока, определенная с помощью таблицы kafka как таковой:
stream
... //irrelevant steps
.groupByKey(Serialized.with(Serdes.String(), customSerdes))
.aggregate(something(), somethingElse(), Materialized.with(Serdes.String(),customSerdes))
Теперь я хочу динамически запросить это хранилище состояний для целей отладки, что, как я понимаю, требует наименования хранилища состояний:
stream
... //irrelevant steps
.groupByKey(Serialized.with(Serdes.String(), customSerdes))
.aggregate(something(), somethingElse(),
Materialized.<String,CustomType,KeyValueStore<Bytes, byte[]>>
as("mappings")
.withKeySerde(Serdes.String())
.withValueSerde(customSerdes))
Однако, когда я запускаю его, поток умирает со следующим исключением
Exception in thread "<consumer-id>-ce730881-6504-4804-b74f-8ef981708603-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition <consumer-id>-KSTREAM-PEEK-0000000018-repartition-4 for non-subscribed topic regex pattern; subscription pattern is core.device.events.all|<consumer-id>-KSTREAM-PEEK-0000000017-repartition|<consumer-id>-mappings-repartition
Это работает некоторое время, поэтому я бы предпочел не терять существующее состояние или не обрабатывать всю тему целиком.
Нет, я не делал, однако, согласно этой ссылке, он должен удалить внутреннюю тему, так что, возможно, это решит проблему. доложит. Спасибо
Вы сбрасывали свое приложение перед перезапуском? kafka.apache.org/20/documentation/streams/developer-guide/…