Kafka не экономит смещение, если потребляет короткое время

Проблема

Потребитель с определенным идентификатором группы подключается к брокеру, слушает тему менее 1 минуты и отключается (в соответствии с бизнес-логикой). Пока он слушает тему, он может принимать некоторые сообщения. Когда один и тот же потребитель повторяет это действие, он потребляет те же сообщения!

Я обнаружил, что Кафка сохраняет смещение с интервалом в 1 минуту. Это означает, что потребитель должен слушать тему более 1 минуты. Как я могу сократить этот интервал?

Я нашла такие свойства:

  • log.flush.offset.checkpoint.interval.ms
  • log.flush.start.offset.checkpoint.interval.ms
  • offset.flush.interval.ms - выглядит наиболее уместно

Пытаюсь установить их в файле server.properties:

log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000

Перезапустите Kafka и Zookeeper. Но это не помогает. Потребителю все равно придется слушать тему более 1 минуты. Что я делаю не так?

Моя среда

  • Kafka и Zookeeper через Confluent.
  • php-rdkafka как клиентская библиотека
  • enable.auto.commit установлен на true

Я использую потребителя низкого уровня. auto.offset.reset установлен на smallest. Пример кода

<?php
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'foo');

$kafkaConsumer = new \RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);

$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);

while (true) {
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
    }
}
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Symfony Station Communiqué - 7 июля 2023 г
Symfony Station Communiqué - 7 июля 2023 г
Это коммюнике первоначально появилось на Symfony Station .
Оживление вашего приложения Laravel: Понимание режима обслуживания
Оживление вашего приложения Laravel: Понимание режима обслуживания
Здравствуйте, разработчики! В сегодняшней статье мы рассмотрим важный аспект управления приложениями, который часто упускается из виду в суете...
Установка и настройка Nginx и PHP на Ubuntu-сервере
Установка и настройка Nginx и PHP на Ubuntu-сервере
В этот раз я сделаю руководство по установке и настройке nginx и php на Ubuntu OS.
Коллекции в Laravel более простым способом
Коллекции в Laravel более простым способом
Привет, читатели, сегодня мы узнаем о коллекциях. В Laravel коллекции - это способ манипулировать массивами и играть с массивами данных. Благодаря...
Как установить PHP на Mac
Как установить PHP на Mac
PHP - это популярный язык программирования, который используется для разработки веб-приложений. Если вы используете Mac и хотите разрабатывать...
1
0
798
1

Ответы 1

Вы должны попытаться явно зафиксировать смещение в своем потребителе:

Explicitly Committing Offsets in Consumers If you go with the automatic offset commits, you don’t need to worry about explicitly committing offsets. But you do need to think about how you will commit offsets if you decide you need more control over the timing of offset commits—either in order to minimize duplicates or because you are doing event processing outside the main consumer poll loop.

Выдержка из Полное руководство Kafka, стр. 127. (Это бесплатная электронная книга, которую вы можете скачать)

Рекомендуется использовать Всегда фиксировать смещения после обработки событий. Если вы выполняете всю обработку в цикле опроса и не поддерживаете состояние между циклами опроса (например, для агрегирования), это должно быть легко. Вы можете использовать конфигурацию автоматической фиксации или события фиксации в конце цикла опроса.

Сам я не использовал клиент php, но выглядит как это может быть то, что вам нужно.

Добавление к приведенному выше примеру кода:

while (true) {
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
        $kafkaConsumer->commit($msg);
    }
}
php-rdkafka - немного странная библиотека. Он имеет явную фиксацию только для потребительского класса высокого уровня. Я должен использовать низкоуровневый потребительский класс. И enable.auto.commit установлен на true. P.S. спасибо за книгу :)
Evgenii Karavskii 23.11.2018 12:26

@EvgeniiKaravskii php-rdkafka - это просто оболочка над библиотекой librdkafka, которая фактически выполняет всю «тяжелую работу». Он также используется на других языках.

SteveB 23.11.2018 12:40

Другие вопросы по теме