Предполагая, что у меня есть две темы (оба с двумя разделами и бесконечным сохранением):
my_topic_a
my_topic_b
и одна группа потребителей:
my_consumer
В какой-то момент он потреблял обе темы, но из-за некоторых изменений перестал интересоваться my_topic_a
, поэтому перестал его потреблять и теперь накапливает лаг:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
Это отставание меня раздражает, потому что:
Таким образом, я хочу избавиться от смещений для my_topic_a
из my_consumer
, чтобы прийти к состоянию, как если бы my_consumer
никогда не потреблял my_topic_a
.
Следующая попытка не удалась:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
С этим выходом:
The consumer does not support topic-specific offset deletion from a consumer group.
Как я могу достичь своей цели? (Временная остановка всех потребителей этой группы была бы возможным вариантом в моем случае использования.)
(Я использую версию Кафки 2.2.0
.)
Я предполагаю, что что-то можно сделать, написав что-то в тему __consumer_offsets
, но я не знаю, что это будет. В настоящее время эта тема выглядит следующим образом (опять же, упрощенно):
kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
@OneCricketeer Конечно, я, вероятно, мог бы найти способ настроить конфигурации всех зависимых вещей (Prometheus для Grafana, диспетчера предупреждений и многого другого), чтобы отфильтровать это устаревшее смещение. И затем делайте это каждый раз, когда какая-то другая группа потребителей останавливается, чтобы использовать одну из ее тем. Но я бы предпочел более чистое и убедительное решение.
@OneCricketeer Кроме того, насколько я понимаю, это будет не только период хранения смещения, потому что это относится только к потребителям, которые перестали использовать все темы. Мой кластер имеет offsets.retention.minutes
один день, а my_consumer
не потреблял topic_a
уже несколько недель, но так как он все еще активно читает другие темы, ничего не убрали и отставание все еще есть.
Эта тема сжата, что означает, что будут очищены только закрытые сегменты журнала. Размер сегмента по умолчанию — 1G, а это много данных, поскольку OffsetAndMetadata
— это компактный двоичный формат. Но это также означает, что, скажем, ключ [my_consumer_group,my_topic_b,0]
не виден более 1 дня и не существует в текущем открытом сегменте журнала, после чего он будет удален.
@OneCricketeer Спасибо за объяснение. Поскольку я использую log.cleaner.max.compaction.lag.ms=86400000
, думаю, даже если сегмент лога не доходит до log.segment.bytes
, [my_consumer_group,my_topic_a,...]
следует удалить через сутки. Тем не менее, на самом деле, в моем случае, он все еще там после того, как прошло несколько недель. Значит, в моем кластере что-то работает не так, как задумано.
Вы написали код для генерации задержки? Или берется из норы или какого-то другого инструмента?
@ s7vr Отставание возникло естественным образом, потому что my_consumer_group
намеренно перестал потреблять my_topic_a
. (На самом деле лаг — это не такое круглое число, как 100000, но для этого примера я упростил вещи, а также количество разделов.)
Результат, который вам дан:
«Потребитель не поддерживает удаление смещения для конкретной темы из группы потребителей».
является показателем того, что невозможно удалить конкретную тему из группы потребителей.
Вы можете изменить группу потребителей для нового приложения только для чтения my_topic_b
, перезапустить приложение, а затем полностью удалить старую и бездействующую группу потребителей. При таком подходе вы сможете отслеживать лаги потребителей, не отвлекаясь и не получая всплывающих предупреждений. При перезапуске приложения с новой ConsumerGroup обычно лучше всего остановить производителя для темы "b" во время перезапуска, чтобы убедиться, что вы не пропустите ни одного сообщения.
Я бы действительно не стал возиться с темой вручную __consumer_offsets
.
В качестве альтернативы вы можете регулярно запускать инструмент командной строки, который поставляется с Kafka, чтобы уменьшить отставание вашей ConsumerGroup:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest
Возможно, вам придется добавить опцию --execute
.
Да, если бы было возможно остановить производителя, он бы уже просто переключился на новое имя группы потребителей, как вы описываете. К сожалению, это не жизнеспособный вариант в моем случае использования.
Хорошо, может быть, альтернатива, которую я только что добавил к своему ответу, может помочь вам решить вашу проблему. Тем не менее, это еще один обходной путь ... но я не видел надежного решения для манипулирования темой Consumer_offsets так, как вам нужно.
Спасибо, решение очень прагматичное, но я вижу в нем три проблемы. Во-первых, это добавляет сложности, поскольку приходится поддерживать планирование этого фиктивного сценария. Во-вторых, в подобном случае в будущем мне придется добавить больше таких фиктивных скриптов. В-третьих, на графиках будет выглядеть так, будто потребитель все еще потребляет эту тему, а на самом деле это не так, что может привести к ложным выводам в будущем.
Тем временем (Kafka 2.8) это стало возможным благодаря новому параметру --delete-offsets
для kafka-consumer-groups.sh
. :-)
Нет ли способа отфильтровать данные с панели управления Grafana? Или, может быть, отключить оповещения по тегам темы на время периода хранения смещения?