Spring kafka ищет последнее доступное сообщение в теме

У меня есть потребитель Kafka, который подписывается на следующие темы: MY_TOPIC, MY_UNINTERESTED_TOPIC.

В следующем сценарии меня не интересует вторая тема, но я должен был упомянуть ее, потому что, если я настрою ее с помощью чего-то вроде auto.offset.reset, это может повлиять на все темы.

В теме MY_TOPIC я публикую разного рода сообщения: MESSAGE_TYPE_A и MESSAGE_TYPE_B. Оба сообщения являются экземплярами BaseKafkaMessage (настраиваемый класс) с разными свойствами.

Теперь мне интересно найти Только сообщение последний типа MESSAGE_TYPE_A. Как я могу это сделать?

Реальный сценарий таков: я публикую два типа сообщений на одну и ту же тему. Один из них используется для подготовки локального кеша у каждого потребителя, которому интересна эта тема и это сообщение. Если потребитель останавливается, при перезагрузке он должен повторно инициализировать свой кеш с использованием последней версии MESSAGE_TYPE_A. MESSAGE_TYPE_B следует игнорировать. Я не хочу отправлять уведомление о Kafka поставщику данных для повторной публикации данных, потому что у всех подписчиков будет много ненужной работы.

Как я могу это получить? Это возможно?

Я нашел https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, но не уверен, что это то, что я ищу, или есть другой способ сделать это.

но всякий раз, когда потребитель перезагружается, он правильно извлекает новые данные? Тогда зачем беспокоиться о MESSAGE_TYPE_B, и это свойство auto.offset.reset будет затронуто, если это новая группа потребителей

Deadpool 10.01.2019 18:29

@Deadpool, пожалуйста, взгляните на ответ cricket_007, а также на мой комментарий

tzortzik 11.01.2019 10:08

У вас есть представление об идентификаторе группы потребителей? Я действительно не понимаю, как потребитель будет опрашивать с самого начала, когда он просто перезапускается с идентификатором sama group? @tzortzik

Deadpool 11.01.2019 13:41

Не стесняйтесь принять ответ, поставив галочку рядом с сообщением

OneCricketeer 09.01.2020 10:27
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
4
469
1

Ответы 1

Непонятно, в каком формате находятся эти сообщения и почему вам вообще нужно, чтобы они были в одной теме.

Например, вы могли бы использовать разные типы Avro. Или вам пришлось бы try-catch анализировать два разных байтовых массива (объекты JSON?)

Или вы можете разбить тему по типу, чтобы все сообщения одного типа располагались в одном и том же разделе.

Но не существует механизма для поиска по индексу, чтобы получить последнее отправленное сообщение. Либо вы начинаете с последнего события и получаете следующее входящее сообщение, либо можете начать с самого начала, а затем сканировать до тех пор, пока не получите 0 записей в следующем цикле опроса, который теоретически является «самым последним»

something like auto.offset.reset it may affect all the topics

Это влияет только на те темы, которые интересуют потребителя, а не на все.

На данный момент Avro для меня не решение. Я не хочу использовать auto.offset.reset, потому что мой потребитель подписан на такие темы, как ORDER_MANAGEMENT и PRODUCT_MANAGEMENT. В этом случае мы можем воссоздать ранее созданный заказ или товар. Меня интересует только PRODUCT_MANAGEMENT с сообщениями типа (тип - это поле в сообщении) `PRODUCT_CACHE_UPDATE. The solution you propose is to search from the beginning until I have no messages and look for my latest PRODUCT_CACHE_UPDATE`?

tzortzik 11.01.2019 10:06

дедупликация записей зависит от вашей потребительской логики. Изменение свойства кафки или нет, не предотвратит этого. Вы можете использовать Kafka Streams для filter, разделив вашу одну тему на 2 темы в зависимости от типов. Однако, опять же, для того, чтобы «посмотреть», требуется сканирование темы.

OneCricketeer 11.01.2019 17:53

Другие вопросы по теме