Как удалить потребительское смещение группы по одной конкретной теме

Предполагая, что у меня есть две темы (оба с двумя разделами и бесконечным сохранением):

  • my_topic_a
  • my_topic_b

и одна группа потребителей:

  • my_consumer

В какой-то момент он потреблял обе темы, но из-за некоторых изменений перестал интересоваться my_topic_a, поэтому перестал его потреблять и теперь накапливает лаг:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -

Это отставание меня раздражает, потому что:

  • Мой график потребительского отставания в Grafana испорчен.
  • Срабатывает автоматический будильник, напоминающий мне о том, что потребитель слишком сильно отстает.

Таким образом, я хочу избавиться от смещений для my_topic_a из my_consumer, чтобы прийти к состоянию, как если бы my_consumer никогда не потреблял my_topic_a.

Следующая попытка не удалась:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user

С этим выходом:

The consumer does not support topic-specific offset deletion from a consumer group.

Как я могу достичь своей цели? (Временная остановка всех потребителей этой группы была бы возможным вариантом в моем случае использования.)

(Я использую версию Кафки 2.2.0.)


Я предполагаю, что что-то можно сделать, написав что-то в тему __consumer_offsets, но я не знаю, что это будет. В настоящее время эта тема выглядит следующим образом (опять же, упрощенно):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)

Нет ли способа отфильтровать данные с панели управления Grafana? Или, может быть, отключить оповещения по тегам темы на время периода хранения смещения?

OneCricketeer 10.12.2020 19:28

@OneCricketeer Конечно, я, вероятно, мог бы найти способ настроить конфигурации всех зависимых вещей (Prometheus для Grafana, диспетчера предупреждений и многого другого), чтобы отфильтровать это устаревшее смещение. И затем делайте это каждый раз, когда какая-то другая группа потребителей останавливается, чтобы использовать одну из ее тем. Но я бы предпочел более чистое и убедительное решение.

Tobias Hermann 10.12.2020 21:06

@OneCricketeer Кроме того, насколько я понимаю, это будет не только период хранения смещения, потому что это относится только к потребителям, которые перестали использовать все темы. Мой кластер имеет offsets.retention.minutes один день, а my_consumer не потреблял topic_a уже несколько недель, но так как он все еще активно читает другие темы, ничего не убрали и отставание все еще есть.

Tobias Hermann 10.12.2020 21:06

Эта тема сжата, что означает, что будут очищены только закрытые сегменты журнала. Размер сегмента по умолчанию — 1G, а это много данных, поскольку OffsetAndMetadata — это компактный двоичный формат. Но это также означает, что, скажем, ключ [my_consumer_group,my_topic_b,0] не виден более 1 дня и не существует в текущем открытом сегменте журнала, после чего он будет удален.

OneCricketeer 10.12.2020 23:21

@OneCricketeer Спасибо за объяснение. Поскольку я использую log.cleaner.max.compaction.lag.ms=86400000, думаю, даже если сегмент лога не доходит до log.segment.bytes, [my_consumer_group,my_topic_a,...] следует удалить через сутки. Тем не менее, на самом деле, в моем случае, он все еще там после того, как прошло несколько недель. Значит, в моем кластере что-то работает не так, как задумано.

Tobias Hermann 11.12.2020 10:00

Вы написали код для генерации задержки? Или берется из норы или какого-то другого инструмента?

s7vr 20.12.2020 17:03

@ s7vr Отставание возникло естественным образом, потому что my_consumer_group намеренно перестал потреблять my_topic_a. (На самом деле лаг — это не такое круглое число, как 100000, но для этого примера я упростил вещи, а также количество разделов.)

Tobias Hermann 21.12.2020 07:54
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
4
7
4 057
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Результат, который вам дан:

«Потребитель не поддерживает удаление смещения для конкретной темы из группы потребителей».

является показателем того, что невозможно удалить конкретную тему из группы потребителей.

Вы можете изменить группу потребителей для нового приложения только для чтения my_topic_b, перезапустить приложение, а затем полностью удалить старую и бездействующую группу потребителей. При таком подходе вы сможете отслеживать лаги потребителей, не отвлекаясь и не получая всплывающих предупреждений. При перезапуске приложения с новой ConsumerGroup обычно лучше всего остановить производителя для темы "b" во время перезапуска, чтобы убедиться, что вы не пропустите ни одного сообщения.

Я бы действительно не стал возиться с темой вручную __consumer_offsets.

В качестве альтернативы вы можете регулярно запускать инструмент командной строки, который поставляется с Kafka, чтобы уменьшить отставание вашей ConsumerGroup:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest 

Возможно, вам придется добавить опцию --execute.

Да, если бы было возможно остановить производителя, он бы уже просто переключился на новое имя группы потребителей, как вы описываете. К сожалению, это не жизнеспособный вариант в моем случае использования.

Tobias Hermann 14.12.2020 15:11

Хорошо, может быть, альтернатива, которую я только что добавил к своему ответу, может помочь вам решить вашу проблему. Тем не менее, это еще один обходной путь ... но я не видел надежного решения для манипулирования темой Consumer_offsets так, как вам нужно.

Michael Heil 15.12.2020 21:41

Спасибо, решение очень прагматичное, но я вижу в нем три проблемы. Во-первых, это добавляет сложности, поскольку приходится поддерживать планирование этого фиктивного сценария. Во-вторых, в подобном случае в будущем мне придется добавить больше таких фиктивных скриптов. В-третьих, на графиках будет выглядеть так, будто потребитель все еще потребляет эту тему, а на самом деле это не так, что может привести к ложным выводам в будущем.

Tobias Hermann 16.12.2020 08:28
Ответ принят как подходящий

Тем временем (Kafka 2.8) это стало возможным благодаря новому параметру --delete-offsets для kafka-consumer-groups.sh. :-)

Другие вопросы по теме