KAFKA-NODE: управление несколькими группами потребителей

У меня есть вопрос об управлении несколькими CG, созданными тремя группами потребителей, у каждой CG есть собственная служба kafka, идентификатор группы и тема.

теперь я получаю сообщения, как и ожидалось, но мне интересно, можно ли создать следующий сценарий:

создать три группы потребителей, но получать сообщения только от одной, поставить другие на паузу / удерживать на данный момент, если его служба kafka упадет, потреблять сообщения от следующей группы потребителей, и то же самое с третьей.

Вот пример моего кода:

function createConsumerGroup(topics){

    const ConsumerGroup = kafka.ConsumerGroup;

    //CREATE CONSUMER GROUPS FOR EVERY SERVICE
    for(let i = 0; i < config.kafka_service.length ;i++){  //3

        const options = {
            groupId: config.kafka_service[i]['groupId'],
            host: config.kafka_service[i]['zookeeperHost'],
            kafkaHost: config.kafka_service[i]['kafkaHost'],
            sessionTimeout: 15000,
            protocol: ['roundrobin'],
            fromOffset: 'latest'
        }

        //assign all services CG names and create [i] consumer groups!
        let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

        customConsumerGroupName = new ConsumerGroup(options, topics);

        customConsumerGroupName.on('connect', (resp) => {
            console.info(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
        });

        if (i > 0){
            //pause consumers exept FIRST
            customConsumerGroupName.pause();
        }


        customConsumerGroupName.on('message', (message) => {
           console.info(message);
        });

        customConsumerGroupName.on('error', (error) => {
            console.info('consumer group error: ', error);

           //HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
           //MAYBE consumerGroup.resume(); ???
        });

    }
}

надеется, что это звучит понятно, спасибо :)

Похоже, вы пытаетесь использовать отдельные группы потребителей в качестве средства защиты от сбоев, но я считаю, что одна группа потребителей должна делать то, что вы хотите. В каждой группе должно быть несколько потребителей, и если один из них откажет, другой в группе продолжит работу с того места, где остановился. Если только я не понял вашу проблему.

Brian Fitzpatrick 12.07.2018 17:42

вы это сделали, и вы, и @Moonwalkr говорили об одной CG для обработки этого сценария, но я не видел ни одного примера или руководства по настройке разных центров обработки данных в одной CG. какие-нибудь вести, пожалуйста?

draftish 15.07.2018 09:00

Чтобы настроить отдельных потребителей на одну и ту же CG (в разных центрах обработки данных), вы должны использовать параметр конфигурации groupId. У вас установлено значение 'config.kafka_service [i] [' groupId ']'. Где бы вы ни создавали своего потребителя, вы просто должны убедиться, что у него один и тот же groupId. Я не знаком с пакетом Node kafka, но я бы сказал, что вам следует придерживаться создания обычных потребителей и назначать каждому один и тот же идентификатор группы. Таким образом, отдельные потребители могут быть частью одной группы независимо от того, на каком сервере они размещены.

Brian Fitzpatrick 16.07.2018 16:56

Я только что прочитал об api узла kafka, а название ConsumerGroup несколько, к сожалению, несколько. Это потребитель kafka, а параметр groupId контролирует «фактическую» группу потребителей kafka. Таким образом, ваш код должен работать до тех пор, пока опция groupId для каждой «новой ConsumerGroup» одинакова. Таким образом, все группы ConsumerGroups будут частью фактической группы потребителей Kafka, читающей из одной и той же темы и гарантирующей, что сообщения не попадут в аварийные потребители (экземпляры ConsumerGroup на языке Node).

Brian Fitzpatrick 16.07.2018 17:09
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
4
3 018
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Группы потребителей решают два основных сценария:

1. Масштабирование Вы можете увеличить количество потребителей в группе, чтобы обрабатывать увеличивающуюся скорость сообщений, создаваемых в теме (ах), которые использует группа (масштабирование)

2. Восстановление после сбоя Имея группу потребителей, читающих одну и ту же тему (и), они автоматически справятся с ситуацией, когда один или несколько потребителей откажутся.

Таким образом, вместо того, чтобы иметь «резервные» группы потребителей, где вы должны сами решать, какие из них активны, вы просто полагаетесь на встроенное аварийное переключение Kafka. Потребители могут работать в нескольких разных контейнерах (даже в разных центрах обработки данных), и Kafka автоматически гарантирует, что сообщения доставляются отдельным потребителям, независимо от того, где они находятся и сколько из них работает в любой момент времени.

если вы запустите 3 экземпляра одного и того же файла узла (который имеет consumerGroup), каждый из них получил одни и те же данные, фактически создавая дубликаты. Предполагается, что только один потребитель сможет обрабатывать данные, если он принадлежит к тому же groupId.

Gunjan Kumar 29.09.2019 06:26
Ответ принят как подходящий

Похоже, что путаница возникает из-за названия ConsumerGroup пакета Node. В терминах Kafka группа потребителей контролируется исключительно groupId, используемым каждым потребителем. Потребители с одинаковым groupId не будут получать повторяющиеся сообщения, каждое сообщение темы читается только одним потребителем. Если потребитель выходит из строя, kafka обнаруживает это и передает разделы отдельному потребителю.

Узел ConsumerGroup - это просто еще один потребитель Kafka (новый Consumer с группами, управляемыми Kafka, а не zookeeper в Kafka> 0.9).

Таким образом, способ использования группы потребителей kafka с помощью Node ConsumerGroup будет следующим:

function createConsumerGroup(topics){

const ConsumerGroup = kafka.ConsumerGroup;

//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){  //3

    const options = {
        groupId: 'SOME_GROUP_NAME',
        host: config.kafka_service[i]['zookeeperHost'],
        kafkaHost: config.kafka_service[i]['kafkaHost'],
        sessionTimeout: 15000,
        protocol: ['roundrobin'],
        fromOffset: 'latest'
    }

    //assign all services CG names and create [i] consumer groups!
    let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

    customConsumerGroupName = new ConsumerGroup(options, topics);

    customConsumerGroupName.on('connect', (resp) => {
        console.info(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
    });

    customConsumerGroupName.on('message', (message) => {
       console.info(message);
    });

    customConsumerGroupName.on('error', (error) => {
        console.info('consumer group error: ', error);

       //Error handling logic here, restart the consumer that failed perhaps? 
       //Depends on how you want to managed failed consumers.
    });
  }
}

Каждый экземпляр Nodes ConsumerGroup будет членом группы SOME_GROUP_NAME, а любые другие потребители, созданные с тем же groupId, также будут действовать как члены той же группы потребителей kafka, независимо от сервера и т. д.

Большое спасибо, хорошие моменты, я немного обработал ошибки, в основном из-за сбоя брокера и тайм-аута, как вы упомянули. следующий потребитель занимает одно место. ваше здоровье!

draftish 17.07.2018 09:12

Другие вопросы по теме