Идентификатор потребителя и идентификатор группы в Kafka: что делает двух потребителей одинаковыми

Я использую Kafka уже несколько месяцев, и я понял, что некоторые из основных концепций для меня еще не так ясны. Мои сомнения связаны с отношением между ConsumerId, groupId и смещениями. В нашем приложении нам нужно, чтобы Kafka работала с использованием парадигмы публикации-подписки, поэтому мы используем разные идентификаторы групп для каждого потребителя, которые генерируются случайным образом.

Раньше я думал, что при настройке auto.offset.reset = latest мои потребители всегда будут получать сообщения, которые они еще не получили, но недавно я узнал Это не относится к делу. Это работает только в том случае, если потребитель еще не совершил компенсацию. В любом другом случае потребитель будет продолжать получать сообщения со смещением, превышающим последнее зафиксированное смещение.

Поскольку я всегда создаю новых потребителей со случайными идентификаторами групп, я понял, что у моих потребителей «нет памяти», они новые потребители, и у них никогда не будет совершенных смещений, поэтому политика auto.offset.reset = latest будет применяться всегда. И вот здесь начинаются мои сомнения. Предположим следующий сценарий:

  1. У меня есть два клиентских приложения, A и B, с одним потребителем в каждом, работающим в режиме публикации-подписки (таким образом, с разными идентификаторами групп). Оба потребителя подписаны на тему my-topic. auto.offset.resetнастройка latest для обоих потребителей.
  2. Некоторые производители (или производители) публикуют сообщения M1, M2 и M3 в теме my-topic.
  3. И A, и B получают M1, M2 и M3.
  4. Теперь я закрываю приложение B.
  5. Производители производят сообщения M4 и M5.
  6. Приложение А получает сообщения М4 и М5.
  7. Теперь я перезапускаю приложение B. Помните, что groupId является случайным, и я не устанавливаю никакого идентификатора потребителя, так что это означает, что это новый потребитель (правильно?). Приложение B не получает никаких сообщений.
  8. Производители публикуют сообщения М6 и М7.
  9. Оба приложения A и B получают сообщения M6 и M7.

Итак, резюмируя, если я не ошибаюсь, A получает все сообщения, но B пропустил M4 и M5. Я пробовал это с kafka-console-consumer.sh, и он ведет себя так.

Итак, как я могу заставить приложение B получать сообщения, опубликованные во время его закрытия? Теперь, если я запущу его, назначив тот же идентификатор группы, что и при первоначальном запуске, он будет читать сообщения M4 и M5, но это установка идентификатора группы. Можно ли также установить идентификатор потребителя и получить такое же поведение?

Или, другими словами, что понимается под повторным запуском того же потребителя? Два потребителя являются одним и тем же потребителем, если у них один и тот же идентификатор группы, один и тот же идентификатор потребителя, оба?

Кстати, ConsumerId и свойство client.id — это одно и то же?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
5
0
5 964
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Два потребителя находятся в одной группе, если у них одинаковая настройка group.id.

Я не совсем понимаю, что вы имеете в виду под consumerId. Начиная с Kafka 2.2, в потребительские конфигурации такого поля нет.

Если вы говорите о client.id, этот параметр не имеет функционального эффекта, он используется только для пометки запросов, чтобы при необходимости их можно было сопоставить в журнале брокера.

Когда вы запускаете потребителя с auto.offset.reset=latest, если не существует зафиксированных смещений, потребитель перезапустит потребление с конца журнала. Таким образом, он будет получать только те сообщения, которые создаются после его запуска. Так что в вашем сценарии вы правы, он никогда не получит M4 и M5.

Если вы хотите использовать все сообщения, вам нужно оставить то же самое group.id. В этом случае auto.offset.reset будет применяться только при первом запуске потребителя. Таким образом, когда ваш потребитель перезапустится, он вернется туда, где был, когда остановился.

Я не тот объект, к которому относится политика auto.offset.reset. Он основан на группе? Тематический или потребительский?

quento 28.01.2021 13:16

Если нет идентификатора потребителя, как я могу иметь 2 потребителей в группе на одном разделе? Если я хочу аварийное переключение на потребителе без дублирования сообщений, я вижу только возможность дублировать идентификатор потребителя/клиента…

Marc 18.09.2021 15:54

Это потому, что вы устанавливаете auto.offset.reset = last

Любые сообщения, отправленные во время неработающего потребителя, не будут обрабатываться потребителем.

Таким образом, B пропустит два сообщения

Другие вопросы по теме