У меня есть потребитель Kafka, который подписывается на следующие темы: MY_TOPIC, MY_UNINTERESTED_TOPIC.
В следующем сценарии меня не интересует вторая тема, но я должен был упомянуть ее, потому что, если я настрою ее с помощью чего-то вроде auto.offset.reset, это может повлиять на все темы.
В теме MY_TOPIC я публикую разного рода сообщения: MESSAGE_TYPE_A и MESSAGE_TYPE_B. Оба сообщения являются экземплярами BaseKafkaMessage (настраиваемый класс) с разными свойствами.
Теперь мне интересно найти Только сообщение последний типа MESSAGE_TYPE_A. Как я могу это сделать?
Реальный сценарий таков: я публикую два типа сообщений на одну и ту же тему. Один из них используется для подготовки локального кеша у каждого потребителя, которому интересна эта тема и это сообщение. Если потребитель останавливается, при перезагрузке он должен повторно инициализировать свой кеш с использованием последней версии MESSAGE_TYPE_A. MESSAGE_TYPE_B следует игнорировать. Я не хочу отправлять уведомление о Kafka поставщику данных для повторной публикации данных, потому что у всех подписчиков будет много ненужной работы.
Как я могу это получить? Это возможно?
Я нашел https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, но не уверен, что это то, что я ищу, или есть другой способ сделать это.
@Deadpool, пожалуйста, взгляните на ответ cricket_007, а также на мой комментарий
У вас есть представление об идентификаторе группы потребителей? Я действительно не понимаю, как потребитель будет опрашивать с самого начала, когда он просто перезапускается с идентификатором sama group? @tzortzik
Не стесняйтесь принять ответ, поставив галочку рядом с сообщением




Непонятно, в каком формате находятся эти сообщения и почему вам вообще нужно, чтобы они были в одной теме.
Например, вы могли бы использовать разные типы Avro. Или вам пришлось бы try-catch анализировать два разных байтовых массива (объекты JSON?)
Или вы можете разбить тему по типу, чтобы все сообщения одного типа располагались в одном и том же разделе.
Но не существует механизма для поиска по индексу, чтобы получить последнее отправленное сообщение. Либо вы начинаете с последнего события и получаете следующее входящее сообщение, либо можете начать с самого начала, а затем сканировать до тех пор, пока не получите 0 записей в следующем цикле опроса, который теоретически является «самым последним»
something like auto.offset.reset it may affect all the topics
Это влияет только на те темы, которые интересуют потребителя, а не на все.
На данный момент Avro для меня не решение. Я не хочу использовать auto.offset.reset, потому что мой потребитель подписан на такие темы, как ORDER_MANAGEMENT и PRODUCT_MANAGEMENT. В этом случае мы можем воссоздать ранее созданный заказ или товар. Меня интересует только PRODUCT_MANAGEMENT с сообщениями типа (тип - это поле в сообщении) `PRODUCT_CACHE_UPDATE. The solution you propose is to search from the beginning until I have no messages and look for my latest PRODUCT_CACHE_UPDATE`?
дедупликация записей зависит от вашей потребительской логики. Изменение свойства кафки или нет, не предотвратит этого. Вы можете использовать Kafka Streams для filter, разделив вашу одну тему на 2 темы в зависимости от типов. Однако, опять же, для того, чтобы «посмотреть», требуется сканирование темы.
но всякий раз, когда потребитель перезагружается, он правильно извлекает новые данные? Тогда зачем беспокоиться о
MESSAGE_TYPE_B, и это свойствоauto.offset.resetбудет затронуто, если это новая группа потребителей