Я использую Kafka уже несколько месяцев, и я понял, что некоторые из основных концепций для меня еще не так ясны. Мои сомнения связаны с отношением между ConsumerId, groupId и смещениями. В нашем приложении нам нужно, чтобы Kafka работала с использованием парадигмы публикации-подписки, поэтому мы используем разные идентификаторы групп для каждого потребителя, которые генерируются случайным образом.
Раньше я думал, что при настройке auto.offset.reset = latest
мои потребители всегда будут получать сообщения, которые они еще не получили, но недавно я узнал Это не относится к делу. Это работает только в том случае, если потребитель еще не совершил компенсацию. В любом другом случае потребитель будет продолжать получать сообщения со смещением, превышающим последнее зафиксированное смещение.
Поскольку я всегда создаю новых потребителей со случайными идентификаторами групп, я понял, что у моих потребителей «нет памяти», они новые потребители, и у них никогда не будет совершенных смещений, поэтому политика auto.offset.reset = latest
будет применяться всегда. И вот здесь начинаются мои сомнения. Предположим следующий сценарий:
my-topic
. auto.offset.reset
настройка latest
для обоих потребителей.my-topic
.groupId
является случайным, и я не устанавливаю никакого идентификатора потребителя, так что это означает, что это новый потребитель (правильно?). Приложение B не получает никаких сообщений.Итак, резюмируя, если я не ошибаюсь, A получает все сообщения, но B пропустил M4 и M5. Я пробовал это с kafka-console-consumer.sh
, и он ведет себя так.
Итак, как я могу заставить приложение B получать сообщения, опубликованные во время его закрытия? Теперь, если я запущу его, назначив тот же идентификатор группы, что и при первоначальном запуске, он будет читать сообщения M4 и M5, но это установка идентификатора группы. Можно ли также установить идентификатор потребителя и получить такое же поведение?
Или, другими словами, что понимается под повторным запуском того же потребителя? Два потребителя являются одним и тем же потребителем, если у них один и тот же идентификатор группы, один и тот же идентификатор потребителя, оба?
Кстати, ConsumerId и свойство client.id — это одно и то же?
Два потребителя находятся в одной группе, если у них одинаковая настройка group.id
.
Я не совсем понимаю, что вы имеете в виду под consumerId
. Начиная с Kafka 2.2, в потребительские конфигурации такого поля нет.
Если вы говорите о client.id
, этот параметр не имеет функционального эффекта, он используется только для пометки запросов, чтобы при необходимости их можно было сопоставить в журнале брокера.
Когда вы запускаете потребителя с auto.offset.reset=latest
, если не существует зафиксированных смещений, потребитель перезапустит потребление с конца журнала. Таким образом, он будет получать только те сообщения, которые создаются после его запуска. Так что в вашем сценарии вы правы, он никогда не получит M4 и M5.
Если вы хотите использовать все сообщения, вам нужно оставить то же самое group.id
. В этом случае auto.offset.reset
будет применяться только при первом запуске потребителя. Таким образом, когда ваш потребитель перезапустится, он вернется туда, где был, когда остановился.
Если нет идентификатора потребителя, как я могу иметь 2 потребителей в группе на одном разделе? Если я хочу аварийное переключение на потребителе без дублирования сообщений, я вижу только возможность дублировать идентификатор потребителя/клиента…
Это потому, что вы устанавливаете auto.offset.reset = last
Любые сообщения, отправленные во время неработающего потребителя, не будут обрабатываться потребителем.
Таким образом, B пропустит два сообщения
Я не тот объект, к которому относится политика auto.offset.reset. Он основан на группе? Тематический или потребительский?